diff --git a/src/dvb/dvb_adapter.c b/src/dvb/dvb_adapter.c index 5103d38f..7a0d1e0e 100644 --- a/src/dvb/dvb_adapter.c +++ b/src/dvb/dvb_adapter.c @@ -448,7 +448,7 @@ dvb_adapter_input_dvr(void *aux) for(i = 0; i < r; i += 188) LIST_FOREACH(t, &tda->tda_transports, tht_active_link) if(t->tht_dvb_mux_instance == tda->tda_mux_current) - ts_recv_packet1(t, tsb + i); + ts_recv_packet1(t, tsb + i, NULL); pthread_mutex_unlock(&tda->tda_delivery_mutex); } diff --git a/src/iptv_input.c b/src/iptv_input.c index f3dc1157..be94f686 100644 --- a/src/iptv_input.c +++ b/src/iptv_input.c @@ -111,7 +111,7 @@ iptv_ts_input(th_transport_t *t, uint8_t *tsb) psi_rawts_table_parser(t->tht_pmt_section, tsb, iptv_got_pmt, t); } else { - ts_recv_packet1(t, tsb); + ts_recv_packet1(t, tsb, NULL); } } diff --git a/src/rawtsinput.c b/src/rawtsinput.c index d36cac69..4b1b15e5 100644 --- a/src/rawtsinput.c +++ b/src/rawtsinput.c @@ -50,6 +50,7 @@ static int rawts_transport_start(th_transport_t *t, unsigned int weight, int status, int force_start) { + t->tht_status = TRANSPORT_RUNNING; return 0; // Always ok } @@ -59,7 +60,7 @@ rawts_transport_start(th_transport_t *t, unsigned int weight, int status, static void rawts_transport_stop(th_transport_t *t) { - + t->tht_status = TRANSPORT_IDLE; } /** @@ -236,6 +237,7 @@ process_ts_packet(rawts_t *rt, uint8_t *tsb) { uint16_t pid; th_transport_t *t; + int64_t pcr, d; pid = ((tsb[1] & 0x1f) << 8) | tsb[2]; @@ -245,8 +247,27 @@ process_ts_packet(rawts_t *rt, uint8_t *tsb) return; } - LIST_FOREACH(t, &rt->rt_transports, tht_group_link) - ts_recv_packet1(t, tsb); + LIST_FOREACH(t, &rt->rt_transports, tht_group_link) { + pcr = AV_NOPTS_VALUE; + + ts_recv_packet1(t, tsb, &pcr); + + if(pcr != AV_NOPTS_VALUE) { + + if(t->tht_pcr_last != AV_NOPTS_VALUE) { + struct timespec slp; + + d = pcr - t->tht_pcr_last + t->tht_pcr_last_realtime; + + slp.tv_sec = d / 1000000; + slp.tv_nsec = (d % 1000000) * 1000; + + clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &slp, NULL); + } + t->tht_pcr_last = pcr; + t->tht_pcr_last_realtime = getmonoclock(); + } + } } @@ -271,7 +292,6 @@ raw_ts_reader(void *aux) c++; process_ts_packet(rt, tsblock); } - usleep(1000); } return NULL; diff --git a/src/transports.c b/src/transports.c index c7fe8c12..5f9c6013 100644 --- a/src/transports.c +++ b/src/transports.c @@ -488,6 +488,7 @@ transport_create(const char *identifier, int type, int source_type) t->tht_source_type = source_type; t->tht_refcount = 1; t->tht_enabled = 1; + t->tht_pcr_last = AV_NOPTS_VALUE; streaming_pad_init(&t->tht_streaming_pad); diff --git a/src/tsdemux.c b/src/tsdemux.c index 0bf1877e..4a5f863d 100644 --- a/src/tsdemux.c +++ b/src/tsdemux.c @@ -138,11 +138,13 @@ ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb) if(off > 188) break; - parse_mpeg_ts(t, st, tsb + off, 188 - off, pusi, err); + if(t->tht_status == TRANSPORT_RUNNING) + parse_mpeg_ts(t, st, tsb + off, 188 - off, pusi, err); break; } } +static const AVRational mpeg_tc = {1, 90000}; /** * Recover PCR @@ -151,10 +153,10 @@ ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb) * than the stream PCR */ static void -ts_extract_pcr(th_transport_t *t, th_stream_t *st, uint8_t *tsb) +ts_extract_pcr(th_transport_t *t, th_stream_t *st, uint8_t *tsb, + int64_t *pcrp) { - int64_t real = getmonoclock(); - int64_t pcr, d; + int64_t real, pcr, d; pcr = (uint64_t)tsb[6] << 25; pcr |= (uint64_t)tsb[7] << 17; @@ -162,7 +164,15 @@ ts_extract_pcr(th_transport_t *t, th_stream_t *st, uint8_t *tsb) pcr |= (uint64_t)tsb[9] << 1; pcr |= ((uint64_t)tsb[10] >> 7) & 0x01; - pcr = av_rescale_q(pcr, st->st_tb, AV_TIME_BASE_Q); + pcr = av_rescale_q(pcr, mpeg_tc, AV_TIME_BASE_Q); + + if(pcrp != NULL) + *pcrp = pcr; + + if(st == NULL) + return; + + real = getmonoclock(); if(st->st_pcr_real_last != AV_NOPTS_VALUE) { d = (real - st->st_pcr_real_last) - (pcr - st->st_pcr_last); @@ -193,7 +203,7 @@ ts_extract_pcr(th_transport_t *t, th_stream_t *st, uint8_t *tsb) * Process transport stream packets, extract PCR and optionally descramble */ void -ts_recv_packet1(th_transport_t *t, uint8_t *tsb) +ts_recv_packet1(th_transport_t *t, uint8_t *tsb, int64_t *pcrp) { th_stream_t *st; int pid, n, m, r; @@ -207,20 +217,20 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb) t->tht_input_status = TRANSPORT_FEED_RAW_INPUT; pid = (tsb[1] & 0x1f) << 8 | tsb[2]; - if((st = transport_find_stream_by_pid(t, pid)) == NULL) - return; - - if(t->tht_status != TRANSPORT_RUNNING) - return; pthread_mutex_lock(&t->tht_stream_mutex); - - avgstat_add(&t->tht_rate, 188, dispatch_clock); + st = transport_find_stream_by_pid(t, pid); /* Extract PCR */ if(tsb[3] & 0x20 && tsb[4] > 0 && tsb[5] & 0x10) - ts_extract_pcr(t, st, tsb); + ts_extract_pcr(t, st, tsb, pcrp); + if(st == NULL) { + pthread_mutex_unlock(&t->tht_stream_mutex); + return; + } + + avgstat_add(&t->tht_rate, 188, dispatch_clock); if((tsb[3] & 0xc0) || (t->tht_scrambled_seen && st->st_type != SCT_CA && diff --git a/src/tsdemux.h b/src/tsdemux.h index 108dfcd5..d6b9c428 100644 --- a/src/tsdemux.h +++ b/src/tsdemux.h @@ -19,7 +19,7 @@ #ifndef TSDEMUX_H #define TSDEMUX_H -void ts_recv_packet1(th_transport_t *t, uint8_t *tsb); +void ts_recv_packet1(th_transport_t *t, uint8_t *tsb, int64_t *pcrp); void ts_recv_packet2(th_transport_t *t, uint8_t *tsb); diff --git a/src/tvhead.h b/src/tvhead.h index 8d1d0f6f..d7e1a0b7 100644 --- a/src/tvhead.h +++ b/src/tvhead.h @@ -457,6 +457,11 @@ typedef struct th_transport { */ int tht_enabled; + /** + * Last PCR seen, we use it for a simple clock for rawtsinput.c + */ + int64_t tht_pcr_last; + int64_t tht_pcr_last_realtime; LIST_ENTRY(th_transport) tht_group_link;