diff --git a/src/plumbing/tsfix.c b/src/plumbing/tsfix.c index 14278632..1b868801 100644 --- a/src/plumbing/tsfix.c +++ b/src/plumbing/tsfix.c @@ -65,6 +65,7 @@ typedef struct tsfix { time_t tf_start_time; struct th_pktref_queue tf_ptsq; + struct th_pktref_queue tf_backlog; } tsfix_t; @@ -99,6 +100,7 @@ tsfix_destroy_streams(tsfix_t *tf) { tfstream_t *tfs; pktref_clear_queue(&tf->tf_ptsq); + pktref_clear_queue(&tf->tf_backlog); while((tfs = LIST_FIRST(&tf->tf_streams)) != NULL) { LIST_REMOVE(tfs, tfs_link); free(tfs); @@ -164,6 +166,7 @@ tsfix_start(tsfix_t *tf, streaming_start_t *ss) } TAILQ_INIT(&tf->tf_ptsq); + TAILQ_INIT(&tf->tf_backlog); tf->tf_tsref = PTS_UNSET; tf->tf_hasvideo = hasvideo; @@ -185,12 +188,15 @@ tsfix_stop(tsfix_t *tf) * */ static void -normalize_ts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt) +normalize_ts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt, int backlog) { int64_t ref, dts, d; if(tf->tf_tsref == PTS_UNSET) { - pkt_ref_dec(pkt); + if (backlog) + pktref_enqueue(&tf->tf_backlog, pkt); + else + pkt_ref_dec(pkt); return; } @@ -269,8 +275,54 @@ normalize_ts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt) } +/** + * + */ +static void +tsfix_backlog(tsfix_t *tf) +{ + th_pkt_t *pkt; + th_pktref_t *pr; + tfstream_t *tfs; + + while((pr = TAILQ_FIRST(&tf->tf_backlog)) != NULL) { + pkt = pr->pr_pkt; + TAILQ_REMOVE(&tf->tf_backlog, pr, pr_link); + free(pr); + tfs = tfs_find(tf, pkt); + normalize_ts(tf, tfs, pkt, 0); + } +} +/** + * + */ +static int64_t +tsfix_backlog_diff(tsfix_t *tf) +{ + th_pkt_t *pkt; + th_pktref_t *pr; + tfstream_t *tfs; + int64_t res = 0; + + TAILQ_FOREACH(pr, &tf->tf_backlog, pr_link) { + pkt = pr->pr_pkt; + if (pkt->pkt_dts == PTS_UNSET) continue; + if (pkt->pkt_dts >= tf->tf_tsref) continue; + if (tf->tf_tsref > (PTS_MASK * 3) / 4 && + pkt->pkt_dts < PTS_MASK / 4) continue; + tfs = tfs_find(tf, pkt); + if (!tfs->tfs_audio && !tfs->tfs_video) continue; + res = MAX(tsfix_ts_diff(pkt->pkt_dts, tf->tf_tsref), res); + } + return res; +} + + +/** + * + */ static void recover_pts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt) { @@ -324,7 +376,7 @@ recover_pts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt) } free(pr); - normalize_ts(tf, tfs, pkt); + normalize_ts(tf, tfs, pkt, 1); } } @@ -345,7 +397,7 @@ compute_pts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt) /* PTS known and no other packets in queue, deliver at once */ if(pkt->pkt_pts != PTS_UNSET && TAILQ_FIRST(&tf->tf_ptsq) == NULL) - normalize_ts(tf, tfs, pkt); + normalize_ts(tf, tfs, pkt, 1); else recover_pts(tf, tfs, pkt); } @@ -360,7 +412,7 @@ tsfix_input_packet(tsfix_t *tf, streaming_message_t *sm) th_pkt_t *pkt = pkt_copy_shallow(sm->sm_data); tfstream_t *tfs = tfs_find(tf, pkt); streaming_msg_free(sm); - int64_t diff; + int64_t diff, diff2; if(tfs == NULL || dispatch_clock < tf->tf_start_time) { pkt_ref_dec(pkt); @@ -371,7 +423,12 @@ tsfix_input_packet(tsfix_t *tf, streaming_message_t *sm) ((!tf->tf_hasvideo && tfs->tfs_audio) || (tfs->tfs_video && pkt->pkt_frametype == PKT_I_FRAME))) { tf->tf_tsref = pkt->pkt_dts & PTS_MASK; - tsfixprintf("reference clock set to %"PRId64"\n", tf->tf_tsref); + diff = diff2 = tsfix_backlog_diff(tf); + if (diff > 160000) + diff = 160000; + tf->tf_tsref = (tf->tf_tsref - diff) % PTS_MASK; + tvhtrace("parser", "reference clock set to %"PRId64" (backlog %"PRId64")\n", tf->tf_tsref, diff2); + tsfix_backlog(tf); } else if (tfs->tfs_local_ref == PTS_UNSET && tf->tf_tsref != PTS_UNSET && pkt->pkt_dts != PTS_UNSET) { if (tfs->tfs_audio) {