From a3ff11f7823940b33285f3fa54ed9cd7670ffb4d Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Tue, 15 Jan 2013 11:01:21 +0000 Subject: [PATCH] timeshift: add periodic status message indicating buffer state etc. --- src/dvr/dvr_rec.c | 1 + src/htsp_server.c | 39 ++++++++++++------- src/plumbing/globalheaders.c | 2 + src/plumbing/tsfix.c | 1 + src/streaming.c | 11 +++++- src/timeshift.h | 8 ++++ src/timeshift/timeshift_reader.c | 67 +++++++++++++++++++++++++++++--- src/timeshift/timeshift_writer.c | 1 + src/tvheadend.h | 6 ++- src/webui/webui.c | 1 + 10 files changed, 115 insertions(+), 22 deletions(-) diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 0c98f533..39b929e7 100755 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -541,6 +541,7 @@ dvr_thread(void *aux) case SMT_SPEED: case SMT_SKIP: case SMT_SIGNAL_STATUS: + case SMT_TIMESHIFT_STATUS: break; case SMT_EXIT: diff --git a/src/htsp_server.c b/src/htsp_server.c index 75b15d41..308f64ae 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -2169,11 +2169,7 @@ const static char frametypearray[PKT_NTYPES] = { * Build a htsmsg from a th_pkt and enqueue it on our HTSP service */ static void -#if ENABLE_TIMESHIFT -htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt, uint64_t timeshift) -#else htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt) -#endif { htsmsg_t *m; htsp_msg_t *hm; @@ -2201,12 +2197,6 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt) htsmsg_add_u32(m, "stream", pkt->pkt_componentindex); htsmsg_add_u32(m, "com", pkt->pkt_commercial); -#if ENABLE_TIMESHIFT - if (timeshift) - htsmsg_add_s64(m, "timeshift", timeshift); -#endif - - if(pkt->pkt_pts != PTS_UNSET) { int64_t pts = hs->hs_90khz ? pkt->pkt_pts : ts_rescale(pkt->pkt_pts, 1000000); htsmsg_add_s64(m, "pts", pts); @@ -2451,6 +2441,25 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip) htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0); } +/** + * + */ +#if ENABLE_TIMESHIFT +static void +htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status) +{ + htsmsg_t *m = htsmsg_create_map(); + htsmsg_add_str(m, "method", "timeshiftStatus"); + htsmsg_add_u32(m, "full", status->full); + htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000)); + if (status->pts_start != PTS_UNSET) + htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ; + if (status->pts_end != PTS_UNSET) + htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ; + htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0); +} +#endif + /** * */ @@ -2461,11 +2470,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm) switch(sm->sm_type) { case SMT_PACKET: -#if ENABLE_TIMESHIFT - htsp_stream_deliver(hs, sm->sm_data, sm->sm_timeshift); -#else htsp_stream_deliver(hs, sm->sm_data); -#endif // reference is transfered sm->sm_data = NULL; break; @@ -2503,6 +2508,12 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm) case SMT_SPEED: htsp_subscription_speed(hs, sm->sm_code); break; + + case SMT_TIMESHIFT_STATUS: +#if ENABLE_TIMESHIFT + htsp_subscription_timeshift_status(hs, sm->sm_data); +#endif + break; } streaming_msg_free(sm); } diff --git a/src/plumbing/globalheaders.c b/src/plumbing/globalheaders.c index 509da87b..7eda3b02 100644 --- a/src/plumbing/globalheaders.c +++ b/src/plumbing/globalheaders.c @@ -257,6 +257,7 @@ gh_hold(globalheaders_t *gh, streaming_message_t *sm) case SMT_MPEGTS: case SMT_SPEED: case SMT_SKIP: + case SMT_TIMESHIFT_STATUS: streaming_target_deliver2(gh->gh_output, sm); break; } @@ -287,6 +288,7 @@ gh_pass(globalheaders_t *gh, streaming_message_t *sm) case SMT_MPEGTS: case SMT_SKIP: case SMT_SPEED: + case SMT_TIMESHIFT_STATUS: streaming_target_deliver2(gh->gh_output, sm); break; diff --git a/src/plumbing/tsfix.c b/src/plumbing/tsfix.c index c54b76fc..858e59ed 100644 --- a/src/plumbing/tsfix.c +++ b/src/plumbing/tsfix.c @@ -371,6 +371,7 @@ tsfix_input(void *opaque, streaming_message_t *sm) case SMT_MPEGTS: case SMT_SPEED: case SMT_SKIP: + case SMT_TIMESHIFT_STATUS: break; } diff --git a/src/streaming.c b/src/streaming.c index 3173d3b0..2e0cace3 100755 --- a/src/streaming.c +++ b/src/streaming.c @@ -23,6 +23,7 @@ #include "packet.h" #include "atomic.h" #include "service.h" +#include "timeshift.h" void streaming_pad_init(streaming_pad_t *sp) @@ -139,7 +140,6 @@ streaming_msg_create(streaming_message_type_t type) sm->sm_type = type; #if ENABLE_TIMESHIFT sm->sm_time = 0; - sm->sm_timeshift = 0; #endif return sm; } @@ -195,7 +195,6 @@ streaming_msg_clone(streaming_message_t *src) dst->sm_type = src->sm_type; #if ENABLE_TIMESHIFT dst->sm_time = src->sm_time; - dst->sm_timeshift = src->sm_timeshift; #endif switch(src->sm_type) { @@ -220,6 +219,11 @@ streaming_msg_clone(streaming_message_t *src) memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t)); break; + case SMT_TIMESHIFT_STATUS: + dst->sm_data = malloc(sizeof(timeshift_status_t)); + memcpy(dst->sm_data, src->sm_data, sizeof(timeshift_status_t)); + break; + case SMT_SPEED: case SMT_STOP: case SMT_SERVICE_STATUS: @@ -286,6 +290,9 @@ streaming_msg_free(streaming_message_t *sm) case SMT_SKIP: case SMT_SIGNAL_STATUS: +#if ENABLE_TIMESHIFT + case SMT_TIMESHIFT_STATUS: +#endif free(sm->sm_data); break; diff --git a/src/timeshift.h b/src/timeshift.h index 342c7c66..3c6fc0ab 100644 --- a/src/timeshift.h +++ b/src/timeshift.h @@ -30,6 +30,14 @@ extern size_t timeshift_max_size; extern size_t timeshift_total_size; extern pthread_mutex_t timeshift_size_lock; +typedef struct timeshift_status +{ + int full; + int64_t shift; + int64_t pts_start; + int64_t pts_end; +} timeshift_status_t; + void timeshift_init ( void ); void timeshift_term ( void ); void timeshift_save ( void ); diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 5bb509a1..ad948d96 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -187,6 +187,36 @@ static streaming_message_t *_timeshift_find_sstart return ti ? ti->data : NULL; } +static timeshift_index_iframe_t *_timeshift_first_frame + ( timeshift_t *ts ) +{ + int end; + timeshift_index_iframe_t *tsi = NULL; + timeshift_file_t *tsf = timeshift_filemgr_last(ts); + while (tsf && !tsi) { + if (!(tsi = TAILQ_FIRST(&tsf->iframes))) + tsf = timeshift_filemgr_next(tsf, &end, 0); + } + if (tsf) + tsf->refcount--; + return tsi; +} + +static timeshift_index_iframe_t *_timeshift_last_frame + ( timeshift_t *ts ) +{ + int end; + timeshift_index_iframe_t *tsi = NULL; + timeshift_file_t *tsf = timeshift_filemgr_get(ts, ts->ondemand); + while (tsf && !tsi) { + if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list))) + tsf = timeshift_filemgr_prev(tsf, &end, 0); + } + if (tsf) + tsf->refcount--; + return tsi; +} + static int _timeshift_skip ( timeshift_t *ts, int64_t req_time, int64_t cur_time, timeshift_file_t *cur_file, timeshift_file_t **new_file, @@ -351,8 +381,8 @@ static int _timeshift_flush_to_live if (!*sm) break; if ((*sm)->sm_type == SMT_PACKET) { pts = ((th_pkt_t*)(*sm)->sm_data)->pkt_pts; - tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64, - ts->id, (*sm)->sm_time, pts, (*sm)->sm_timeshift ); + tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t, + ts->id, (*sm)->sm_time, pts); } streaming_target_deliver2(ts->output, *sm); *sm = NULL; @@ -360,7 +390,6 @@ static int _timeshift_flush_to_live return 0; } - /* ************************************************************************** * Thread * *************************************************************************/ @@ -380,6 +409,7 @@ void *timeshift_reader ( void *p ) streaming_message_t *sm = NULL, *ctrl = NULL; timeshift_index_iframe_t *tsi = NULL; streaming_skip_t *skip = NULL; + time_t last_status = 0; /* Poll */ struct epoll_event ev = { 0 }; @@ -391,6 +421,11 @@ void *timeshift_reader ( void *p ) /* Output */ while (run) { + // Note: Previously we allowed unlimited wait, but we now must wake periodically + // to output status message + if (wait < 0 || wait > 1000) + wait = 1000; + /* Wait for data */ if(wait) nfds = epoll_wait(efd, &ev, 1, wait); @@ -565,6 +600,28 @@ void *timeshift_reader ( void *p ) } } + /* Status message */ + if (now >= (last_status + 1000000)) { + streaming_message_t *tsm; + timeshift_status_t *status; + timeshift_index_iframe_t *fst, *lst; + status = calloc(1, sizeof(timeshift_status_t)); + fst = _timeshift_first_frame(ts); + lst = _timeshift_last_frame(ts); + status->full = ts->full; + status->shift = ts->state <= TS_LIVE ? 0 : ts_rescale_i(now - last_time, 1000000); + if (lst && fst && lst != fst && ts->pts_delta != PTS_UNSET) { + status->pts_start = ts_rescale_i(fst->time - ts->pts_delta, 1000000); + status->pts_end = ts_rescale_i(lst->time - ts->pts_delta, 1000000); + } else { + status->pts_start = PTS_UNSET; + status->pts_end = PTS_UNSET; + } + tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status); + streaming_target_deliver2(ts->output, tsm); + last_status = now; + } + /* Done */ if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) { pthread_mutex_unlock(&ts->state_mutex); @@ -644,16 +701,16 @@ void *timeshift_reader ( void *p ) (((cur_speed < 0) && (sm->sm_time >= deliver)) || ((cur_speed > 0) && (sm->sm_time <= deliver))))) { - sm->sm_timeshift = now - sm->sm_time; #ifndef TSHFT_TRACE if (skip) #endif { time_t pts = 0; + int64_t delta = now - sm->sm_time; if (sm->sm_type == SMT_PACKET) pts = ((th_pkt_t*)sm->sm_data)->pkt_pts; tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64, - ts->id, sm->sm_time, pts, sm->sm_timeshift ); + ts->id, sm->sm_time, pts, delta); } streaming_target_deliver2(ts->output, sm); last_time = sm->sm_time; diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index 8bf6be03..bf944dbe 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -257,6 +257,7 @@ static void _process_msg /* Status */ case SMT_NOSTART: case SMT_SERVICE_STATUS: + case SMT_TIMESHIFT_STATUS: break; /* Store */ diff --git a/src/tvheadend.h b/src/tvheadend.h index d8446be7..7b9e95be 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -322,6 +322,11 @@ typedef enum { */ SMT_SKIP, + /** + * Timeshift status + */ + SMT_TIMESHIFT_STATUS, + } streaming_message_type_t; #define SMT_TO_MASK(x) (1 << ((unsigned int)x)) @@ -359,7 +364,6 @@ typedef struct streaming_message { streaming_message_type_t sm_type; #if ENABLE_TIMESHIFT int64_t sm_time; - uint64_t sm_timeshift; #endif union { void *sm_data; diff --git a/src/webui/webui.c b/src/webui/webui.c index 3e1e525c..29fddf05 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -247,6 +247,7 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, case SMT_SKIP: case SMT_SPEED: case SMT_SIGNAL_STATUS: + case SMT_TIMESHIFT_STATUS: break; case SMT_NOSTART: