From 86c4a969f72a3dcfa79e2ef82db823936d15df91 Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Wed, 9 Jan 2013 12:30:11 +0000 Subject: [PATCH] timeshift: finish timeshift skip support. I now have a more complete skip support implemented. This includes properly handling buffer ends. I have tested with a custom pvr.hts in XBMC, but it does need plenty more. --- src/htsp_server.c | 27 ++++- src/timeshift/private.h | 2 + src/timeshift/timeshift_filemgr.c | 9 ++ src/timeshift/timeshift_reader.c | 182 +++++++++++++++++++++++------- src/tvheadend.h | 1 + 5 files changed, 176 insertions(+), 45 deletions(-) diff --git a/src/htsp_server.c b/src/htsp_server.c index 77556d6e..00eda674 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -1300,8 +1300,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) streaming_target_t *st = &hs->hs_input; - if(normts) - st = hs->hs_tsfix = tsfix_create(st); #if ENABLE_TIMESHIFT if (timeshiftPeriod != 0) { if (timeshiftPeriod == ~0) @@ -1309,8 +1307,11 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) else tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60); st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod); + normts = 1; } #endif + if(normts) + st = hs->hs_tsfix = tsfix_create(st); hs->hs_s = subscription_create_from_channel(ch, weight, htsp->htsp_logname, @@ -1619,6 +1620,7 @@ struct { { "subscribe", htsp_method_subscribe, ACCESS_STREAMING}, { "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING}, { "subscriptionChangeWeight", htsp_method_change_weight, ACCESS_STREAMING}, + { "subscriptionSeek", htsp_method_skip, ACCESS_STREAMING}, { "subscriptionSkip", htsp_method_skip, ACCESS_STREAMING}, { "subscriptionSpeed", htsp_method_speed, ACCESS_STREAMING}, { "fileOpen", htsp_method_file_open, ACCESS_RECORDER}, @@ -2419,6 +2421,26 @@ htsp_subscription_speed(htsp_subscription_t *hs, int speed) htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0); } +/** + * + */ +static void +htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip) +{ + htsmsg_t *m = htsmsg_create_map(); + htsmsg_add_str(m, "method", "subscriptionSkip"); + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); + if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE) + htsmsg_add_u32(m, "absolute", 1); + if (skip->type == SMT_SKIP_ERROR) + htsmsg_add_u32(m, "error", 1); + else if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_REL_TIME) + htsmsg_add_s64(m, "time", skip->time); + else if (skip->type == SMT_SKIP_ABS_SIZE || skip->type == SMT_SKIP_REL_SIZE) + htsmsg_add_s64(m, "size", skip->size); + htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0); +} + /** * */ @@ -2465,6 +2487,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm) abort(); case SMT_SKIP: + htsp_subscription_skip(hs, sm->sm_data); break; case SMT_SPEED: diff --git a/src/timeshift/private.h b/src/timeshift/private.h index ed64a12f..655b2539 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -133,6 +133,8 @@ void timeshift_filemgr_init ( void ); void timeshift_filemgr_term ( void ); int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len ); +timeshift_file_t *timeshift_filemgr_last + ( timeshift_t *ts ); timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ); timeshift_file_t *timeshift_filemgr_prev diff --git a/src/timeshift/timeshift_filemgr.c b/src/timeshift/timeshift_filemgr.c index 44571048..f8fafad5 100644 --- a/src/timeshift/timeshift_filemgr.c +++ b/src/timeshift/timeshift_filemgr.c @@ -278,6 +278,15 @@ timeshift_file_t *timeshift_filemgr_prev return nxt; } +/* + * Get the oldest file + */ +timeshift_file_t *timeshift_filemgr_last ( timeshift_t *ts ) +{ + return TAILQ_FIRST(&ts->files); +} + + /* ************************************************************************** * Setup / Teardown * *************************************************************************/ diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 423d42ac..4f6754f9 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -110,6 +110,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) case SMT_START: case SMT_NOSTART: case SMT_SERVICE_STATUS: + return -1; break; /* Code */ @@ -159,6 +160,9 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) } (*sm)->sm_time = time; break; + + default: + return -1; } /* OK */ @@ -221,9 +225,30 @@ static int _timeshift_skip if (!tsf || !tsi) end = 1; + /* Find start/end of buffer */ + if (end) { + if (back) { + tsf = timeshift_filemgr_last(ts); + tsi = NULL; + while (tsf && !tsi) { + if (!(tsi = TAILQ_FIRST(&tsf->iframes))) + tsf = timeshift_filemgr_next(tsf, &end, 0); + } + end = -1; + } else { + tsf = timeshift_filemgr_get(ts, ts->ondemand); + tsi = NULL; + while (tsf && !tsi) { + if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list))) + tsf = timeshift_filemgr_prev(tsf, &end, 0); + } + end = 1; + } + } + /* Done */ *new_file = tsf; - *iframe = end ? NULL : tsi; + *iframe = tsi; return end; } @@ -242,14 +267,14 @@ void *timeshift_reader ( void *p ) off_t cur_off = 0; int cur_speed = 100, keyframe_mode = 0; int64_t pause_time = 0, play_time = 0, last_time = 0; - int64_t now, deliver, skip_time; - streaming_message_t *sm = NULL, *ctrl; + int64_t now, deliver, skip_time = 0; + streaming_message_t *sm = NULL, *ctrl = NULL; timeshift_index_iframe_t *tsi = NULL; - streaming_skip_t *skip; + streaming_skip_t *skip = NULL; /* Poll */ - struct epoll_event ev; - efd = epoll_create(1); + struct epoll_event ev = { 0 }; + efd = epoll_create(1); ev.events = EPOLLIN; ev.data.fd = ts->rd_pipe.rd; epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev); @@ -264,7 +289,6 @@ void *timeshift_reader ( void *p ) nfds = 0; wait = -1; end = 0; - skip_time = 0; skip = NULL; now = getmonoclock(); @@ -280,6 +304,7 @@ void *timeshift_reader ( void *p ) #endif run = 0; streaming_msg_free(ctrl); + ctrl = NULL; /* Speed */ } else if (ctrl->sm_type == SMT_SPEED) { @@ -351,35 +376,71 @@ void *timeshift_reader ( void *p ) /* Send on the message */ ctrl->sm_code = speed; streaming_target_deliver2(ts->output, ctrl); + ctrl = NULL; /* Skip/Seek */ } else if (ctrl->sm_type == SMT_SKIP) { - skip = (streaming_skip_t *) ctrl->sm_data; + skip = ctrl->sm_data; switch (skip->type) { case SMT_SKIP_REL_TIME: - skip_time = last_time + skip->time; - break; - case SMT_SKIP_ABS_TIME: - skip_time = skip->time; // Wrong - need to use starttime of video too - break; - case SMT_SKIP_REL_SIZE: - case SMT_SKIP_ABS_SIZE: - tvhlog(LOG_DEBUG, "timeshift", "unsupported skip type: %d", skip->type); + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip %"PRId64" requested", ts->id, skip->time); + + /* Must handle live playback case */ + if (ts->state == TS_LIVE) { + if (skip->time < 0) { + pthread_mutex_lock(&ts->rdwr_mutex); + if ((cur_file = timeshift_filemgr_get(ts, ts->ondemand))) { + ts->state = TS_PLAY; + cur_off = cur_file->size; + last_time = cur_file->last; + } else { + tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id); + skip = NULL; + break; + } + pthread_mutex_unlock(&ts->rdwr_mutex); + } else { + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id); + skip = NULL; + } + } + + /* OK */ + if (skip) { + /* Adjust time */ + play_time = now; + pause_time = skip_time = last_time + skip->time; + tsi = NULL; + + /* Clear existing packet */ + if (sm) + streaming_msg_free(sm); + sm = NULL; + } break; default: - tvhlog(LOG_ERR, "timeshift", "invalid skip type: %d", skip->type); + tvhlog(LOG_ERR, "timeshift", "ts %d invalid/unsupported skip type: %d", ts->id, skip->type); + skip = NULL; + break; } - if (!skip_time) - streaming_msg_free(ctrl); + + /* Error */ + if (!skip) { + ((streaming_skip_t*)ctrl->sm_data)->type = SMT_SKIP_ERROR; + streaming_target_deliver2(ts->output, ctrl); + ctrl = NULL; + } + /* Ignore */ } else { streaming_msg_free(ctrl); + ctrl = NULL; } } } /* Done */ - if (!run || ts->state != TS_PLAY || !cur_file) { + if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) { pthread_mutex_unlock(&ts->state_mutex); continue; } @@ -396,12 +457,12 @@ void *timeshift_reader ( void *p ) if (!sm) { /* Rewind or Fast forward (i-frame only) */ - if (skip_time || keyframe_mode) { + if (skip || keyframe_mode) { timeshift_file_t *tsf = NULL; time_t req_time; /* Time */ - if (!skip_time) + if (!skip) req_time = last_time + ((cur_speed < 0) ? -1 : 1); else req_time = skip_time; @@ -410,12 +471,6 @@ void *timeshift_reader ( void *p ) end = _timeshift_skip(ts, req_time, last_time, cur_file, &tsf, &tsi); - /* Adjust skip time to actual */ - if (skip_time) { - skip->time += (tsi->time - skip_time); - streaming_target_deliver2(ts->output, ctrl); - } - /* File changed (close) */ if ((tsf != cur_file) && (fd != -1)) { close(fd); @@ -426,10 +481,12 @@ void *timeshift_reader ( void *p ) cur_file = tsf; if (tsi) cur_off = tsi->pos; + else + cur_off = 0; } /* Find packet */ - if (cur_file && !end) { + if (cur_file) { /* Open file */ if (fd == -1) { @@ -439,11 +496,23 @@ void *timeshift_reader ( void *p ) #endif fd = open(cur_file->path, O_RDONLY); } - if (cur_off) lseek(fd, cur_off, SEEK_SET); + if (cur_off) { +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, cur_off); +#endif + lseek(fd, cur_off, SEEK_SET); + } /* Read msg */ ssize_t r = _read_msg(fd, &sm); - assert(r != -1); + if (r < 0) { + streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); + streaming_target_deliver2(ts->output, e); + tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id); + pthread_mutex_unlock(&ts->rdwr_mutex); + pthread_mutex_unlock(&ts->state_mutex); + break; + } #ifdef TSHFT_TRACE tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)", ts->id, sm, r); @@ -466,16 +535,38 @@ void *timeshift_reader ( void *p ) } } + /* Send skip response */ + if (skip) { + if (sm && sm->sm_type == SMT_PACKET) { + th_pkt_t *pkt = sm->sm_data; + skip->time = pkt->pkt_pts; + skip->type = SMT_SKIP_ABS_TIME; + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to %"PRId64" ok", ts->id, skip->time); + } else { + /* Report error */ + skip->type = SMT_SKIP_ERROR; + skip = NULL; + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip failed", ts->id); + } + streaming_target_deliver2(ts->output, ctrl); + ctrl = NULL; + } + /* Deliver */ - if (sm && (skip_time || + if (sm && (skip || (((cur_speed < 0) && (sm->sm_time >= deliver)) || ((cur_speed > 0) && (sm->sm_time <= deliver))))) { -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t, - ts->id, sm->sm_time); -#endif sm->sm_timeshift = now - sm->sm_time; +#ifdef TSHFT_TRACE + { + time_t pts = 0; + if (sm->sm_type == SMT_PACKET) + pts = ((th_pkt_t*)sm->sm_data)->pkt_pts; + tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t" pts=%"PRItime_t " shift=%"PRIu64, + ts->id, sm->sm_time, pts, sm->sm_timeshift ); + } +#endif streaming_target_deliver2(ts->output, sm); last_time = sm->sm_time; sm = NULL; @@ -493,10 +584,12 @@ void *timeshift_reader ( void *p ) } /* Terminate */ - if (!cur_file || end) { + if (!cur_file || end != 0) { + if (!end) + end = (cur_file > 0) ? 1 : -1; /* Back to live */ - if (cur_speed > 0) { + if (end == 1) { tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id); ts->state = TS_LIVE; cur_speed = 100; @@ -508,13 +601,15 @@ void *timeshift_reader ( void *p ) timeshift_filemgr_flush(ts, NULL); /* Pause */ - } else if (cur_speed < 0) { + } else { tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id); - cur_speed = 0; - ts->state = TS_PAUSE; - ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); + cur_speed = 0; + ts->state = TS_PAUSE; + pause_time = now; + ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); streaming_target_deliver2(ts->output, ctrl); } + ctrl = NULL; /* Flush unwanted */ } else if (ts->ondemand && cur_file) { @@ -526,7 +621,8 @@ void *timeshift_reader ( void *p ) } /* Cleanup */ - if (sm) streaming_msg_free(sm); + if (sm) streaming_msg_free(sm); + if (ctrl) streaming_msg_free(ctrl); #ifdef TSHFT_TRACE tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id); #endif diff --git a/src/tvheadend.h b/src/tvheadend.h index a6a9a0ab..0df50c13 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -216,6 +216,7 @@ typedef struct signal_status { typedef struct streaming_skip { enum { + SMT_SKIP_ERROR, SMT_SKIP_REL_TIME, SMT_SKIP_ABS_TIME, SMT_SKIP_REL_SIZE,