diff --git a/Makefile b/Makefile
index 4a59c6b6..baf25b80 100644
--- a/Makefile
+++ b/Makefile
@@ -1,8 +1,8 @@
-include ../config.mak
SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \
- subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c \
- resolver.c
+ subscriptions.c mux.c tsdemux.c pes.c buffer.c tcp.c \
+ resolver.c tsmux.c
SRCS += http.c htmlui.c
diff --git a/htmlui.c b/htmlui.c
index 9aabc455..203f43a2 100644
--- a/htmlui.c
+++ b/htmlui.c
@@ -913,8 +913,6 @@ page_status(http_connection_t *hc, const char *remain, void *opaque)
th_dvb_mux_instance_t *tdmi;
th_stream_t *st;
const char *txt, *t1, *t2;
- th_muxer_t *tm;
- th_muxstream_t *tms;
char tmptxt[100];
if(!html_verify_access(hc, "system-status"))
@@ -1204,27 +1202,8 @@ page_status(http_connection_t *hc, const char *remain, void *opaque)
tcp_qprintf(&tq,
"Using transport \"%s\"
",
t->tht_name);
-
- if((tm = s->ths_muxer) != NULL) {
- int64_t i64min = INT64_MAX;
- int64_t i64max = INT64_MIN;
-
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) {
- if(tms->tms_curpkt == NULL)
- continue; /* stream is currently stale */
-
- if(tms->tms_nextblock < i64min)
- i64min = tms->tms_nextblock;
-
- if(tms->tms_nextblock > i64max)
- i64max = tms->tms_nextblock;
- }
-
- tcp_qprintf(&tq,
- "Internal stream delta: %lld us
",
- i64max - i64min);
- }
- }
+
+ }
tcp_qprintf(&tq, "");
box_bottom(&tq);
tcp_qprintf(&tq, "
");
diff --git a/htsclient.c b/htsclient.c
index 9cd2d779..1f270037 100644
--- a/htsclient.c
+++ b/htsclient.c
@@ -410,7 +410,7 @@ client_subscription_callback(struct th_subscription *s,
case TRANSPORT_AVAILABLE:
assert(c->c_muxer == NULL);
c->c_muxer = ts_muxer_init(s, client_output_ts, c,
- TM_HTSCLIENTMODE | TM_SEEKABLE);
+ TS_HTSCLIENT | TS_SEEK);
ts_muxer_play(c->c_muxer, 0);
break;
diff --git a/iptv_output.c b/iptv_output.c
index ca119b1f..0bcba4b5 100644
--- a/iptv_output.c
+++ b/iptv_output.c
@@ -40,7 +40,7 @@
#define MULTICAST_PKT_SIZ (188 * 7)
typedef struct output_multicast {
- th_muxer_t *om_muxer;
+ ts_muxer_t *om_muxer;
int om_fd;
struct sockaddr_in om_dst;
@@ -61,8 +61,8 @@ typedef struct output_multicast {
* Output MPEG TS
*/
void
-iptv_output_ts(void *opaque, th_subscription_t *s,
- uint8_t *pkt, int blocks, int64_t pcr)
+iptv_output_ts(void *opaque, th_subscription_t *s, uint8_t *pkt,
+ int blocks, int64_t pcr)
{
output_multicast_t *om = opaque;
@@ -101,8 +101,8 @@ iptv_subscription_callback(struct th_subscription *s,
switch(event) {
case TRANSPORT_AVAILABLE:
assert(om->om_muxer == NULL);
- om->om_muxer = ts_muxer_init(s, iptv_output_ts, om, 0);
- om->om_muxer->tm_drop_rate = om->om_intra_drop_rate;
+ om->om_muxer = ts_muxer_init(s, iptv_output_ts, om, TS_SEEK);
+ // om->om_muxer->tm_drop_rate = om->om_intra_drop_rate;
ts_muxer_play(om->om_muxer, 0);
break;
diff --git a/mux.c b/mux.c
new file mode 100644
index 00000000..73b9bc4f
--- /dev/null
+++ b/mux.c
@@ -0,0 +1,199 @@
+/*
+ * tvheadend, Muxing
+ * Copyright (C) 2007 Andreas Öman
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+#define _GNU_SOURCE
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+#include "tvhead.h"
+#include "dispatch.h"
+#include "teletext.h"
+#include "transports.h"
+#include "subscriptions.h"
+#include "pes.h"
+#include "psi.h"
+#include "buffer.h"
+
+
+
+
+/*
+ * pause playback
+ */
+void
+muxer_pause(th_muxer_t *tm)
+{
+}
+
+
+
+
+/*
+ * playback start
+ */
+void
+muxer_play(th_muxer_t *tm, int64_t toffset)
+{
+ th_subscription_t *s = tm->tm_subscription;
+
+ if(!tm->tm_linked)
+ LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link);
+
+ if(toffset == AV_NOPTS_VALUE) {
+ /* continue from last playback */
+ tm->tm_offset = 0;
+ } else {
+ tm->tm_offset = toffset;
+ }
+ tm->tm_status = TM_PLAY;
+}
+
+/**
+ *
+ */
+static void
+mux_new_packet_for_stream(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt)
+{
+ if(tm->tm_offset == 0) {
+ /* Direct playback, pass it on at once */
+ tm->tm_output(tm->tm_opaque, tms, pkt);
+ return;
+ }
+}
+
+
+
+/**
+ *
+ */
+static void
+mux_new_packet(th_muxer_t *tm, th_stream_t *st, th_pkt_t *pkt)
+{
+ th_muxstream_t *tms;
+
+ pkt_store(pkt); /* need to keep packet around */
+
+ switch(tm->tm_status) {
+ case TM_IDLE:
+ break;
+
+ case TM_WAITING_FOR_LOCK:
+ break;
+
+ case TM_PLAY:
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
+ if(tms->tms_stream == st) {
+ mux_new_packet_for_stream(tm, tms, pkt);
+ break;
+ }
+ }
+ break;
+
+ case TM_PAUSE:
+ break;
+ }
+}
+
+
+
+
+
+/*
+ * TS Muxer
+ */
+th_muxer_t *
+muxer_init(th_subscription_t *s, th_mux_output_t *cb, void *opaque,
+ int flags)
+{
+ th_transport_t *t = s->ths_transport;
+ th_stream_t *st;
+ th_muxer_t *tm;
+ th_muxstream_t *tms;
+
+ printf("muxer init!\n");
+
+ tm = calloc(1, sizeof(th_muxer_t));
+ tm->tm_subscription = s;
+
+ tm->tm_output = cb;
+ tm->tm_opaque = opaque;
+ tm->tm_new_pkt = mux_new_packet;
+
+
+ LIST_FOREACH(st, &t->tht_streams, st_link) {
+ tms = calloc(1, sizeof(th_muxstream_t));
+ tms->tms_muxer = tm;
+ tms->tms_stream = st;
+
+
+ LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0);
+ TAILQ_INIT(&tms->tms_metaqueue);
+ }
+
+ s->ths_muxer = tm;
+ return tm;
+}
+
+
+/*
+ *
+ */
+static void
+tms_destroy(th_muxstream_t *tms)
+{
+ LIST_REMOVE(tms, tms_muxer_link0);
+
+ // dtimer_disarm(&tms->tms_timer);
+ free(tms);
+}
+
+
+/*
+ *
+ */
+void
+muxer_deinit(th_muxer_t *tm, th_subscription_t *s)
+{
+ th_muxstream_t *tms;
+
+ s->ths_raw_input = NULL;
+ s->ths_muxer = NULL;
+
+ if(tm->tm_linked)
+ LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link);
+
+ while((tms = LIST_FIRST(&tm->tm_streams)) != NULL)
+ tms_destroy(tms);
+
+ free(tm);
+}
+
diff --git a/mux.h b/mux.h
new file mode 100644
index 00000000..4a9edc06
--- /dev/null
+++ b/mux.h
@@ -0,0 +1,31 @@
+/*
+ * tvheadend, Stream muxer
+ * Copyright (C) 2007 Andreas Öman
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef MUX_H
+#define MUX_H
+
+th_muxer_t *muxer_init(th_subscription_t *s, th_mux_output_t *cb,
+ void *opaque);
+
+void muxer_deinit(th_muxer_t *tm, th_subscription_t *s);
+
+void muxer_play(th_muxer_t *tm, int64_t toffset);
+
+void muxer_pause(th_muxer_t *tm);
+
+#endif /* MUX_H */
diff --git a/psi.c b/psi.c
index 432a2f04..0be811c3 100644
--- a/psi.c
+++ b/psi.c
@@ -131,7 +131,7 @@ psi_append_crc32(uint8_t *buf, int offset, int maxlen)
*/
int
-psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen)
+psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid)
{
if(maxlen < 12)
return -1;
@@ -150,8 +150,8 @@ psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen)
buf[8] = 0; /* Program number, we only have one program */
buf[9] = 1;
- buf[10] = 0; /* PMT pid */
- buf[11] = 100;
+ buf[10] = 0xe0 | (pmtpid >> 8);
+ buf[11] = pmtpid;
return psi_append_crc32(buf, 12, maxlen);
}
@@ -292,7 +292,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid)
*/
int
-psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen)
+psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
{
th_stream_t *st;
th_muxstream_t *tms;
@@ -314,19 +314,8 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen)
buf[6] = 0;
buf[7] = 0;
- /* Find PID that carries PCR */
-
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
- if(tms->tms_dopcr)
- break;
-
- if(tms == NULL) {
- buf[8] = 0xff;
- buf[9] = 0xff;
- } else {
- buf[8] = 0xe0 | (tms->tms_index >> 8);
- buf[9] = tms->tms_index;
- }
+ buf[8] = 0xe0 | (pcrpid >> 8);
+ buf[9] = pcrpid;
buf[10] = 0xf0; /* Program info length */
buf[11] = 0x00; /* We dont have any such things atm */
@@ -334,7 +323,7 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen)
buf += 12;
tlen = 12;
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) {
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
st = tms->tms_stream;
switch(st->st_type) {
diff --git a/psi.h b/psi.h
index 751e9437..13b6d903 100644
--- a/psi.h
+++ b/psi.h
@@ -37,9 +37,9 @@ int psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid);
uint32_t psi_crc32(uint8_t *data, size_t datalen);
-int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen);
+int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid);
-int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen);
+int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
const char *psi_caid2name(uint16_t caid);
diff --git a/pvr.c b/pvr.c
index 39359c99..50cbed56 100644
--- a/pvr.c
+++ b/pvr.c
@@ -787,7 +787,7 @@ pvrr_transport_available(pvr_rec_t *pvrr, th_transport_t *t)
tms = calloc(1, sizeof(th_muxstream_t));
tms->tms_stream = st;
- LIST_INSERT_HEAD(&tm->tm_media_streams, tms, tms_muxer_media_link);
+ LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0);
tms->tms_avstream = av_mallocz(sizeof(AVStream));
@@ -855,8 +855,8 @@ pvrr_transport_unavailable(pvr_rec_t *pvrr, th_transport_t *t)
/* Destroy muxstreams */
- while((tms = LIST_FIRST(&tm->tm_media_streams)) != NULL) {
- LIST_REMOVE(tms, tms_muxer_media_link);
+ while((tms = LIST_FIRST(&tm->tm_streams)) != NULL) {
+ LIST_REMOVE(tms, tms_muxer_link0);
free(tms);
}
@@ -965,7 +965,7 @@ is_all_decoded(th_muxer_t *tm, enum CodecType type)
th_muxstream_t *tms;
AVStream *st;
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) {
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
st = tms->tms_avstream;
if(st->codec->codec->type == type && tms->tms_decoded == 0)
return 0;
@@ -994,7 +994,7 @@ pvrr_record_packet(pvr_rec_t *pvrr, th_pkt_t *pkt)
char txt[100];
size_t bufsize;
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0)
if(tms->tms_stream == pkt->pkt_stream)
break;
@@ -1108,7 +1108,7 @@ pvrr_record_packet(pvr_rec_t *pvrr, th_pkt_t *pkt)
if(pkt->pkt_commercial != COMMERCIAL_YES) {
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0)
tms->tms_decoded = 0;
pvrr_set_rec_state(pvrr, PVR_REC_WAIT_AUDIO_LOCK);
diff --git a/rtp.c b/rtp.c
index 45a952cb..77e832a2 100644
--- a/rtp.c
+++ b/rtp.c
@@ -82,7 +82,7 @@ rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr,
void
-rtp_output_ts(void *opaque, struct th_subscription *s,
+rtp_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr)
{
th_rtp_streamer_t *trs = opaque;
diff --git a/rtp.h b/rtp.h
index ef49b1f7..ee084c8d 100644
--- a/rtp.h
+++ b/rtp.h
@@ -29,7 +29,7 @@ typedef struct th_rtp_streamer {
void rtp_streamer_init(th_rtp_streamer_t *trs, int fd,
struct sockaddr_in *dst);
-void rtp_output_ts(void *opaque, struct th_subscription *s,
+void rtp_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr);
int rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr,
diff --git a/rtsp.c b/rtsp.c
index 891385d6..d08adc81 100644
--- a/rtsp.c
+++ b/rtsp.c
@@ -123,7 +123,8 @@ rtsp_subscription_callback(struct th_subscription *s,
switch(event) {
case TRANSPORT_AVAILABLE:
assert(rs->rs_muxer == NULL);
- rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer, 0);
+ rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer,
+ TS_SEEK);
break;
case TRANSPORT_UNAVAILABLE:
diff --git a/tsmux.c b/tsmux.c
index 2125baab..41c79329 100644
--- a/tsmux.c
+++ b/tsmux.c
@@ -1,5 +1,5 @@
/*
- * tvheadend, MPEG transport stream functions
+ * tvheadend, MPEG Transport stream muxer
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
@@ -32,13 +32,8 @@
#include
#include
-#include
-#include
-
#include
-#include
-
#include "tvhead.h"
#include "dispatch.h"
#include "teletext.h"
@@ -47,619 +42,79 @@
#include "pes.h"
#include "psi.h"
#include "buffer.h"
+#include "mux.h"
#include "tsmux.h"
-#define PES_HEADER_SIZE 19
+#define TS_LOOKAHEAD 500000
-static void ts_muxer_relock(th_muxer_t *tm, uint64_t pcr);
-
-
-/*
- * Return the deadline for a given packet
- */
-static int64_t
-muxer_pkt_deadline(th_pkt_t *pkt, th_muxstream_t *tms)
-{
- th_stream_t *st = pkt->pkt_stream;
- int64_t r;
-
- r = pkt->pkt_dts;
- if(st != NULL)
- r -= st->st_peak_presentation_delay;
-
- r -= tms->tms_mux_offset;
- return r;
-}
-
-
-/*
- *
- */
-static void
-tms_set_curpkt(th_muxstream_t *tms, th_pkt_t *pkt)
-{
- if(tms->tms_curpkt != NULL)
- pkt_deref(tms->tms_curpkt);
-
- tms->tms_offset = 0;
-
- if(pkt != NULL) {
- tms->tms_curpkt = pkt_ref(pkt);
- } else {
- tms->tms_curpkt = NULL;
- }
-}
-
-/*
- *
- */
-static void
-tms_stream_set_stale(th_muxer_t *tm, th_muxstream_t *tms, int64_t pcr)
-{
- tms->tms_nextblock = INT64_MAX;
- tms->tms_staletime = pcr;
- LIST_REMOVE(tms, tms_muxer_link);
- LIST_INSERT_HEAD(&tm->tm_stale_streams, tms, tms_muxer_link);
- tms_set_curpkt(tms, NULL);
-}
-
-/*
- *
- */
-static void
-tms_stream_add_meta(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt)
-{
- LIST_REMOVE(tms, tms_muxer_link);
- LIST_INSERT_HEAD(&tm->tm_active_streams, tms, tms_muxer_link);
- tms_set_curpkt(tms, pkt);
- tms->tms_block_interval = 0;
- tms->tms_nextblock = 0;
-}
-
-/*
- *
- */
-static void
-tms_stream_set_active(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt,
- int64_t pcr)
-{
- int l, dt;
- int64_t dl;
-
- assert(pkt->pkt_duration > 0);
-
- tms->tms_nextblock = pcr;
- dl = muxer_pkt_deadline(pkt, tms);
- dt = dl - pcr;
- l = (pkt_len(pkt) + PES_HEADER_SIZE) / 188;
-
- tms->tms_dl = dl;
- tms->tms_block_interval = l == 0 ? 1 : dt / l;
-
- if(tms->tms_block_interval < 10)
- tms->tms_block_interval = 10;
-
- LIST_REMOVE(tms, tms_muxer_link);
- LIST_INSERT_HEAD(&tm->tm_active_streams, tms, tms_muxer_link);
- tms_set_curpkt(tms, pkt);
- tms->tms_blockcnt = 0;
-}
-
-
-
-
-
-/*
- * Generates a 188 bytes TS packet in 'tsb'
- */
-static int
-ts_make_pkt(th_muxer_t *tm, th_muxstream_t *tms, uint8_t *tsb, int64_t pcr)
-{
- uint8_t *tsb0 = tsb;
- th_pkt_t *pkt = tms->tms_curpkt;
- uint8_t *src;
- int tsrem; /* Remaining bytes in tspacket */
- int frrem; /* Remaining bytes in frame */
- int64_t ts;
- int pad, cc, len;
- uint16_t u16;
- int is_section = pkt->pkt_stream == NULL;
- int header_len = 0;
- int dumppkt = 0;
- AVRational mpeg_tc = {1, 90000};
-
- if(pkt_payload(pkt) == NULL)
- return -1;
-
- frrem = pkt_len(pkt) - tms->tms_offset;
- assert(frrem > 0);
-
- cc = tms->tms_cc++;
- tsrem = 184;
-
- if(tms->tms_offset == 0) {
- /* When writing the packet header, shave of a bit of available
- payload size */
- if(is_section) {
- header_len = 1;
- } else {
- header_len = PES_HEADER_SIZE;
- }
- tsrem -= header_len;
- }
-
- if(tm->tm_flags & TM_HTSCLIENTMODE) {
- /* UGLY */
- if(tms->tms_stream)
- *tsb++ = tms->tms_stream->st_type;
- else
- *tsb++ = 0xff;
-
- } else {
- /* TS marker */
- *tsb++ = 0x47;
- }
-
- /* Write PID and optionally payload unit start indicator */
- *tsb++ = tms->tms_index >> 8 | (tms->tms_offset ? 0 : 0x40);
- *tsb++ = tms->tms_index;
-
-
-
- /* Compute amout of padding needed */
- pad = tsrem - frrem;
-
-
- if(pcr != AV_NOPTS_VALUE) {
-
- /* Insert PCR */
-
- tsrem -= 8;
- pad -= 8;
- if(pad < 0)
- pad = 0;
-
- *tsb++ = (cc & 0xf) | 0x30;
- *tsb++ = 7 + pad;
- *tsb++ = 0x10; /* PCR flag */
-
- ts = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc);
- *tsb++ = ts >> 25;
- *tsb++ = ts >> 17;
- *tsb++ = ts >> 9;
- *tsb++ = ts >> 1;
- *tsb++ = (ts & 1) << 7;
- *tsb++ = 0;
-
- memset(tsb, 0xff, pad);
- tsb += pad;
- tsrem -= pad;
-
- } else if(pad > 0) {
- /* Must pad TS packet */
-
- *tsb++ = (cc & 0xf) | 0x30;
- tsrem -= pad;
- *tsb++ = --pad;
-
- memset(tsb, 0x00, pad);
- tsb += pad;
- } else {
- *tsb++ = (cc & 0xf) | 0x10;
- }
-
- if(tms->tms_offset == 0) {
- if(!is_section) {
-
- /* First TS packet for this frame, write PES headers */
-
- /* Write startcode */
-
- *tsb++ = 0;
- *tsb++ = 0;
- *tsb++ = tms->tms_sc >> 8;
- *tsb++ = tms->tms_sc;
-
- /* Write total frame length (without accounting for startcode and
- length field itself */
-
- len = pkt_len(pkt) + header_len - 6;
-
- if(len > 65535) {
- /* It's okay to write len as 0 in transport streams,
- but only for video frames, and i dont expect any of the
- audio frames to exceed 64k
- */
- len = 0;
- }
-
- *tsb++ = len >> 8;
- *tsb++ = len;
-
- *tsb++ = 0x80; /* MPEG2 */
- *tsb++ = 0xc0; /* pts & dts is present */
- *tsb++ = 10; /* length of header (pts & dts) */
-
- /* Write PTS */
-
- ts = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, mpeg_tc);
- *tsb++ = (((ts >> 30) & 7) << 1) | 1;
- u16 = (((ts >> 15) & 0x7fff) << 1) | 1;
- *tsb++ = u16 >> 8;
- *tsb++ = u16;
- u16 = ((ts & 0x7fff) << 1) | 1;
- *tsb++ = u16 >> 8;
- *tsb++ = u16;
-
- /* Write DTS */
-
- ts = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, mpeg_tc);
- *tsb++ = (((ts >> 30) & 7) << 1) | 1;
- u16 = (((ts >> 15) & 0x7fff) << 1) | 1;
- *tsb++ = u16 >> 8;
- *tsb++ = u16;
- u16 = ((ts & 0x7fff) << 1) | 1;
- *tsb++ = u16 >> 8;
- *tsb++ = u16;
- } else {
- *tsb++ = 0; /* Pointer field for tables */
- }
- }
-
- /* Fill rest of TS packet with payload */
-
- src = pkt_payload(pkt) + tms->tms_offset;
- memcpy(tsb, src, tsrem);
-
- tms->tms_offset += tsrem;
-
- if(dumppkt) {
- int i;
- for(i = 0; i < 188; i++)
- printf("%02x%c", tsb0[i], " \n"[i & 0xf]);
- printf("\n");
- }
-
- assert(tsb0 + 188 == tsb + tsrem);
- return 0;
-}
-
-
-/*
- *
- */
-int
-ts_mux_packet(th_muxer_t *tm, int64_t pcr, uint8_t *outbuf, int maxblocks)
-{
- th_muxstream_t *tms, *t;
- th_pkt_t *pkt;
- int rem, i;
- int64_t pcr1;
-
- LIST_FOREACH(tms, &tm->tm_active_streams, tms_muxer_link) {
- if(tms->tms_nextblock < pcr)
- tms->tms_nextblock = pcr;
-
- if(tms->tms_curpkt != NULL)
- pkt_load(tms->tms_curpkt);
- }
-
- for(i = 0; i < maxblocks; i++) {
-
- /* Find the stream with the lowest/closest time for next scheduled
- transport stream block */
-
- tms = NULL;
- LIST_FOREACH(t, &tm->tm_active_streams, tms_muxer_link)
- if(tms == NULL || t->tms_nextblock < tms->tms_nextblock)
- tms = t;
-
- if(tms == NULL)
- break; /* No active streams, cannot do anything */
-
-
- pkt = tms->tms_curpkt;
- /* Do we need to send a new PCR update? */
-
- if(tms->tms_dopcr && tms->tms_nextpcr <= pcr) {
- pcr1 = pcr + 200000;
- tms->tms_nextpcr = pcr + 20000;
- } else {
- pcr1 = AV_NOPTS_VALUE;
- }
-
- /* Generate a transport stream packet */
-
- if(ts_make_pkt(tm, tms, outbuf, pcr1) == 0) {
- rem = pkt_len(pkt) - tms->tms_offset;
-
- /* Periodic drop (for testing purposes) */
-
- if(tm->tm_drop_rate && i != 0 && i != maxblocks - 1 &&
- tms->tms_stream &&
- (tms->tms_stream->st_type == HTSTV_H264 ||
- tms->tms_stream->st_type == HTSTV_MPEG2VIDEO) &&
- ++tm->tm_drop_cnt == tm->tm_drop_rate) {
- tm->tm_drop_cnt = 0;
- i--;
- } else {
- outbuf += 188;
- }
-
- } else {
- i--;
- rem = 0;
- }
-
- if(rem == 0) {
- /* End of frame, find next */
- while(1) {
- pkt = pkt->pkt_on_stream_queue ?
- TAILQ_NEXT(pkt, pkt_queue_link) : NULL;
- if(pkt != NULL) {
- /* Ok, reset counters */
- tms_stream_set_active(tm, tms, pkt, pcr);
- pkt_load(tms->tms_curpkt);
-
- } else {
- /* This stream cannot contribute, move it to stale list */
- tms_stream_set_stale(tm, tms, pcr);
- }
- break;
- }
- continue;
- }
- tms->tms_nextblock += tms->tms_block_interval;
- tms->tms_blockcnt++;
-#if 0
- if(tms->tms_stream)
- printf("%-10s %lld [seg: %d] pcr=%lld dl=%lld nxt=%lld off=%d d=%lld\n",
- htstvstreamtype2txt(tms->tms_stream->st_type),
- pkt->pkt_dts, tms->tms_blockcnt, pcr, tms->tms_dl,
- tms->tms_nextblock, tms->tms_offset,
- tms->tms_nextblock - pcr);
-#endif
- }
- return i;
-}
-
-/*
- *
- */
-void
-tm_gen_pat_pmt(th_muxer_t *tm, int64_t pcr)
-{
- th_subscription_t *s = tm->tm_subscription;
- th_transport_t *t = s->ths_transport;
- th_pkt_t *pkt;
-
- pkt = pkt_alloc(NULL, 4096, pcr, pcr);
- pkt->pkt_payloadlen = psi_build_pmt(tm, pkt_payload(pkt), 4096);
- tms_stream_add_meta(tm, tm->tm_pmt, pkt);
- pkt_deref(pkt);
-
- pkt = pkt_alloc(NULL, 4096, pcr, pcr);
- pkt->pkt_payloadlen = psi_build_pat(t, pkt_payload(pkt), 4096);
- tms_stream_add_meta(tm, tm->tm_pat, pkt);
- pkt_deref(pkt);
-}
-
-
-
-static void ts_gen_timer_callback(void *aux, int64_t now);
-
-/*
- *
- */
-void
-ts_gen_packet(th_muxer_t *tm, int64_t clk)
-{
- int64_t pcr, next;
- th_muxstream_t *tms;
- int r;
-
- dtimer_disarm(&tm->tm_timer);
-
- pcr = tm->tm_start_dts + clk - tm->tm_clockref;
-
- do {
- if(pcr >= tm->tm_next_pat) {
- tm->tm_next_pat = pcr + 100000;
- tm_gen_pat_pmt(tm, pcr);
- }
-
- r = ts_mux_packet(tm, pcr, tm->tm_packet, tm->tm_blocks_per_packet);
- if(r == -1) {
- ts_muxer_relock(tm, pcr);
- return;
- }
-
- tm->tm_callback(tm->tm_opaque, tm->tm_subscription, tm->tm_packet, r, pcr);
-
- /* Figure when next packet must be sent */
- next = INT64_MAX;
- LIST_FOREACH(tms, &tm->tm_active_streams, tms_muxer_link)
- if(tms->tms_nextblock < next)
- next = tms->tms_nextblock;
-
- next = next - pcr;
- } while(next < 2000);
-
- if(next > 100000)
- next = 100000; /* We always want to send PAT/PMT at this interval */
-
- dtimer_arm_hires(&tm->tm_timer, ts_gen_timer_callback, tm, clk + next);
-}
-
-/*
- *
- */
-static void
-ts_gen_timer_callback(void *aux, int64_t now)
-{
- th_muxer_t *tm = aux;
- ts_gen_packet(tm, now);
-}
-
-
-/*
- * start demuxing
- */
-static int
-ts_muxer_start(th_muxer_t *tm)
-{
- th_muxstream_t *tms;
- th_pkt_t *f, *l, *pkt;
- th_stream_t *st;
- int64_t v;
-
- LIST_FOREACH(tms, &tm->tm_stopped_streams, tms_muxer_link) {
- st = tms->tms_stream;
- if(st == NULL)
- continue;
-
- f = TAILQ_FIRST(&st->st_pktq);
- l = TAILQ_LAST(&st->st_pktq, th_pkt_queue);
-
- if(f == NULL)
- return -1;
-
- v = muxer_pkt_deadline(f, tms) - f->pkt_duration;
-
- if(tm->tm_start_dts < v) {
- tm->tm_start_dts = v;
- tms->tms_tmppkt = f;
- continue;
- }
-
- v = muxer_pkt_deadline(l, tms);
-
- if(tm->tm_start_dts > v) {
- tm->tm_start_dts = v - l->pkt_duration;
- tms->tms_tmppkt = l;
- continue;
- }
-
- tms->tms_tmppkt = NULL;
-
- while(f != NULL || l != NULL) {
-
- if(f != NULL)
- f = TAILQ_NEXT(f, pkt_queue_link);
-
- if(f != NULL) {
- v = muxer_pkt_deadline(f, tms);
- if(tm->tm_start_dts >= v - f->pkt_duration && tm->tm_start_dts < v) {
- tms->tms_tmppkt = f;
- break;
- }
- }
- if(l != NULL)
- l = TAILQ_PREV(l, th_pkt_queue, pkt_queue_link);
-
- if(l != NULL) {
- v = muxer_pkt_deadline(l, tms);
- if(tm->tm_start_dts >= v - l->pkt_duration && tm->tm_start_dts < v) {
- tms->tms_tmppkt = l;
- break;
- }
- }
- }
-
- if(tms->tms_tmppkt == NULL)
- return -1;
- }
-
- /* All streams have a lock */
-
- tm->tm_status = TM_PLAY;
-
- while((tms = LIST_FIRST(&tm->tm_stopped_streams)) != NULL) {
- st = tms->tms_stream;
- pkt = tms->tms_tmppkt;
-
- if(st == NULL) {
- LIST_REMOVE(tms, tms_muxer_link);
- LIST_INSERT_HEAD(&tm->tm_active_streams, tms, tms_muxer_link);
- } else {
- pkt_load(pkt);
-
- tms_stream_set_active(tm, tms, pkt, tm->tm_start_dts);
- }
- }
-
- tm->tm_clockref = getclock_hires();
- tm_gen_pat_pmt(tm, 0);
- ts_gen_packet(tm, tm->tm_clockref);
- return 0;
-}
-
-
-/*
- *
- */
-static void
-ts_muxer_reinit_stream(th_muxer_t *tm, th_muxstream_t *tms)
-{
- tms->tms_nextblock = INT64_MAX;
- LIST_REMOVE(tms, tms_muxer_link);
- LIST_INSERT_HEAD(&tm->tm_stopped_streams, tms, tms_muxer_link);
- tms_set_curpkt(tms, NULL);
-}
-
-/*
- *
- */
-static void
-ts_muxer_relock(th_muxer_t *tm, uint64_t pcr)
-{
- th_muxstream_t *tms;
-
- while((tms = LIST_FIRST(&tm->tm_active_streams)) != NULL)
- ts_muxer_reinit_stream(tm, tms);
-
- while((tms = LIST_FIRST(&tm->tm_stale_streams)) != NULL)
- ts_muxer_reinit_stream(tm, tms);
-
- tm->tm_start_dts = pcr;
- tm->tm_status = TM_WAITING_FOR_LOCK;
- ts_muxer_start(tm);
-}
+static const AVRational mpeg_tc = {1, 90000};
+#define PID_PMT 1000
+#define PID_ES_BASE 2000
/**
- * Push a MPEG TS packet to output, used in shortcut mode
+ * Send current packet
*/
static void
-ts_muxer_raw_push_packet(th_muxer_t *tm, void *data, uint16_t pid)
+ts_muxer_send_packet(ts_muxer_t *ts)
+{
+ if(ts->ts_block == 0)
+ return;
+
+ ts->ts_output(ts->ts_output_opaque, ts->ts_subscription, ts->ts_packet,
+ ts->ts_block, 0);
+ ts->ts_block = 0;
+}
+
+/**
+ * Push a MPEG TS packet to output
+ */
+static void
+ts_muxer_add_packet(ts_muxer_t *ts, void *data, uint16_t pid)
{
uint8_t *tsb;
- tsb = tm->tm_packet + tm->tm_block_offset * 188;
- tm->tm_block_offset++;
+ tsb = ts->ts_packet + ts->ts_block * 188;
+ ts->ts_block++;
memcpy(tsb, data, 188);
- /* Set PID */
-
tsb[2] = pid;
tsb[1] = (tsb[1] & 0xf0) | (pid >> 8);
- if(tm->tm_block_offset == tm->tm_blocks_per_packet) {
- tm->tm_callback(tm->tm_opaque, tm->tm_subscription, tm->tm_packet,
- tm->tm_block_offset, AV_NOPTS_VALUE);
- tm->tm_block_offset = 0;
- }
+ if(ts->ts_block == ts->ts_blocks_per_packet)
+ ts_muxer_send_packet(ts);
}
+/**
+ * Raw TS input
+ */
+static void
+ts_muxer_raw_input(struct th_subscription *s, void *data, int len,
+ th_stream_t *st, void *opaque)
+{
+ th_muxer_t *tm = s->ths_muxer;
+ ts_muxer_t *ts = opaque;
+ th_muxstream_t *tms;
+
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0)
+ if(tms->tms_stream == st)
+ break;
+
+ if(tms == NULL || tms->tms_index == 0)
+ return; /* Unknown / non-mapped stream */
+ ts_muxer_add_packet(ts, data, tms->tms_index);
+}
+
+
+
/**
* Function for encapsulating a short table into a transport stream packet
*/
static void
-ts_muxer_raw_push_table(th_muxer_t *tm, void *table, int tlen, int cc, int pid)
+ts_muxer_build_table(ts_muxer_t *ts, void *table, int tlen, int cc, int pid)
{
int pad;
-
uint8_t tsb0[188], *tsb;
tsb = tsb0;
@@ -675,323 +130,525 @@ ts_muxer_raw_push_table(th_muxer_t *tm, void *table, int tlen, int cc, int pid)
*tsb++ = 0; /* Pointer field for tables */
memcpy(tsb, table, tlen);
- ts_muxer_raw_push_packet(tm, tsb0, pid);
-}
-
-
-/**
- * Periodic timer callback for generating PAT/PMT tables when in
- * shortcut mode
- */
-static void
-ts_muxer_raw_table_generator(void *aux, int64_t now)
-{
- th_muxer_t *tm = aux;
- uint8_t table[180];
- th_muxstream_t *tms;
- int l;
-
- /* rearm timer */
- dtimer_arm_hires(&tm->tm_table_timer, ts_muxer_raw_table_generator,
- tm, now + 100000);
-
- l = psi_build_pat(NULL, table, sizeof(table));
- tms = tm->tm_pat;
- tms->tms_cc++;
- ts_muxer_raw_push_table(tm, table, l, tms->tms_cc, 0);
-
- l = psi_build_pmt(tm, table, sizeof(table));
- tms = tm->tm_pmt;
- tms->tms_cc++;
- ts_muxer_raw_push_table(tm, table, l, tms->tms_cc, 100);
+ ts_muxer_add_packet(ts, tsb0, pid);
}
/**
- * Shortcutted MPEG TS input
- */
-static void
-ts_muxer_raw_input(struct th_subscription *s, void *data, int len,
- th_stream_t *st, void *opaque)
-{
- th_muxer_t *tm = s->ths_muxer;
- th_muxstream_t *tms;
-
- LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
- if(tms->tms_stream == st)
- break;
-
- if(tms == NULL)
- return; /* Unknown / non-mapped stream */
-
- ts_muxer_raw_push_packet(tm, data, tms->tms_index);
-}
-
-
-/*
- * pause playback
- */
-void
-ts_muxer_pause(th_muxer_t *tm)
-{
- tm->tm_pauseref = getclock_hires();
-
- dtimer_disarm(&tm->tm_timer);
- tm->tm_status = TM_PAUSE;
-}
-
-
-
-
-/*
- * playback start
- */
-void
-ts_muxer_play(th_muxer_t *tm, int64_t toffset)
-{
- int64_t dts = 0, t;
- th_muxstream_t *tms;
- th_stream_t *st;
- int64_t clk;
- th_subscription_t *s = tm->tm_subscription;
-
- if(!(tm->tm_flags & TM_SEEKABLE) &&
- s->ths_transport->tht_source_type == THT_MPEG_TS) {
- /* We dont need to seek and source is MPEG TS, we can use a
- shortcut to avoid remuxing stream */
-
- s->ths_raw_input = ts_muxer_raw_input;
- dtimer_arm_hires(&tm->tm_table_timer, ts_muxer_raw_table_generator,
- tm, getclock_hires() + 100000);
- return;
- }
-
- /* Use normal muxer */
-
- if(!tm->tm_transport_linked) {
- LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link);
- tm->tm_transport_linked = 1;
- }
-
- switch(tm->tm_status) {
- case TM_IDLE:
- case TM_WAITING_FOR_LOCK:
-
- toffset = 0;
- break;
-
- case TM_PLAY:
- if(toffset == AV_NOPTS_VALUE)
- return;
-
- toffset = 0;
- return; /* XXX */
-
- case TM_PAUSE:
- if(toffset == AV_NOPTS_VALUE) {
- /* Just continue stream, adjust clock reference with the amount
- of time we was paused */
- clk = getclock_hires();
- t = clk - tm->tm_pauseref;
-
- tm->tm_clockref += t;
- ts_gen_packet(tm, clk);
- return;
- }
- break;
- }
-
- assert(LIST_FIRST(&tm->tm_active_streams) == NULL);
- assert(LIST_FIRST(&tm->tm_stale_streams) == NULL);
-
- LIST_FOREACH(tms, &tm->tm_stopped_streams, tms_muxer_link) {
- if((st = tms->tms_stream) == NULL)
- continue;
- if(st->st_last_dts > dts)
- dts = st->st_last_dts;
- }
-
- dts -= toffset;
- if(dts < 0)
- dts = 0;
-
- tm->tm_start_dts = dts;
- tm->tm_status = TM_WAITING_FOR_LOCK;
-
- ts_muxer_start(tm);
-}
-
-
-/*
- * Meta streams
- */
-static th_muxstream_t *
-tm_create_meta_stream(th_muxer_t *tm, int pid)
-{
- th_muxstream_t *tms;
-
- tms = calloc(1, sizeof(th_muxstream_t));
- tms->tms_muxer = tm;
- LIST_INSERT_HEAD(&tm->tm_stopped_streams, tms, tms_muxer_link);
- tms->tms_index = pid;
- return tms;
-}
-
-
-
-/*
*
*/
static void
-ts_encode_new_packet(th_muxer_t *tm, th_stream_t *st, th_pkt_t *pkt)
+ts_muxer_generate_tables(void *aux, int64_t now)
+{
+ ts_muxer_t *ts = aux;
+ th_muxer_t *tm = ts->ts_muxer;
+ uint8_t table[180];
+ int l;
+
+ /* rearm timer */
+ dtimer_arm_hires(&ts->ts_patpmt_timer, ts_muxer_generate_tables,
+ ts, now + 100000);
+
+ l = psi_build_pat(NULL, table, sizeof(table), PID_PMT);
+ ts_muxer_build_table(ts, table, l, ts->ts_pat_cc, 0);
+ ts->ts_pat_cc++;
+
+ l = psi_build_pmt(tm, table, sizeof(table), ts->ts_pcrpid);
+ ts_muxer_build_table(ts, table, l, ts->ts_pmt_cc, PID_PMT);
+ ts->ts_pmt_cc++;
+
+ ts_muxer_send_packet(ts);
+}
+
+/**
+ *
+ */
+static int64_t
+get_delay(th_muxstream_t *tms)
+{
+ th_metapkt_t *f, *l;
+
+ f = TAILQ_FIRST(&tms->tms_metaqueue);
+ if(f == NULL)
+ return -1;
+
+ l = TAILQ_LAST(&tms->tms_metaqueue, th_metapkt_queue);
+
+ return l->tm_ts_stop - f->tm_ts_start; /* Delta time */
+
+}
+
+
+/**
+ *
+ */
+#if 0
+static int
+estimate_bitrate(th_muxstream_t *tms)
+{
+ int64_t delta, rate;
+
+ delta = get_delay(tms);
+ if(delta == -1)
+ return -1;
+ rate = (uint64_t)tms->tms_meta_bytes * 1000000LL / delta;
+ return rate;
+}
+#endif
+
+/**
+ *
+ */
+#if 0
+static int
+estimate_blockrate(th_muxstream_t *tms)
+{
+ int64_t delta, rate;
+
+ delta = get_delay(tms);
+ if(delta == -1)
+ return 0;
+ rate = (uint64_t)tms->tms_meta_packets * 1000000LL / delta;
+ return rate;
+}
+#endif
+
+/**
+ *
+ */
+#define PES_HEADER_SIZE 19
+
+static void
+ts_mux_gen_packets(ts_muxer_t *ts, th_muxstream_t *tms, th_pkt_t *pkt)
+{
+ th_metapkt_t *tm;
+ int off = 0;
+ uint8_t *tsb;
+ int64_t t;
+ int frrem, pad, tsrem, len;
+ uint16_t u16;
+ // int pcroffset;
+ int printts = 0;
+
+ if(printts)printf("Generating TS packets, DTS = %lld +%d\n", pkt->pkt_dts,
+ pkt->pkt_duration);
+
+ while(off < pkt->pkt_payloadlen) {
+
+
+ tm = malloc(sizeof(th_metapkt_t) + 188);
+ tsb = tm->tm_pkt;
+ tm->tm_pcroffset = 0;
+
+ /* Timestamp of first byte */
+ tm->tm_ts_start = pkt->pkt_duration * off /
+ (pkt->pkt_payloadlen + PES_HEADER_SIZE)
+ + pkt->pkt_dts - tms->tms_muxoffset;
+
+
+ if(ts->ts_flags & TS_HTSCLIENT) {
+ /* Temporary hack */
+ *tsb++ = tms->tms_stream->st_type;
+ } else {
+ /* TS marker */
+ *tsb++ = 0x47;
+ }
+
+
+ /* Write PID and optionally payload unit start indicator */
+ *tsb++ = tms->tms_index >> 8 | (off ? 0 : 0x40);
+ *tsb++ = tms->tms_index;
+
+ /* Remaing bytes after 4 bytes of TS header */
+ tsrem = 184;
+
+ if(off == 0) {
+ /* When writing the packet header, shave of a bit of available
+ payload size */
+ tsrem -= PES_HEADER_SIZE;
+ }
+
+ /* Remaining length of frame */
+ frrem = pkt->pkt_payloadlen - off;
+
+ /* Compute amout of padding needed */
+ pad = tsrem - frrem;
+
+ if(pad > 0) {
+ /* Must pad TS packet */
+
+ *tsb++ = 0x30;
+ tsrem -= pad;
+ *tsb++ = --pad;
+
+ memset(tsb, 0x00, pad);
+ tsb += pad;
+ } else {
+ *tsb++ = 0x10;
+ }
+
+
+ if(off == 0) {
+ /* Insert PES header */
+
+ /* Write startcode */
+
+ *tsb++ = 0;
+ *tsb++ = 0;
+ *tsb++ = tms->tms_sc >> 8;
+ *tsb++ = tms->tms_sc;
+
+ /* Write total frame length (without accounting for startcode and
+ length field itself */
+
+ len = pkt_len(pkt) + PES_HEADER_SIZE - 6;
+
+ if(len > 65535) {
+ /* It's okay to write len as 0 in transport streams,
+ but only for video frames, and i dont expect any of the
+ audio frames to exceed 64k
+ */
+ len = 0;
+ }
+
+ *tsb++ = len >> 8;
+ *tsb++ = len;
+
+ *tsb++ = 0x80; /* MPEG2 */
+ *tsb++ = 0xc0; /* pts & dts is present */
+ *tsb++ = 10; /* length of rest of header (pts & dts) */
+
+ /* Write PTS */
+
+ t = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, mpeg_tc);
+ *tsb++ = (((t >> 30) & 7) << 1) | 1;
+ u16 = (((t >> 15) & 0x7fff) << 1) | 1;
+ *tsb++ = u16 >> 8;
+ *tsb++ = u16;
+ u16 = ((t & 0x7fff) << 1) | 1;
+ *tsb++ = u16 >> 8;
+ *tsb++ = u16;
+
+ /* Write DTS */
+
+ t = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, mpeg_tc);
+ *tsb++ = (((t >> 30) & 7) << 1) | 1;
+ u16 = (((t >> 15) & 0x7fff) << 1) | 1;
+ *tsb++ = u16 >> 8;
+ *tsb++ = u16;
+ u16 = ((t & 0x7fff) << 1) | 1;
+ *tsb++ = u16 >> 8;
+ *tsb++ = u16;
+ }
+
+ memcpy(tsb, pkt->pkt_payload + off, tsrem);
+
+ /* Timestamp of last byte + 1 */
+ t = pkt->pkt_duration * (off + tsrem) / (pkt->pkt_payloadlen +
+ PES_HEADER_SIZE);
+
+ /* Fix any rounding errors */
+ if(t > pkt->pkt_duration)
+ t = pkt->pkt_duration;
+
+ tm->tm_ts_stop = pkt->pkt_dts + t - tms->tms_muxoffset;
+
+
+ if(printts)printf("TS: copy %7d (%3d bytes) pad = %7d: %lld - %lld\n",
+ off, tsrem, pad, tm->tm_ts_start, tm->tm_ts_stop);
+
+ TAILQ_INSERT_TAIL(&tms->tms_metaqueue, tm, tm_link);
+ tms->tms_meta_packets++;
+
+ off += tsrem;
+ }
+ if(printts)printf("end @ %lld\n", pkt->pkt_dts + pkt->pkt_duration);
+ if(printts)exit(0);
+}
+
+
+/**
+ *
+ */
+static int64_t
+check_total_delay(th_muxer_t *tm)
{
th_muxstream_t *tms;
- int64_t pcr;
-
- pkt_store(pkt); /* need to keep packet around */
-
- switch(tm->tm_status) {
- case TM_IDLE:
- break;
-
- case TM_WAITING_FOR_LOCK:
- ts_muxer_start(tm);
- break;
-
- case TM_PLAY:
- LIST_FOREACH(tms, &tm->tm_stale_streams, tms_muxer_link) {
- if(tms->tms_stream == st) {
- pcr = tm->tm_start_dts + getclock_hires() - tm->tm_clockref;
- tms_stream_set_active(tm, tms, pkt, pcr);
- break;
- }
- }
- break;
-
- case TM_PAUSE:
- break;
+ int64_t delta = -1, v;
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
+ v = get_delay(tms);
+ if(v > delta)
+ delta = v;
}
+ return delta;
+}
+
+
+
+/**
+ *
+ */
+static int64_t
+get_start_pcr(th_muxer_t *tm)
+{
+ th_muxstream_t *tms;
+ th_metapkt_t *f;
+ int64_t r = INT64_MAX;
+
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
+
+ f = TAILQ_FIRST(&tms->tms_metaqueue);
+ if(f == NULL)
+ continue;
+ if(f->tm_ts_start < r)
+ r = f->tm_ts_start;
+ }
+ return r;
}
-
-/*
- * TS Muxer
+/**
+ *
*/
-th_muxer_t *
-ts_muxer_init(th_subscription_t *s, th_mux_output_t *cb, void *opaque,
- int flags)
+static void
+ts_deliver(void *aux, int64_t now)
{
- th_transport_t *t = s->ths_transport;
- th_stream_t *st;
- th_muxer_t *tm;
+ ts_muxer_t *ts = aux;
+ th_muxer_t *tm = ts->ts_muxer;
+ th_muxstream_t *tms, *c;
+ int rate;
+ int64_t v, pcr;
+ th_metapkt_t *x, *y;
+ uint8_t *tsb;
+
+ int cnt;
+
+ pcr = now - ts->ts_pcr_ref + ts->ts_pcr_offset;
+
+ if(ts->ts_last_pcr + 20000 < now) {
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0)
+ if(tms->tms_index == ts->ts_pcrpid)
+ break;
+
+ if(tms != NULL) {
+ uint8_t pkt[188];
+ tsb = &pkt[0];
+ *tsb++ = 0x47;
+ *tsb++ = 0;
+ *tsb++ = 0;
+
+ /* Insert CC */
+ *tsb++ = 0x20 | (tms->tms_cc & 0xf);
+
+ *tsb++ = 183;
+ *tsb++ = 0x10; /* PCR flag */
+
+ v = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc);
+ *tsb++ = v >> 25;
+ *tsb++ = v >> 17;
+ *tsb++ = v >> 9;
+ *tsb++ = v >> 1;
+ *tsb++ = (v & 1) << 7;
+ *tsb++ = 0;
+
+ ts_muxer_add_packet(ts, pkt, tms->tms_index);
+ ts->ts_last_pcr = now;
+ }
+ }
+
+ cnt = ts->ts_blocks_per_packet - ts->ts_block;
+
+ while(--cnt >= 0) {
+ c = LIST_FIRST(&tm->tm_streams);
+ tms = LIST_NEXT(c, tms_muxer_link0);
+ for(; tms != NULL; tms = LIST_NEXT(tms, tms_muxer_link0)) {
+ x = TAILQ_FIRST(&c->tms_metaqueue);
+ y = TAILQ_FIRST(&tms->tms_metaqueue);
+
+ if(x != NULL && y != NULL && y->tm_ts_start < x->tm_ts_start)
+ c = tms;
+ }
+
+ tms = NULL;
+
+ x = TAILQ_FIRST(&c->tms_metaqueue);
+ if(x == NULL) {
+ printf("underrun\n");
+ break;
+ }
+ TAILQ_REMOVE(&c->tms_metaqueue, x, tm_link);
+ c->tms_meta_packets--;
+
+ /* Insert CC */
+ x->tm_pkt[3] = (x->tm_pkt[3] & 0xf0) | (c->tms_cc & 0xf);
+ c->tms_cc++;
+
+ ts_muxer_add_packet(ts, x->tm_pkt, c->tms_index);
+ free(x);
+ }
+
+
+ rate = 0;
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
+ rate += (tms->tms_meta_packets * 1000000ULL) / (uint64_t)TS_LOOKAHEAD;
+ // rate += estimate_blockrate(tms);
+ }
+
+#if 0
+ printf("TS blockrate = %d\n", rate);
+
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
+ printf("%-10s: %-5d %-8lld | ",
+ htstvstreamtype2txt(tms->tms_stream->st_type),
+ tms->tms_meta_packets,
+ get_delay(tms));
+ }
+ printf("\n");
+#endif
+
+ v = 1000000 / (rate / ts->ts_blocks_per_packet);
+ dtimer_arm_hires(&ts->ts_stream_timer, ts_deliver, ts, now + v);
+}
+
+/**
+ *
+ */
+static void
+ts_mux_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt)
+{
+ ts_muxer_t *ts = opaque;
+ th_muxer_t *tm = ts->ts_muxer;
+ int64_t v, pcr;
+
+ ts_mux_gen_packets(ts, tms, pkt);
+
+ if(ts->ts_running == 0) {
+ v = check_total_delay(tm);
+ if(v < TS_LOOKAHEAD)
+ return;
+ pcr = get_start_pcr(tm);
+ if(pcr == INT64_MAX)
+ return;
+ ts->ts_pcr_offset = pcr;
+ ts->ts_pcr_ref = getclock_hires();
+ ts->ts_running = 1;
+ ts_deliver(ts, getclock_hires());
+ }
+}
+
+
+
+/**
+ *
+ */
+ts_muxer_t *
+ts_muxer_init(th_subscription_t *s, ts_mux_output_t *output,
+ void *opaque, int flags)
+{
+ ts_muxer_t *ts = calloc(1, sizeof(ts_muxer_t));
+ int dopcr;
+ int pididx = PID_ES_BASE;
th_muxstream_t *tms;
- int pididx = 100;
- uint32_t sc;
- int dopcr = 0;
- int offset;
+ th_muxer_t *tm;
+ th_stream_t *st;
- tm = calloc(1, sizeof(th_muxer_t));
- tm->tm_subscription = s;
+ ts->ts_pcr_offset = AV_NOPTS_VALUE;
+ ts->ts_pcr_ref = AV_NOPTS_VALUE;
- tm->tm_clockref = getclock_hires();
- tm->tm_blocks_per_packet = 7;
- tm->tm_callback = cb;
- tm->tm_opaque = opaque;
- tm->tm_flags = flags;
- tm->tm_new_pkt = ts_encode_new_packet;
+ ts->ts_output = output;
+ ts->ts_output_opaque = opaque;
+ ts->ts_flags = flags;
- tm->tm_packet = malloc(188 * tm->tm_blocks_per_packet);
+ ts->ts_subscription = s;
+ tm = ts->ts_muxer = muxer_init(s, ts_mux_packet_input, ts);
+
+ ts->ts_blocks_per_packet = 7;
+ ts->ts_packet = malloc(188 * ts->ts_blocks_per_packet);
- pididx = 200;
+ /* Do TS MUX specific init per stream */
+
+ LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
+ st = tms->tms_stream;
- LIST_FOREACH(st, &t->tht_streams, st_link) {
dopcr = 0;
- offset = 0;
switch(st->st_type) {
case HTSTV_MPEG2VIDEO:
- sc = 0x1e0;
+ tms->tms_muxoffset = 200000;
+ tms->tms_sc = 0x1ec;
dopcr = 1;
break;
case HTSTV_MPEG2AUDIO:
- sc = 0x1c0;
+ tms->tms_muxoffset = 75000;
+ tms->tms_sc = 0x1cd;
break;
case HTSTV_AC3:
- sc = 0x1bd;
+ tms->tms_muxoffset = 75000;
+ tms->tms_sc = 0x1bd;
break;
case HTSTV_H264:
- sc = 0x1e0;
+ tms->tms_muxoffset = 900000;
+ tms->tms_sc = 0x1e0;
dopcr = 1;
break;
default:
continue;
}
- tms = calloc(1, sizeof(th_muxstream_t));
- tms->tms_muxer = tm;
- tms->tms_stream = st;
- tms->tms_dopcr = dopcr;
- tms->tms_mux_offset = offset;
+ if(dopcr && ts->ts_pcrpid == 0)
+ ts->ts_pcrpid = pididx;
- LIST_INSERT_HEAD(&tm->tm_stopped_streams, tms, tms_muxer_link);
- LIST_INSERT_HEAD(&tm->tm_media_streams, tms, tms_muxer_media_link);
-
- tms->tms_index = pididx;
- tms->tms_sc = sc;
- pididx++;
+ tms->tms_index = pididx++;
}
-
- tm->tm_pat = tm_create_meta_stream(tm, 0);
- tm->tm_pmt = tm_create_meta_stream(tm, 100);
-
- s->ths_muxer = tm;
- return tm;
+ return ts;
}
-/*
+/**
*
*/
-static void
-tms_destroy(th_muxstream_t *tms)
+void
+ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s)
{
- if(tms->tms_stream)
- LIST_REMOVE(tms, tms_muxer_media_link);
-
- LIST_REMOVE(tms, tms_muxer_link);
- tms_set_curpkt(tms, NULL);
- memset(tms, 0xff, sizeof(th_muxstream_t));
- free(tms);
+ free(ts->ts_packet);
+ muxer_deinit(ts->ts_muxer, s);
+ dtimer_disarm(&ts->ts_patpmt_timer);
+ free(ts);
}
-/*
+/**
*
*/
void
-ts_muxer_deinit(th_muxer_t *tm, th_subscription_t *s)
+ts_muxer_play(ts_muxer_t *ts, int64_t toffset)
{
- th_muxstream_t *tms;
+ th_subscription_t *s = ts->ts_muxer->tm_subscription;
- s->ths_raw_input = NULL;
- s->ths_muxer = NULL;
+ /* Start PAT / PMT generator */
+ ts_muxer_generate_tables(ts, getclock_hires());
- if(tm->tm_transport_linked)
- LIST_REMOVE(tm, tm_transport_link);
+ if(!(ts->ts_flags & TS_SEEK) &&
+ s->ths_transport->tht_source_type == THT_MPEG_TS) {
+ /* We dont need to seek and source is MPEG TS, we can use a
+ shortcut to avoid remuxing stream */
- dtimer_disarm(&tm->tm_timer);
- dtimer_disarm(&tm->tm_table_timer);
-
- tms_destroy(tm->tm_pat);
- tms_destroy(tm->tm_pmt);
-
- while((tms = LIST_FIRST(&tm->tm_media_streams)) != NULL)
- tms_destroy(tms);
-
- free(tm->tm_packet);
- free(tm);
+ s->ths_raw_input = ts_muxer_raw_input;
+ s->ths_opaque = ts;
+ } else {
+ muxer_play(ts->ts_muxer, toffset);
+ }
}
+
+/**
+ *
+ */
+void
+ts_muxer_pause(ts_muxer_t *ts)
+{
+ dtimer_disarm(&ts->ts_patpmt_timer);
+ muxer_pause(ts->ts_muxer);
+}
diff --git a/tsmux.h b/tsmux.h
index 4964dbf8..b3b5daf2 100644
--- a/tsmux.h
+++ b/tsmux.h
@@ -19,13 +19,53 @@
#ifndef TSMUX_H
#define TSMUX_H
-th_muxer_t *ts_muxer_init(th_subscription_t *s, th_mux_output_t *cb,
+typedef void (ts_mux_output_t)(void *opaque, th_subscription_t *s,
+ uint8_t *pkt, int npackets, int64_t pcr);
+
+
+typedef struct ts_muxer {
+ th_subscription_t *ts_subscription;
+ int ts_flags;
+#define TS_SEEK 0x1
+#define TS_HTSCLIENT 0x2
+
+ int ts_running;
+
+ th_muxer_t *ts_muxer;
+ ts_mux_output_t *ts_output;
+ void *ts_output_opaque;
+
+ int64_t ts_pcr_offset;
+ int64_t ts_pcr_ref;
+
+
+ int ts_pat_cc;
+ int ts_pmt_cc;
+
+ dtimer_t ts_patpmt_timer;
+
+ uint8_t *ts_packet;
+ int ts_block;
+ int ts_blocks_per_packet;
+
+ dtimer_t ts_stream_timer;
+
+ int ts_pcrpid;
+
+ int64_t ts_last_pcr;
+
+} ts_muxer_t;
+
+
+
+
+ts_muxer_t *ts_muxer_init(th_subscription_t *s, ts_mux_output_t *output,
void *opaque, int flags);
-void ts_muxer_deinit(th_muxer_t *tm, th_subscription_t *s);
+void ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s);
-void ts_muxer_play(th_muxer_t *tm, int64_t toffset);
+void ts_muxer_play(ts_muxer_t *ts, int64_t toffset);
-void ts_muxer_pause(th_muxer_t *tm);
+void ts_muxer_pause(ts_muxer_t *ts);
#endif /* TSMUX_H */
diff --git a/tvhead.h b/tvhead.h
index e8f5792d..b0de4bbf 100644
--- a/tvhead.h
+++ b/tvhead.h
@@ -86,7 +86,7 @@ LIST_HEAD(th_pkt_list, th_pkt);
LIST_HEAD(th_muxer_list, th_muxer);
LIST_HEAD(th_muxstream_list, th_muxstream);
LIST_HEAD(th_descrambler_list, th_descrambler);
-
+TAILQ_HEAD(th_metapkt_queue, th_metapkt);
extern time_t dispatch_clock;
extern int startupcounter;
@@ -521,37 +521,39 @@ typedef struct th_pkt {
} th_pkt_t;
+
+/**
+ * Meta packets
+ */
+typedef struct th_metapkt {
+ TAILQ_ENTRY(th_metapkt) tm_link;
+ int64_t tm_ts_start;
+ int64_t tm_ts_stop;
+ int tm_pcroffset;
+ uint8_t tm_pkt[0];
+} th_metapkt_t;
+
+
/*
* A mux stream reader
*/
typedef struct th_muxstream {
- LIST_ENTRY(th_muxstream) tms_muxer_link;
+ LIST_ENTRY(th_muxstream) tms_muxer_link0;
struct th_muxer *tms_muxer;
-
- th_pkt_t *tms_curpkt;
-
- int tms_offset; /* offset in current packet */
-
th_stream_t *tms_stream;
- LIST_ENTRY(th_muxstream) tms_muxer_media_link;
-
- int64_t tms_nextblock; /* Time for delivery of next block */
- int tms_block_interval;
- int tms_block_rate;
-
- int tms_mux_offset;
-
int tms_index; /* Used as PID or whatever */
- th_pkt_t *tms_tmppkt; /* temporary pkt pointer during lock phaze */
+ struct th_metapkt_queue tms_metaqueue;
+ uint32_t tms_meta_packets; /* number of packets "" */
+
+
/* MPEG TS multiplex stuff */
int tms_sc; /* start code */
int tms_cc;
- int tms_dopcr;
- int64_t tms_nextpcr;
+ int64_t tms_muxoffset; /* DTS offset from PCR */
/* Memebers used when running with ffmpeg */
@@ -571,8 +573,8 @@ typedef struct th_muxstream {
struct th_subscription;
-typedef void (th_mux_output_t)(void *opaque, struct th_subscription *s,
- uint8_t *pkt, int blocks, int64_t pcr);
+typedef void (th_mux_output_t)(void *opaque, th_muxstream_t *tms,
+ th_pkt_t *pkt);
typedef void (th_mux_newpkt_t)(struct th_muxer *tm, th_stream_t *st,
@@ -583,33 +585,17 @@ typedef struct th_muxer {
th_mux_newpkt_t *tm_new_pkt;
LIST_ENTRY(th_muxer) tm_transport_link;
- int tm_transport_linked;
+ int tm_linked;
- int64_t tm_clockref; /* Base clock ref */
- int64_t tm_pauseref; /* Time when we were paused */
+ int64_t tm_offset;
- int tm_flags;
-#define TM_HTSCLIENTMODE 0x1 /* Ugly workaround for now */
-#define TM_SEEKABLE 0x2 /* We need the pause / seek to work */
-
- int64_t tm_start_dts;
- int64_t tm_next_pat;
-
- struct th_muxstream_list tm_media_streams;
-
- struct th_muxstream_list tm_active_streams;
- struct th_muxstream_list tm_stale_streams;
- struct th_muxstream_list tm_stopped_streams;
-
- uint8_t *tm_packet;
- int tm_blocks_per_packet;
+ struct th_muxstream_list tm_streams;
struct th_subscription *tm_subscription;
- th_mux_output_t *tm_callback;
+ th_mux_output_t *tm_output;
void *tm_opaque;
- dtimer_t tm_timer;
enum {
TM_IDLE,
@@ -618,16 +604,7 @@ typedef struct th_muxer {
TM_PAUSE,
} tm_status;
- th_muxstream_t *tm_pat;
- th_muxstream_t *tm_pmt;
-
struct AVFormatContext *tm_avfctx;
-
- int tm_drop_rate;
- int tm_drop_cnt;
-
- dtimer_t tm_table_timer;
- int tm_block_offset;
} th_muxer_t;