From aba7e9b77943732df338b43016f6e12e4be2d31c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sat, 26 Jan 2008 12:15:34 +0000 Subject: [PATCH] Split streaming control from transport stream muxer. While at it, improve the TS muxer quite a bit. Not perfect yet, but much better. --- Makefile | 4 +- htmlui.c | 25 +- htsclient.c | 2 +- iptv_output.c | 10 +- mux.c | 199 +++++++ mux.h | 31 ++ psi.c | 25 +- psi.h | 4 +- pvr.c | 12 +- rtp.c | 2 +- rtp.h | 2 +- rtsp.c | 3 +- tsmux.c | 1365 ++++++++++++++++++------------------------------- tsmux.h | 48 +- tvhead.h | 75 +-- 15 files changed, 840 insertions(+), 967 deletions(-) create mode 100644 mux.c create mode 100644 mux.h diff --git a/Makefile b/Makefile index 4a59c6b6..baf25b80 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ -include ../config.mak SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \ - subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c \ - resolver.c + subscriptions.c mux.c tsdemux.c pes.c buffer.c tcp.c \ + resolver.c tsmux.c SRCS += http.c htmlui.c diff --git a/htmlui.c b/htmlui.c index 9aabc455..203f43a2 100644 --- a/htmlui.c +++ b/htmlui.c @@ -913,8 +913,6 @@ page_status(http_connection_t *hc, const char *remain, void *opaque) th_dvb_mux_instance_t *tdmi; th_stream_t *st; const char *txt, *t1, *t2; - th_muxer_t *tm; - th_muxstream_t *tms; char tmptxt[100]; if(!html_verify_access(hc, "system-status")) @@ -1204,27 +1202,8 @@ page_status(http_connection_t *hc, const char *remain, void *opaque) tcp_qprintf(&tq, "Using transport \"%s\"
", t->tht_name); - - if((tm = s->ths_muxer) != NULL) { - int64_t i64min = INT64_MAX; - int64_t i64max = INT64_MIN; - - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) { - if(tms->tms_curpkt == NULL) - continue; /* stream is currently stale */ - - if(tms->tms_nextblock < i64min) - i64min = tms->tms_nextblock; - - if(tms->tms_nextblock > i64max) - i64max = tms->tms_nextblock; - } - - tcp_qprintf(&tq, - "Internal stream delta: %lld us
", - i64max - i64min); - } - } + + } tcp_qprintf(&tq, ""); box_bottom(&tq); tcp_qprintf(&tq, "
"); diff --git a/htsclient.c b/htsclient.c index 9cd2d779..1f270037 100644 --- a/htsclient.c +++ b/htsclient.c @@ -410,7 +410,7 @@ client_subscription_callback(struct th_subscription *s, case TRANSPORT_AVAILABLE: assert(c->c_muxer == NULL); c->c_muxer = ts_muxer_init(s, client_output_ts, c, - TM_HTSCLIENTMODE | TM_SEEKABLE); + TS_HTSCLIENT | TS_SEEK); ts_muxer_play(c->c_muxer, 0); break; diff --git a/iptv_output.c b/iptv_output.c index ca119b1f..0bcba4b5 100644 --- a/iptv_output.c +++ b/iptv_output.c @@ -40,7 +40,7 @@ #define MULTICAST_PKT_SIZ (188 * 7) typedef struct output_multicast { - th_muxer_t *om_muxer; + ts_muxer_t *om_muxer; int om_fd; struct sockaddr_in om_dst; @@ -61,8 +61,8 @@ typedef struct output_multicast { * Output MPEG TS */ void -iptv_output_ts(void *opaque, th_subscription_t *s, - uint8_t *pkt, int blocks, int64_t pcr) +iptv_output_ts(void *opaque, th_subscription_t *s, uint8_t *pkt, + int blocks, int64_t pcr) { output_multicast_t *om = opaque; @@ -101,8 +101,8 @@ iptv_subscription_callback(struct th_subscription *s, switch(event) { case TRANSPORT_AVAILABLE: assert(om->om_muxer == NULL); - om->om_muxer = ts_muxer_init(s, iptv_output_ts, om, 0); - om->om_muxer->tm_drop_rate = om->om_intra_drop_rate; + om->om_muxer = ts_muxer_init(s, iptv_output_ts, om, TS_SEEK); + // om->om_muxer->tm_drop_rate = om->om_intra_drop_rate; ts_muxer_play(om->om_muxer, 0); break; diff --git a/mux.c b/mux.c new file mode 100644 index 00000000..73b9bc4f --- /dev/null +++ b/mux.c @@ -0,0 +1,199 @@ +/* + * tvheadend, Muxing + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#define _GNU_SOURCE +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include "tvhead.h" +#include "dispatch.h" +#include "teletext.h" +#include "transports.h" +#include "subscriptions.h" +#include "pes.h" +#include "psi.h" +#include "buffer.h" + + + + +/* + * pause playback + */ +void +muxer_pause(th_muxer_t *tm) +{ +} + + + + +/* + * playback start + */ +void +muxer_play(th_muxer_t *tm, int64_t toffset) +{ + th_subscription_t *s = tm->tm_subscription; + + if(!tm->tm_linked) + LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link); + + if(toffset == AV_NOPTS_VALUE) { + /* continue from last playback */ + tm->tm_offset = 0; + } else { + tm->tm_offset = toffset; + } + tm->tm_status = TM_PLAY; +} + +/** + * + */ +static void +mux_new_packet_for_stream(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt) +{ + if(tm->tm_offset == 0) { + /* Direct playback, pass it on at once */ + tm->tm_output(tm->tm_opaque, tms, pkt); + return; + } +} + + + +/** + * + */ +static void +mux_new_packet(th_muxer_t *tm, th_stream_t *st, th_pkt_t *pkt) +{ + th_muxstream_t *tms; + + pkt_store(pkt); /* need to keep packet around */ + + switch(tm->tm_status) { + case TM_IDLE: + break; + + case TM_WAITING_FOR_LOCK: + break; + + case TM_PLAY: + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + if(tms->tms_stream == st) { + mux_new_packet_for_stream(tm, tms, pkt); + break; + } + } + break; + + case TM_PAUSE: + break; + } +} + + + + + +/* + * TS Muxer + */ +th_muxer_t * +muxer_init(th_subscription_t *s, th_mux_output_t *cb, void *opaque, + int flags) +{ + th_transport_t *t = s->ths_transport; + th_stream_t *st; + th_muxer_t *tm; + th_muxstream_t *tms; + + printf("muxer init!\n"); + + tm = calloc(1, sizeof(th_muxer_t)); + tm->tm_subscription = s; + + tm->tm_output = cb; + tm->tm_opaque = opaque; + tm->tm_new_pkt = mux_new_packet; + + + LIST_FOREACH(st, &t->tht_streams, st_link) { + tms = calloc(1, sizeof(th_muxstream_t)); + tms->tms_muxer = tm; + tms->tms_stream = st; + + + LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0); + TAILQ_INIT(&tms->tms_metaqueue); + } + + s->ths_muxer = tm; + return tm; +} + + +/* + * + */ +static void +tms_destroy(th_muxstream_t *tms) +{ + LIST_REMOVE(tms, tms_muxer_link0); + + // dtimer_disarm(&tms->tms_timer); + free(tms); +} + + +/* + * + */ +void +muxer_deinit(th_muxer_t *tm, th_subscription_t *s) +{ + th_muxstream_t *tms; + + s->ths_raw_input = NULL; + s->ths_muxer = NULL; + + if(tm->tm_linked) + LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link); + + while((tms = LIST_FIRST(&tm->tm_streams)) != NULL) + tms_destroy(tms); + + free(tm); +} + diff --git a/mux.h b/mux.h new file mode 100644 index 00000000..4a9edc06 --- /dev/null +++ b/mux.h @@ -0,0 +1,31 @@ +/* + * tvheadend, Stream muxer + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef MUX_H +#define MUX_H + +th_muxer_t *muxer_init(th_subscription_t *s, th_mux_output_t *cb, + void *opaque); + +void muxer_deinit(th_muxer_t *tm, th_subscription_t *s); + +void muxer_play(th_muxer_t *tm, int64_t toffset); + +void muxer_pause(th_muxer_t *tm); + +#endif /* MUX_H */ diff --git a/psi.c b/psi.c index 432a2f04..0be811c3 100644 --- a/psi.c +++ b/psi.c @@ -131,7 +131,7 @@ psi_append_crc32(uint8_t *buf, int offset, int maxlen) */ int -psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen) +psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid) { if(maxlen < 12) return -1; @@ -150,8 +150,8 @@ psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen) buf[8] = 0; /* Program number, we only have one program */ buf[9] = 1; - buf[10] = 0; /* PMT pid */ - buf[11] = 100; + buf[10] = 0xe0 | (pmtpid >> 8); + buf[11] = pmtpid; return psi_append_crc32(buf, 12, maxlen); } @@ -292,7 +292,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid) */ int -psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen) +psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) { th_stream_t *st; th_muxstream_t *tms; @@ -314,19 +314,8 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen) buf[6] = 0; buf[7] = 0; - /* Find PID that carries PCR */ - - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) - if(tms->tms_dopcr) - break; - - if(tms == NULL) { - buf[8] = 0xff; - buf[9] = 0xff; - } else { - buf[8] = 0xe0 | (tms->tms_index >> 8); - buf[9] = tms->tms_index; - } + buf[8] = 0xe0 | (pcrpid >> 8); + buf[9] = pcrpid; buf[10] = 0xf0; /* Program info length */ buf[11] = 0x00; /* We dont have any such things atm */ @@ -334,7 +323,7 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen) buf += 12; tlen = 12; - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) { + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { st = tms->tms_stream; switch(st->st_type) { diff --git a/psi.h b/psi.h index 751e9437..13b6d903 100644 --- a/psi.h +++ b/psi.h @@ -37,9 +37,9 @@ int psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid); uint32_t psi_crc32(uint8_t *data, size_t datalen); -int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen); +int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid); -int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen); +int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid); const char *psi_caid2name(uint16_t caid); diff --git a/pvr.c b/pvr.c index 39359c99..50cbed56 100644 --- a/pvr.c +++ b/pvr.c @@ -787,7 +787,7 @@ pvrr_transport_available(pvr_rec_t *pvrr, th_transport_t *t) tms = calloc(1, sizeof(th_muxstream_t)); tms->tms_stream = st; - LIST_INSERT_HEAD(&tm->tm_media_streams, tms, tms_muxer_media_link); + LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0); tms->tms_avstream = av_mallocz(sizeof(AVStream)); @@ -855,8 +855,8 @@ pvrr_transport_unavailable(pvr_rec_t *pvrr, th_transport_t *t) /* Destroy muxstreams */ - while((tms = LIST_FIRST(&tm->tm_media_streams)) != NULL) { - LIST_REMOVE(tms, tms_muxer_media_link); + while((tms = LIST_FIRST(&tm->tm_streams)) != NULL) { + LIST_REMOVE(tms, tms_muxer_link0); free(tms); } @@ -965,7 +965,7 @@ is_all_decoded(th_muxer_t *tm, enum CodecType type) th_muxstream_t *tms; AVStream *st; - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) { + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { st = tms->tms_avstream; if(st->codec->codec->type == type && tms->tms_decoded == 0) return 0; @@ -994,7 +994,7 @@ pvrr_record_packet(pvr_rec_t *pvrr, th_pkt_t *pkt) char txt[100]; size_t bufsize; - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) if(tms->tms_stream == pkt->pkt_stream) break; @@ -1108,7 +1108,7 @@ pvrr_record_packet(pvr_rec_t *pvrr, th_pkt_t *pkt) if(pkt->pkt_commercial != COMMERCIAL_YES) { - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) tms->tms_decoded = 0; pvrr_set_rec_state(pvrr, PVR_REC_WAIT_AUDIO_LOCK); diff --git a/rtp.c b/rtp.c index 45a952cb..77e832a2 100644 --- a/rtp.c +++ b/rtp.c @@ -82,7 +82,7 @@ rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr, void -rtp_output_ts(void *opaque, struct th_subscription *s, +rtp_output_ts(void *opaque, th_subscription_t *s, uint8_t *pkt, int blocks, int64_t pcr) { th_rtp_streamer_t *trs = opaque; diff --git a/rtp.h b/rtp.h index ef49b1f7..ee084c8d 100644 --- a/rtp.h +++ b/rtp.h @@ -29,7 +29,7 @@ typedef struct th_rtp_streamer { void rtp_streamer_init(th_rtp_streamer_t *trs, int fd, struct sockaddr_in *dst); -void rtp_output_ts(void *opaque, struct th_subscription *s, +void rtp_output_ts(void *opaque, th_subscription_t *s, uint8_t *pkt, int blocks, int64_t pcr); int rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr, diff --git a/rtsp.c b/rtsp.c index 891385d6..d08adc81 100644 --- a/rtsp.c +++ b/rtsp.c @@ -123,7 +123,8 @@ rtsp_subscription_callback(struct th_subscription *s, switch(event) { case TRANSPORT_AVAILABLE: assert(rs->rs_muxer == NULL); - rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer, 0); + rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer, + TS_SEEK); break; case TRANSPORT_UNAVAILABLE: diff --git a/tsmux.c b/tsmux.c index 2125baab..41c79329 100644 --- a/tsmux.c +++ b/tsmux.c @@ -1,5 +1,5 @@ /* - * tvheadend, MPEG transport stream functions + * tvheadend, MPEG Transport stream muxer * Copyright (C) 2007 Andreas Öman * * This program is free software: you can redistribute it and/or modify @@ -32,13 +32,8 @@ #include #include -#include -#include - #include -#include - #include "tvhead.h" #include "dispatch.h" #include "teletext.h" @@ -47,619 +42,79 @@ #include "pes.h" #include "psi.h" #include "buffer.h" +#include "mux.h" #include "tsmux.h" -#define PES_HEADER_SIZE 19 +#define TS_LOOKAHEAD 500000 -static void ts_muxer_relock(th_muxer_t *tm, uint64_t pcr); - - -/* - * Return the deadline for a given packet - */ -static int64_t -muxer_pkt_deadline(th_pkt_t *pkt, th_muxstream_t *tms) -{ - th_stream_t *st = pkt->pkt_stream; - int64_t r; - - r = pkt->pkt_dts; - if(st != NULL) - r -= st->st_peak_presentation_delay; - - r -= tms->tms_mux_offset; - return r; -} - - -/* - * - */ -static void -tms_set_curpkt(th_muxstream_t *tms, th_pkt_t *pkt) -{ - if(tms->tms_curpkt != NULL) - pkt_deref(tms->tms_curpkt); - - tms->tms_offset = 0; - - if(pkt != NULL) { - tms->tms_curpkt = pkt_ref(pkt); - } else { - tms->tms_curpkt = NULL; - } -} - -/* - * - */ -static void -tms_stream_set_stale(th_muxer_t *tm, th_muxstream_t *tms, int64_t pcr) -{ - tms->tms_nextblock = INT64_MAX; - tms->tms_staletime = pcr; - LIST_REMOVE(tms, tms_muxer_link); - LIST_INSERT_HEAD(&tm->tm_stale_streams, tms, tms_muxer_link); - tms_set_curpkt(tms, NULL); -} - -/* - * - */ -static void -tms_stream_add_meta(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt) -{ - LIST_REMOVE(tms, tms_muxer_link); - LIST_INSERT_HEAD(&tm->tm_active_streams, tms, tms_muxer_link); - tms_set_curpkt(tms, pkt); - tms->tms_block_interval = 0; - tms->tms_nextblock = 0; -} - -/* - * - */ -static void -tms_stream_set_active(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt, - int64_t pcr) -{ - int l, dt; - int64_t dl; - - assert(pkt->pkt_duration > 0); - - tms->tms_nextblock = pcr; - dl = muxer_pkt_deadline(pkt, tms); - dt = dl - pcr; - l = (pkt_len(pkt) + PES_HEADER_SIZE) / 188; - - tms->tms_dl = dl; - tms->tms_block_interval = l == 0 ? 1 : dt / l; - - if(tms->tms_block_interval < 10) - tms->tms_block_interval = 10; - - LIST_REMOVE(tms, tms_muxer_link); - LIST_INSERT_HEAD(&tm->tm_active_streams, tms, tms_muxer_link); - tms_set_curpkt(tms, pkt); - tms->tms_blockcnt = 0; -} - - - - - -/* - * Generates a 188 bytes TS packet in 'tsb' - */ -static int -ts_make_pkt(th_muxer_t *tm, th_muxstream_t *tms, uint8_t *tsb, int64_t pcr) -{ - uint8_t *tsb0 = tsb; - th_pkt_t *pkt = tms->tms_curpkt; - uint8_t *src; - int tsrem; /* Remaining bytes in tspacket */ - int frrem; /* Remaining bytes in frame */ - int64_t ts; - int pad, cc, len; - uint16_t u16; - int is_section = pkt->pkt_stream == NULL; - int header_len = 0; - int dumppkt = 0; - AVRational mpeg_tc = {1, 90000}; - - if(pkt_payload(pkt) == NULL) - return -1; - - frrem = pkt_len(pkt) - tms->tms_offset; - assert(frrem > 0); - - cc = tms->tms_cc++; - tsrem = 184; - - if(tms->tms_offset == 0) { - /* When writing the packet header, shave of a bit of available - payload size */ - if(is_section) { - header_len = 1; - } else { - header_len = PES_HEADER_SIZE; - } - tsrem -= header_len; - } - - if(tm->tm_flags & TM_HTSCLIENTMODE) { - /* UGLY */ - if(tms->tms_stream) - *tsb++ = tms->tms_stream->st_type; - else - *tsb++ = 0xff; - - } else { - /* TS marker */ - *tsb++ = 0x47; - } - - /* Write PID and optionally payload unit start indicator */ - *tsb++ = tms->tms_index >> 8 | (tms->tms_offset ? 0 : 0x40); - *tsb++ = tms->tms_index; - - - - /* Compute amout of padding needed */ - pad = tsrem - frrem; - - - if(pcr != AV_NOPTS_VALUE) { - - /* Insert PCR */ - - tsrem -= 8; - pad -= 8; - if(pad < 0) - pad = 0; - - *tsb++ = (cc & 0xf) | 0x30; - *tsb++ = 7 + pad; - *tsb++ = 0x10; /* PCR flag */ - - ts = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc); - *tsb++ = ts >> 25; - *tsb++ = ts >> 17; - *tsb++ = ts >> 9; - *tsb++ = ts >> 1; - *tsb++ = (ts & 1) << 7; - *tsb++ = 0; - - memset(tsb, 0xff, pad); - tsb += pad; - tsrem -= pad; - - } else if(pad > 0) { - /* Must pad TS packet */ - - *tsb++ = (cc & 0xf) | 0x30; - tsrem -= pad; - *tsb++ = --pad; - - memset(tsb, 0x00, pad); - tsb += pad; - } else { - *tsb++ = (cc & 0xf) | 0x10; - } - - if(tms->tms_offset == 0) { - if(!is_section) { - - /* First TS packet for this frame, write PES headers */ - - /* Write startcode */ - - *tsb++ = 0; - *tsb++ = 0; - *tsb++ = tms->tms_sc >> 8; - *tsb++ = tms->tms_sc; - - /* Write total frame length (without accounting for startcode and - length field itself */ - - len = pkt_len(pkt) + header_len - 6; - - if(len > 65535) { - /* It's okay to write len as 0 in transport streams, - but only for video frames, and i dont expect any of the - audio frames to exceed 64k - */ - len = 0; - } - - *tsb++ = len >> 8; - *tsb++ = len; - - *tsb++ = 0x80; /* MPEG2 */ - *tsb++ = 0xc0; /* pts & dts is present */ - *tsb++ = 10; /* length of header (pts & dts) */ - - /* Write PTS */ - - ts = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, mpeg_tc); - *tsb++ = (((ts >> 30) & 7) << 1) | 1; - u16 = (((ts >> 15) & 0x7fff) << 1) | 1; - *tsb++ = u16 >> 8; - *tsb++ = u16; - u16 = ((ts & 0x7fff) << 1) | 1; - *tsb++ = u16 >> 8; - *tsb++ = u16; - - /* Write DTS */ - - ts = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, mpeg_tc); - *tsb++ = (((ts >> 30) & 7) << 1) | 1; - u16 = (((ts >> 15) & 0x7fff) << 1) | 1; - *tsb++ = u16 >> 8; - *tsb++ = u16; - u16 = ((ts & 0x7fff) << 1) | 1; - *tsb++ = u16 >> 8; - *tsb++ = u16; - } else { - *tsb++ = 0; /* Pointer field for tables */ - } - } - - /* Fill rest of TS packet with payload */ - - src = pkt_payload(pkt) + tms->tms_offset; - memcpy(tsb, src, tsrem); - - tms->tms_offset += tsrem; - - if(dumppkt) { - int i; - for(i = 0; i < 188; i++) - printf("%02x%c", tsb0[i], " \n"[i & 0xf]); - printf("\n"); - } - - assert(tsb0 + 188 == tsb + tsrem); - return 0; -} - - -/* - * - */ -int -ts_mux_packet(th_muxer_t *tm, int64_t pcr, uint8_t *outbuf, int maxblocks) -{ - th_muxstream_t *tms, *t; - th_pkt_t *pkt; - int rem, i; - int64_t pcr1; - - LIST_FOREACH(tms, &tm->tm_active_streams, tms_muxer_link) { - if(tms->tms_nextblock < pcr) - tms->tms_nextblock = pcr; - - if(tms->tms_curpkt != NULL) - pkt_load(tms->tms_curpkt); - } - - for(i = 0; i < maxblocks; i++) { - - /* Find the stream with the lowest/closest time for next scheduled - transport stream block */ - - tms = NULL; - LIST_FOREACH(t, &tm->tm_active_streams, tms_muxer_link) - if(tms == NULL || t->tms_nextblock < tms->tms_nextblock) - tms = t; - - if(tms == NULL) - break; /* No active streams, cannot do anything */ - - - pkt = tms->tms_curpkt; - /* Do we need to send a new PCR update? */ - - if(tms->tms_dopcr && tms->tms_nextpcr <= pcr) { - pcr1 = pcr + 200000; - tms->tms_nextpcr = pcr + 20000; - } else { - pcr1 = AV_NOPTS_VALUE; - } - - /* Generate a transport stream packet */ - - if(ts_make_pkt(tm, tms, outbuf, pcr1) == 0) { - rem = pkt_len(pkt) - tms->tms_offset; - - /* Periodic drop (for testing purposes) */ - - if(tm->tm_drop_rate && i != 0 && i != maxblocks - 1 && - tms->tms_stream && - (tms->tms_stream->st_type == HTSTV_H264 || - tms->tms_stream->st_type == HTSTV_MPEG2VIDEO) && - ++tm->tm_drop_cnt == tm->tm_drop_rate) { - tm->tm_drop_cnt = 0; - i--; - } else { - outbuf += 188; - } - - } else { - i--; - rem = 0; - } - - if(rem == 0) { - /* End of frame, find next */ - while(1) { - pkt = pkt->pkt_on_stream_queue ? - TAILQ_NEXT(pkt, pkt_queue_link) : NULL; - if(pkt != NULL) { - /* Ok, reset counters */ - tms_stream_set_active(tm, tms, pkt, pcr); - pkt_load(tms->tms_curpkt); - - } else { - /* This stream cannot contribute, move it to stale list */ - tms_stream_set_stale(tm, tms, pcr); - } - break; - } - continue; - } - tms->tms_nextblock += tms->tms_block_interval; - tms->tms_blockcnt++; -#if 0 - if(tms->tms_stream) - printf("%-10s %lld [seg: %d] pcr=%lld dl=%lld nxt=%lld off=%d d=%lld\n", - htstvstreamtype2txt(tms->tms_stream->st_type), - pkt->pkt_dts, tms->tms_blockcnt, pcr, tms->tms_dl, - tms->tms_nextblock, tms->tms_offset, - tms->tms_nextblock - pcr); -#endif - } - return i; -} - -/* - * - */ -void -tm_gen_pat_pmt(th_muxer_t *tm, int64_t pcr) -{ - th_subscription_t *s = tm->tm_subscription; - th_transport_t *t = s->ths_transport; - th_pkt_t *pkt; - - pkt = pkt_alloc(NULL, 4096, pcr, pcr); - pkt->pkt_payloadlen = psi_build_pmt(tm, pkt_payload(pkt), 4096); - tms_stream_add_meta(tm, tm->tm_pmt, pkt); - pkt_deref(pkt); - - pkt = pkt_alloc(NULL, 4096, pcr, pcr); - pkt->pkt_payloadlen = psi_build_pat(t, pkt_payload(pkt), 4096); - tms_stream_add_meta(tm, tm->tm_pat, pkt); - pkt_deref(pkt); -} - - - -static void ts_gen_timer_callback(void *aux, int64_t now); - -/* - * - */ -void -ts_gen_packet(th_muxer_t *tm, int64_t clk) -{ - int64_t pcr, next; - th_muxstream_t *tms; - int r; - - dtimer_disarm(&tm->tm_timer); - - pcr = tm->tm_start_dts + clk - tm->tm_clockref; - - do { - if(pcr >= tm->tm_next_pat) { - tm->tm_next_pat = pcr + 100000; - tm_gen_pat_pmt(tm, pcr); - } - - r = ts_mux_packet(tm, pcr, tm->tm_packet, tm->tm_blocks_per_packet); - if(r == -1) { - ts_muxer_relock(tm, pcr); - return; - } - - tm->tm_callback(tm->tm_opaque, tm->tm_subscription, tm->tm_packet, r, pcr); - - /* Figure when next packet must be sent */ - next = INT64_MAX; - LIST_FOREACH(tms, &tm->tm_active_streams, tms_muxer_link) - if(tms->tms_nextblock < next) - next = tms->tms_nextblock; - - next = next - pcr; - } while(next < 2000); - - if(next > 100000) - next = 100000; /* We always want to send PAT/PMT at this interval */ - - dtimer_arm_hires(&tm->tm_timer, ts_gen_timer_callback, tm, clk + next); -} - -/* - * - */ -static void -ts_gen_timer_callback(void *aux, int64_t now) -{ - th_muxer_t *tm = aux; - ts_gen_packet(tm, now); -} - - -/* - * start demuxing - */ -static int -ts_muxer_start(th_muxer_t *tm) -{ - th_muxstream_t *tms; - th_pkt_t *f, *l, *pkt; - th_stream_t *st; - int64_t v; - - LIST_FOREACH(tms, &tm->tm_stopped_streams, tms_muxer_link) { - st = tms->tms_stream; - if(st == NULL) - continue; - - f = TAILQ_FIRST(&st->st_pktq); - l = TAILQ_LAST(&st->st_pktq, th_pkt_queue); - - if(f == NULL) - return -1; - - v = muxer_pkt_deadline(f, tms) - f->pkt_duration; - - if(tm->tm_start_dts < v) { - tm->tm_start_dts = v; - tms->tms_tmppkt = f; - continue; - } - - v = muxer_pkt_deadline(l, tms); - - if(tm->tm_start_dts > v) { - tm->tm_start_dts = v - l->pkt_duration; - tms->tms_tmppkt = l; - continue; - } - - tms->tms_tmppkt = NULL; - - while(f != NULL || l != NULL) { - - if(f != NULL) - f = TAILQ_NEXT(f, pkt_queue_link); - - if(f != NULL) { - v = muxer_pkt_deadline(f, tms); - if(tm->tm_start_dts >= v - f->pkt_duration && tm->tm_start_dts < v) { - tms->tms_tmppkt = f; - break; - } - } - if(l != NULL) - l = TAILQ_PREV(l, th_pkt_queue, pkt_queue_link); - - if(l != NULL) { - v = muxer_pkt_deadline(l, tms); - if(tm->tm_start_dts >= v - l->pkt_duration && tm->tm_start_dts < v) { - tms->tms_tmppkt = l; - break; - } - } - } - - if(tms->tms_tmppkt == NULL) - return -1; - } - - /* All streams have a lock */ - - tm->tm_status = TM_PLAY; - - while((tms = LIST_FIRST(&tm->tm_stopped_streams)) != NULL) { - st = tms->tms_stream; - pkt = tms->tms_tmppkt; - - if(st == NULL) { - LIST_REMOVE(tms, tms_muxer_link); - LIST_INSERT_HEAD(&tm->tm_active_streams, tms, tms_muxer_link); - } else { - pkt_load(pkt); - - tms_stream_set_active(tm, tms, pkt, tm->tm_start_dts); - } - } - - tm->tm_clockref = getclock_hires(); - tm_gen_pat_pmt(tm, 0); - ts_gen_packet(tm, tm->tm_clockref); - return 0; -} - - -/* - * - */ -static void -ts_muxer_reinit_stream(th_muxer_t *tm, th_muxstream_t *tms) -{ - tms->tms_nextblock = INT64_MAX; - LIST_REMOVE(tms, tms_muxer_link); - LIST_INSERT_HEAD(&tm->tm_stopped_streams, tms, tms_muxer_link); - tms_set_curpkt(tms, NULL); -} - -/* - * - */ -static void -ts_muxer_relock(th_muxer_t *tm, uint64_t pcr) -{ - th_muxstream_t *tms; - - while((tms = LIST_FIRST(&tm->tm_active_streams)) != NULL) - ts_muxer_reinit_stream(tm, tms); - - while((tms = LIST_FIRST(&tm->tm_stale_streams)) != NULL) - ts_muxer_reinit_stream(tm, tms); - - tm->tm_start_dts = pcr; - tm->tm_status = TM_WAITING_FOR_LOCK; - ts_muxer_start(tm); -} +static const AVRational mpeg_tc = {1, 90000}; +#define PID_PMT 1000 +#define PID_ES_BASE 2000 /** - * Push a MPEG TS packet to output, used in shortcut mode + * Send current packet */ static void -ts_muxer_raw_push_packet(th_muxer_t *tm, void *data, uint16_t pid) +ts_muxer_send_packet(ts_muxer_t *ts) +{ + if(ts->ts_block == 0) + return; + + ts->ts_output(ts->ts_output_opaque, ts->ts_subscription, ts->ts_packet, + ts->ts_block, 0); + ts->ts_block = 0; +} + +/** + * Push a MPEG TS packet to output + */ +static void +ts_muxer_add_packet(ts_muxer_t *ts, void *data, uint16_t pid) { uint8_t *tsb; - tsb = tm->tm_packet + tm->tm_block_offset * 188; - tm->tm_block_offset++; + tsb = ts->ts_packet + ts->ts_block * 188; + ts->ts_block++; memcpy(tsb, data, 188); - /* Set PID */ - tsb[2] = pid; tsb[1] = (tsb[1] & 0xf0) | (pid >> 8); - if(tm->tm_block_offset == tm->tm_blocks_per_packet) { - tm->tm_callback(tm->tm_opaque, tm->tm_subscription, tm->tm_packet, - tm->tm_block_offset, AV_NOPTS_VALUE); - tm->tm_block_offset = 0; - } + if(ts->ts_block == ts->ts_blocks_per_packet) + ts_muxer_send_packet(ts); } +/** + * Raw TS input + */ +static void +ts_muxer_raw_input(struct th_subscription *s, void *data, int len, + th_stream_t *st, void *opaque) +{ + th_muxer_t *tm = s->ths_muxer; + ts_muxer_t *ts = opaque; + th_muxstream_t *tms; + + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) + if(tms->tms_stream == st) + break; + + if(tms == NULL || tms->tms_index == 0) + return; /* Unknown / non-mapped stream */ + ts_muxer_add_packet(ts, data, tms->tms_index); +} + + + /** * Function for encapsulating a short table into a transport stream packet */ static void -ts_muxer_raw_push_table(th_muxer_t *tm, void *table, int tlen, int cc, int pid) +ts_muxer_build_table(ts_muxer_t *ts, void *table, int tlen, int cc, int pid) { int pad; - uint8_t tsb0[188], *tsb; tsb = tsb0; @@ -675,323 +130,525 @@ ts_muxer_raw_push_table(th_muxer_t *tm, void *table, int tlen, int cc, int pid) *tsb++ = 0; /* Pointer field for tables */ memcpy(tsb, table, tlen); - ts_muxer_raw_push_packet(tm, tsb0, pid); -} - - -/** - * Periodic timer callback for generating PAT/PMT tables when in - * shortcut mode - */ -static void -ts_muxer_raw_table_generator(void *aux, int64_t now) -{ - th_muxer_t *tm = aux; - uint8_t table[180]; - th_muxstream_t *tms; - int l; - - /* rearm timer */ - dtimer_arm_hires(&tm->tm_table_timer, ts_muxer_raw_table_generator, - tm, now + 100000); - - l = psi_build_pat(NULL, table, sizeof(table)); - tms = tm->tm_pat; - tms->tms_cc++; - ts_muxer_raw_push_table(tm, table, l, tms->tms_cc, 0); - - l = psi_build_pmt(tm, table, sizeof(table)); - tms = tm->tm_pmt; - tms->tms_cc++; - ts_muxer_raw_push_table(tm, table, l, tms->tms_cc, 100); + ts_muxer_add_packet(ts, tsb0, pid); } /** - * Shortcutted MPEG TS input - */ -static void -ts_muxer_raw_input(struct th_subscription *s, void *data, int len, - th_stream_t *st, void *opaque) -{ - th_muxer_t *tm = s->ths_muxer; - th_muxstream_t *tms; - - LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) - if(tms->tms_stream == st) - break; - - if(tms == NULL) - return; /* Unknown / non-mapped stream */ - - ts_muxer_raw_push_packet(tm, data, tms->tms_index); -} - - -/* - * pause playback - */ -void -ts_muxer_pause(th_muxer_t *tm) -{ - tm->tm_pauseref = getclock_hires(); - - dtimer_disarm(&tm->tm_timer); - tm->tm_status = TM_PAUSE; -} - - - - -/* - * playback start - */ -void -ts_muxer_play(th_muxer_t *tm, int64_t toffset) -{ - int64_t dts = 0, t; - th_muxstream_t *tms; - th_stream_t *st; - int64_t clk; - th_subscription_t *s = tm->tm_subscription; - - if(!(tm->tm_flags & TM_SEEKABLE) && - s->ths_transport->tht_source_type == THT_MPEG_TS) { - /* We dont need to seek and source is MPEG TS, we can use a - shortcut to avoid remuxing stream */ - - s->ths_raw_input = ts_muxer_raw_input; - dtimer_arm_hires(&tm->tm_table_timer, ts_muxer_raw_table_generator, - tm, getclock_hires() + 100000); - return; - } - - /* Use normal muxer */ - - if(!tm->tm_transport_linked) { - LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link); - tm->tm_transport_linked = 1; - } - - switch(tm->tm_status) { - case TM_IDLE: - case TM_WAITING_FOR_LOCK: - - toffset = 0; - break; - - case TM_PLAY: - if(toffset == AV_NOPTS_VALUE) - return; - - toffset = 0; - return; /* XXX */ - - case TM_PAUSE: - if(toffset == AV_NOPTS_VALUE) { - /* Just continue stream, adjust clock reference with the amount - of time we was paused */ - clk = getclock_hires(); - t = clk - tm->tm_pauseref; - - tm->tm_clockref += t; - ts_gen_packet(tm, clk); - return; - } - break; - } - - assert(LIST_FIRST(&tm->tm_active_streams) == NULL); - assert(LIST_FIRST(&tm->tm_stale_streams) == NULL); - - LIST_FOREACH(tms, &tm->tm_stopped_streams, tms_muxer_link) { - if((st = tms->tms_stream) == NULL) - continue; - if(st->st_last_dts > dts) - dts = st->st_last_dts; - } - - dts -= toffset; - if(dts < 0) - dts = 0; - - tm->tm_start_dts = dts; - tm->tm_status = TM_WAITING_FOR_LOCK; - - ts_muxer_start(tm); -} - - -/* - * Meta streams - */ -static th_muxstream_t * -tm_create_meta_stream(th_muxer_t *tm, int pid) -{ - th_muxstream_t *tms; - - tms = calloc(1, sizeof(th_muxstream_t)); - tms->tms_muxer = tm; - LIST_INSERT_HEAD(&tm->tm_stopped_streams, tms, tms_muxer_link); - tms->tms_index = pid; - return tms; -} - - - -/* * */ static void -ts_encode_new_packet(th_muxer_t *tm, th_stream_t *st, th_pkt_t *pkt) +ts_muxer_generate_tables(void *aux, int64_t now) +{ + ts_muxer_t *ts = aux; + th_muxer_t *tm = ts->ts_muxer; + uint8_t table[180]; + int l; + + /* rearm timer */ + dtimer_arm_hires(&ts->ts_patpmt_timer, ts_muxer_generate_tables, + ts, now + 100000); + + l = psi_build_pat(NULL, table, sizeof(table), PID_PMT); + ts_muxer_build_table(ts, table, l, ts->ts_pat_cc, 0); + ts->ts_pat_cc++; + + l = psi_build_pmt(tm, table, sizeof(table), ts->ts_pcrpid); + ts_muxer_build_table(ts, table, l, ts->ts_pmt_cc, PID_PMT); + ts->ts_pmt_cc++; + + ts_muxer_send_packet(ts); +} + +/** + * + */ +static int64_t +get_delay(th_muxstream_t *tms) +{ + th_metapkt_t *f, *l; + + f = TAILQ_FIRST(&tms->tms_metaqueue); + if(f == NULL) + return -1; + + l = TAILQ_LAST(&tms->tms_metaqueue, th_metapkt_queue); + + return l->tm_ts_stop - f->tm_ts_start; /* Delta time */ + +} + + +/** + * + */ +#if 0 +static int +estimate_bitrate(th_muxstream_t *tms) +{ + int64_t delta, rate; + + delta = get_delay(tms); + if(delta == -1) + return -1; + rate = (uint64_t)tms->tms_meta_bytes * 1000000LL / delta; + return rate; +} +#endif + +/** + * + */ +#if 0 +static int +estimate_blockrate(th_muxstream_t *tms) +{ + int64_t delta, rate; + + delta = get_delay(tms); + if(delta == -1) + return 0; + rate = (uint64_t)tms->tms_meta_packets * 1000000LL / delta; + return rate; +} +#endif + +/** + * + */ +#define PES_HEADER_SIZE 19 + +static void +ts_mux_gen_packets(ts_muxer_t *ts, th_muxstream_t *tms, th_pkt_t *pkt) +{ + th_metapkt_t *tm; + int off = 0; + uint8_t *tsb; + int64_t t; + int frrem, pad, tsrem, len; + uint16_t u16; + // int pcroffset; + int printts = 0; + + if(printts)printf("Generating TS packets, DTS = %lld +%d\n", pkt->pkt_dts, + pkt->pkt_duration); + + while(off < pkt->pkt_payloadlen) { + + + tm = malloc(sizeof(th_metapkt_t) + 188); + tsb = tm->tm_pkt; + tm->tm_pcroffset = 0; + + /* Timestamp of first byte */ + tm->tm_ts_start = pkt->pkt_duration * off / + (pkt->pkt_payloadlen + PES_HEADER_SIZE) + + pkt->pkt_dts - tms->tms_muxoffset; + + + if(ts->ts_flags & TS_HTSCLIENT) { + /* Temporary hack */ + *tsb++ = tms->tms_stream->st_type; + } else { + /* TS marker */ + *tsb++ = 0x47; + } + + + /* Write PID and optionally payload unit start indicator */ + *tsb++ = tms->tms_index >> 8 | (off ? 0 : 0x40); + *tsb++ = tms->tms_index; + + /* Remaing bytes after 4 bytes of TS header */ + tsrem = 184; + + if(off == 0) { + /* When writing the packet header, shave of a bit of available + payload size */ + tsrem -= PES_HEADER_SIZE; + } + + /* Remaining length of frame */ + frrem = pkt->pkt_payloadlen - off; + + /* Compute amout of padding needed */ + pad = tsrem - frrem; + + if(pad > 0) { + /* Must pad TS packet */ + + *tsb++ = 0x30; + tsrem -= pad; + *tsb++ = --pad; + + memset(tsb, 0x00, pad); + tsb += pad; + } else { + *tsb++ = 0x10; + } + + + if(off == 0) { + /* Insert PES header */ + + /* Write startcode */ + + *tsb++ = 0; + *tsb++ = 0; + *tsb++ = tms->tms_sc >> 8; + *tsb++ = tms->tms_sc; + + /* Write total frame length (without accounting for startcode and + length field itself */ + + len = pkt_len(pkt) + PES_HEADER_SIZE - 6; + + if(len > 65535) { + /* It's okay to write len as 0 in transport streams, + but only for video frames, and i dont expect any of the + audio frames to exceed 64k + */ + len = 0; + } + + *tsb++ = len >> 8; + *tsb++ = len; + + *tsb++ = 0x80; /* MPEG2 */ + *tsb++ = 0xc0; /* pts & dts is present */ + *tsb++ = 10; /* length of rest of header (pts & dts) */ + + /* Write PTS */ + + t = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, mpeg_tc); + *tsb++ = (((t >> 30) & 7) << 1) | 1; + u16 = (((t >> 15) & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + u16 = ((t & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + + /* Write DTS */ + + t = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, mpeg_tc); + *tsb++ = (((t >> 30) & 7) << 1) | 1; + u16 = (((t >> 15) & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + u16 = ((t & 0x7fff) << 1) | 1; + *tsb++ = u16 >> 8; + *tsb++ = u16; + } + + memcpy(tsb, pkt->pkt_payload + off, tsrem); + + /* Timestamp of last byte + 1 */ + t = pkt->pkt_duration * (off + tsrem) / (pkt->pkt_payloadlen + + PES_HEADER_SIZE); + + /* Fix any rounding errors */ + if(t > pkt->pkt_duration) + t = pkt->pkt_duration; + + tm->tm_ts_stop = pkt->pkt_dts + t - tms->tms_muxoffset; + + + if(printts)printf("TS: copy %7d (%3d bytes) pad = %7d: %lld - %lld\n", + off, tsrem, pad, tm->tm_ts_start, tm->tm_ts_stop); + + TAILQ_INSERT_TAIL(&tms->tms_metaqueue, tm, tm_link); + tms->tms_meta_packets++; + + off += tsrem; + } + if(printts)printf("end @ %lld\n", pkt->pkt_dts + pkt->pkt_duration); + if(printts)exit(0); +} + + +/** + * + */ +static int64_t +check_total_delay(th_muxer_t *tm) { th_muxstream_t *tms; - int64_t pcr; - - pkt_store(pkt); /* need to keep packet around */ - - switch(tm->tm_status) { - case TM_IDLE: - break; - - case TM_WAITING_FOR_LOCK: - ts_muxer_start(tm); - break; - - case TM_PLAY: - LIST_FOREACH(tms, &tm->tm_stale_streams, tms_muxer_link) { - if(tms->tms_stream == st) { - pcr = tm->tm_start_dts + getclock_hires() - tm->tm_clockref; - tms_stream_set_active(tm, tms, pkt, pcr); - break; - } - } - break; - - case TM_PAUSE: - break; + int64_t delta = -1, v; + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + v = get_delay(tms); + if(v > delta) + delta = v; } + return delta; +} + + + +/** + * + */ +static int64_t +get_start_pcr(th_muxer_t *tm) +{ + th_muxstream_t *tms; + th_metapkt_t *f; + int64_t r = INT64_MAX; + + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + + f = TAILQ_FIRST(&tms->tms_metaqueue); + if(f == NULL) + continue; + if(f->tm_ts_start < r) + r = f->tm_ts_start; + } + return r; } - -/* - * TS Muxer +/** + * */ -th_muxer_t * -ts_muxer_init(th_subscription_t *s, th_mux_output_t *cb, void *opaque, - int flags) +static void +ts_deliver(void *aux, int64_t now) { - th_transport_t *t = s->ths_transport; - th_stream_t *st; - th_muxer_t *tm; + ts_muxer_t *ts = aux; + th_muxer_t *tm = ts->ts_muxer; + th_muxstream_t *tms, *c; + int rate; + int64_t v, pcr; + th_metapkt_t *x, *y; + uint8_t *tsb; + + int cnt; + + pcr = now - ts->ts_pcr_ref + ts->ts_pcr_offset; + + if(ts->ts_last_pcr + 20000 < now) { + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) + if(tms->tms_index == ts->ts_pcrpid) + break; + + if(tms != NULL) { + uint8_t pkt[188]; + tsb = &pkt[0]; + *tsb++ = 0x47; + *tsb++ = 0; + *tsb++ = 0; + + /* Insert CC */ + *tsb++ = 0x20 | (tms->tms_cc & 0xf); + + *tsb++ = 183; + *tsb++ = 0x10; /* PCR flag */ + + v = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc); + *tsb++ = v >> 25; + *tsb++ = v >> 17; + *tsb++ = v >> 9; + *tsb++ = v >> 1; + *tsb++ = (v & 1) << 7; + *tsb++ = 0; + + ts_muxer_add_packet(ts, pkt, tms->tms_index); + ts->ts_last_pcr = now; + } + } + + cnt = ts->ts_blocks_per_packet - ts->ts_block; + + while(--cnt >= 0) { + c = LIST_FIRST(&tm->tm_streams); + tms = LIST_NEXT(c, tms_muxer_link0); + for(; tms != NULL; tms = LIST_NEXT(tms, tms_muxer_link0)) { + x = TAILQ_FIRST(&c->tms_metaqueue); + y = TAILQ_FIRST(&tms->tms_metaqueue); + + if(x != NULL && y != NULL && y->tm_ts_start < x->tm_ts_start) + c = tms; + } + + tms = NULL; + + x = TAILQ_FIRST(&c->tms_metaqueue); + if(x == NULL) { + printf("underrun\n"); + break; + } + TAILQ_REMOVE(&c->tms_metaqueue, x, tm_link); + c->tms_meta_packets--; + + /* Insert CC */ + x->tm_pkt[3] = (x->tm_pkt[3] & 0xf0) | (c->tms_cc & 0xf); + c->tms_cc++; + + ts_muxer_add_packet(ts, x->tm_pkt, c->tms_index); + free(x); + } + + + rate = 0; + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + rate += (tms->tms_meta_packets * 1000000ULL) / (uint64_t)TS_LOOKAHEAD; + // rate += estimate_blockrate(tms); + } + +#if 0 + printf("TS blockrate = %d\n", rate); + + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + printf("%-10s: %-5d %-8lld | ", + htstvstreamtype2txt(tms->tms_stream->st_type), + tms->tms_meta_packets, + get_delay(tms)); + } + printf("\n"); +#endif + + v = 1000000 / (rate / ts->ts_blocks_per_packet); + dtimer_arm_hires(&ts->ts_stream_timer, ts_deliver, ts, now + v); +} + +/** + * + */ +static void +ts_mux_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt) +{ + ts_muxer_t *ts = opaque; + th_muxer_t *tm = ts->ts_muxer; + int64_t v, pcr; + + ts_mux_gen_packets(ts, tms, pkt); + + if(ts->ts_running == 0) { + v = check_total_delay(tm); + if(v < TS_LOOKAHEAD) + return; + pcr = get_start_pcr(tm); + if(pcr == INT64_MAX) + return; + ts->ts_pcr_offset = pcr; + ts->ts_pcr_ref = getclock_hires(); + ts->ts_running = 1; + ts_deliver(ts, getclock_hires()); + } +} + + + +/** + * + */ +ts_muxer_t * +ts_muxer_init(th_subscription_t *s, ts_mux_output_t *output, + void *opaque, int flags) +{ + ts_muxer_t *ts = calloc(1, sizeof(ts_muxer_t)); + int dopcr; + int pididx = PID_ES_BASE; th_muxstream_t *tms; - int pididx = 100; - uint32_t sc; - int dopcr = 0; - int offset; + th_muxer_t *tm; + th_stream_t *st; - tm = calloc(1, sizeof(th_muxer_t)); - tm->tm_subscription = s; + ts->ts_pcr_offset = AV_NOPTS_VALUE; + ts->ts_pcr_ref = AV_NOPTS_VALUE; - tm->tm_clockref = getclock_hires(); - tm->tm_blocks_per_packet = 7; - tm->tm_callback = cb; - tm->tm_opaque = opaque; - tm->tm_flags = flags; - tm->tm_new_pkt = ts_encode_new_packet; + ts->ts_output = output; + ts->ts_output_opaque = opaque; + ts->ts_flags = flags; - tm->tm_packet = malloc(188 * tm->tm_blocks_per_packet); + ts->ts_subscription = s; + tm = ts->ts_muxer = muxer_init(s, ts_mux_packet_input, ts); + + ts->ts_blocks_per_packet = 7; + ts->ts_packet = malloc(188 * ts->ts_blocks_per_packet); - pididx = 200; + /* Do TS MUX specific init per stream */ + + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + st = tms->tms_stream; - LIST_FOREACH(st, &t->tht_streams, st_link) { dopcr = 0; - offset = 0; switch(st->st_type) { case HTSTV_MPEG2VIDEO: - sc = 0x1e0; + tms->tms_muxoffset = 200000; + tms->tms_sc = 0x1ec; dopcr = 1; break; case HTSTV_MPEG2AUDIO: - sc = 0x1c0; + tms->tms_muxoffset = 75000; + tms->tms_sc = 0x1cd; break; case HTSTV_AC3: - sc = 0x1bd; + tms->tms_muxoffset = 75000; + tms->tms_sc = 0x1bd; break; case HTSTV_H264: - sc = 0x1e0; + tms->tms_muxoffset = 900000; + tms->tms_sc = 0x1e0; dopcr = 1; break; default: continue; } - tms = calloc(1, sizeof(th_muxstream_t)); - tms->tms_muxer = tm; - tms->tms_stream = st; - tms->tms_dopcr = dopcr; - tms->tms_mux_offset = offset; + if(dopcr && ts->ts_pcrpid == 0) + ts->ts_pcrpid = pididx; - LIST_INSERT_HEAD(&tm->tm_stopped_streams, tms, tms_muxer_link); - LIST_INSERT_HEAD(&tm->tm_media_streams, tms, tms_muxer_media_link); - - tms->tms_index = pididx; - tms->tms_sc = sc; - pididx++; + tms->tms_index = pididx++; } - - tm->tm_pat = tm_create_meta_stream(tm, 0); - tm->tm_pmt = tm_create_meta_stream(tm, 100); - - s->ths_muxer = tm; - return tm; + return ts; } -/* +/** * */ -static void -tms_destroy(th_muxstream_t *tms) +void +ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s) { - if(tms->tms_stream) - LIST_REMOVE(tms, tms_muxer_media_link); - - LIST_REMOVE(tms, tms_muxer_link); - tms_set_curpkt(tms, NULL); - memset(tms, 0xff, sizeof(th_muxstream_t)); - free(tms); + free(ts->ts_packet); + muxer_deinit(ts->ts_muxer, s); + dtimer_disarm(&ts->ts_patpmt_timer); + free(ts); } -/* +/** * */ void -ts_muxer_deinit(th_muxer_t *tm, th_subscription_t *s) +ts_muxer_play(ts_muxer_t *ts, int64_t toffset) { - th_muxstream_t *tms; + th_subscription_t *s = ts->ts_muxer->tm_subscription; - s->ths_raw_input = NULL; - s->ths_muxer = NULL; + /* Start PAT / PMT generator */ + ts_muxer_generate_tables(ts, getclock_hires()); - if(tm->tm_transport_linked) - LIST_REMOVE(tm, tm_transport_link); + if(!(ts->ts_flags & TS_SEEK) && + s->ths_transport->tht_source_type == THT_MPEG_TS) { + /* We dont need to seek and source is MPEG TS, we can use a + shortcut to avoid remuxing stream */ - dtimer_disarm(&tm->tm_timer); - dtimer_disarm(&tm->tm_table_timer); - - tms_destroy(tm->tm_pat); - tms_destroy(tm->tm_pmt); - - while((tms = LIST_FIRST(&tm->tm_media_streams)) != NULL) - tms_destroy(tms); - - free(tm->tm_packet); - free(tm); + s->ths_raw_input = ts_muxer_raw_input; + s->ths_opaque = ts; + } else { + muxer_play(ts->ts_muxer, toffset); + } } + +/** + * + */ +void +ts_muxer_pause(ts_muxer_t *ts) +{ + dtimer_disarm(&ts->ts_patpmt_timer); + muxer_pause(ts->ts_muxer); +} diff --git a/tsmux.h b/tsmux.h index 4964dbf8..b3b5daf2 100644 --- a/tsmux.h +++ b/tsmux.h @@ -19,13 +19,53 @@ #ifndef TSMUX_H #define TSMUX_H -th_muxer_t *ts_muxer_init(th_subscription_t *s, th_mux_output_t *cb, +typedef void (ts_mux_output_t)(void *opaque, th_subscription_t *s, + uint8_t *pkt, int npackets, int64_t pcr); + + +typedef struct ts_muxer { + th_subscription_t *ts_subscription; + int ts_flags; +#define TS_SEEK 0x1 +#define TS_HTSCLIENT 0x2 + + int ts_running; + + th_muxer_t *ts_muxer; + ts_mux_output_t *ts_output; + void *ts_output_opaque; + + int64_t ts_pcr_offset; + int64_t ts_pcr_ref; + + + int ts_pat_cc; + int ts_pmt_cc; + + dtimer_t ts_patpmt_timer; + + uint8_t *ts_packet; + int ts_block; + int ts_blocks_per_packet; + + dtimer_t ts_stream_timer; + + int ts_pcrpid; + + int64_t ts_last_pcr; + +} ts_muxer_t; + + + + +ts_muxer_t *ts_muxer_init(th_subscription_t *s, ts_mux_output_t *output, void *opaque, int flags); -void ts_muxer_deinit(th_muxer_t *tm, th_subscription_t *s); +void ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s); -void ts_muxer_play(th_muxer_t *tm, int64_t toffset); +void ts_muxer_play(ts_muxer_t *ts, int64_t toffset); -void ts_muxer_pause(th_muxer_t *tm); +void ts_muxer_pause(ts_muxer_t *ts); #endif /* TSMUX_H */ diff --git a/tvhead.h b/tvhead.h index e8f5792d..b0de4bbf 100644 --- a/tvhead.h +++ b/tvhead.h @@ -86,7 +86,7 @@ LIST_HEAD(th_pkt_list, th_pkt); LIST_HEAD(th_muxer_list, th_muxer); LIST_HEAD(th_muxstream_list, th_muxstream); LIST_HEAD(th_descrambler_list, th_descrambler); - +TAILQ_HEAD(th_metapkt_queue, th_metapkt); extern time_t dispatch_clock; extern int startupcounter; @@ -521,37 +521,39 @@ typedef struct th_pkt { } th_pkt_t; + +/** + * Meta packets + */ +typedef struct th_metapkt { + TAILQ_ENTRY(th_metapkt) tm_link; + int64_t tm_ts_start; + int64_t tm_ts_stop; + int tm_pcroffset; + uint8_t tm_pkt[0]; +} th_metapkt_t; + + /* * A mux stream reader */ typedef struct th_muxstream { - LIST_ENTRY(th_muxstream) tms_muxer_link; + LIST_ENTRY(th_muxstream) tms_muxer_link0; struct th_muxer *tms_muxer; - - th_pkt_t *tms_curpkt; - - int tms_offset; /* offset in current packet */ - th_stream_t *tms_stream; - LIST_ENTRY(th_muxstream) tms_muxer_media_link; - - int64_t tms_nextblock; /* Time for delivery of next block */ - int tms_block_interval; - int tms_block_rate; - - int tms_mux_offset; - int tms_index; /* Used as PID or whatever */ - th_pkt_t *tms_tmppkt; /* temporary pkt pointer during lock phaze */ + struct th_metapkt_queue tms_metaqueue; + uint32_t tms_meta_packets; /* number of packets "" */ + + /* MPEG TS multiplex stuff */ int tms_sc; /* start code */ int tms_cc; - int tms_dopcr; - int64_t tms_nextpcr; + int64_t tms_muxoffset; /* DTS offset from PCR */ /* Memebers used when running with ffmpeg */ @@ -571,8 +573,8 @@ typedef struct th_muxstream { struct th_subscription; -typedef void (th_mux_output_t)(void *opaque, struct th_subscription *s, - uint8_t *pkt, int blocks, int64_t pcr); +typedef void (th_mux_output_t)(void *opaque, th_muxstream_t *tms, + th_pkt_t *pkt); typedef void (th_mux_newpkt_t)(struct th_muxer *tm, th_stream_t *st, @@ -583,33 +585,17 @@ typedef struct th_muxer { th_mux_newpkt_t *tm_new_pkt; LIST_ENTRY(th_muxer) tm_transport_link; - int tm_transport_linked; + int tm_linked; - int64_t tm_clockref; /* Base clock ref */ - int64_t tm_pauseref; /* Time when we were paused */ + int64_t tm_offset; - int tm_flags; -#define TM_HTSCLIENTMODE 0x1 /* Ugly workaround for now */ -#define TM_SEEKABLE 0x2 /* We need the pause / seek to work */ - - int64_t tm_start_dts; - int64_t tm_next_pat; - - struct th_muxstream_list tm_media_streams; - - struct th_muxstream_list tm_active_streams; - struct th_muxstream_list tm_stale_streams; - struct th_muxstream_list tm_stopped_streams; - - uint8_t *tm_packet; - int tm_blocks_per_packet; + struct th_muxstream_list tm_streams; struct th_subscription *tm_subscription; - th_mux_output_t *tm_callback; + th_mux_output_t *tm_output; void *tm_opaque; - dtimer_t tm_timer; enum { TM_IDLE, @@ -618,16 +604,7 @@ typedef struct th_muxer { TM_PAUSE, } tm_status; - th_muxstream_t *tm_pat; - th_muxstream_t *tm_pmt; - struct AVFormatContext *tm_avfctx; - - int tm_drop_rate; - int tm_drop_cnt; - - dtimer_t tm_table_timer; - int tm_block_offset; } th_muxer_t;