From 88a9d96039abe77545247191eedc3a02e17ae0e8 Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Fri, 28 Dec 2012 22:14:06 +0000 Subject: [PATCH] timeshift: Reworked the reader engine to common up things and prepare for seek support. --- src/timeshift/timeshift_reader.c | 295 ++++++++++++++++--------------- src/tvheadend.h | 2 +- 2 files changed, 153 insertions(+), 144 deletions(-) diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index ab57cdff..cf4c88c0 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -165,6 +165,68 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) return cnt; } +/* ************************************************************************** + * Utilities + * *************************************************************************/ + +static int _timeshift_skip + ( timeshift_t *ts, int64_t req_time, int64_t cur_time, + timeshift_file_t *cur_file, timeshift_file_t **new_file, + timeshift_index_iframe_t **iframe ) +{ + timeshift_index_iframe_t *tsi = *iframe; + timeshift_file_t *tsf = cur_file; + int64_t sec = req_time / 1000000; + int back = (req_time < cur_time) ? 1 : 0; + int end = 0; + + /* Coarse search */ + if (!tsi) { + while (tsf && !end) { + if (back) { + if ((tsf->time <= sec) && + (tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list))) + break; + tsf = timeshift_filemgr_prev(tsf, &end, 1); + } else { + if ((tsf->time >= sec) && + (tsi = TAILQ_FIRST(&tsf->iframes))) + break; + tsf = timeshift_filemgr_next(tsf, &end, 0); + } + tsi = NULL; + } + } + + /* Fine search */ + if (back) { + while (!end && tsf && tsi && (tsi->time > req_time)) { + tsi = TAILQ_PREV(tsi, timeshift_index_iframe_list, link); + while (!end && tsf && !tsi) { + if ((tsf = timeshift_filemgr_prev(tsf, &end, 1))) + tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list); + } + } + } else { + while (!end && tsf && tsi && (tsi->time < req_time)) { + tsi = TAILQ_NEXT(tsi, link); + while (!end && tsf && !tsi) { + if ((tsf = timeshift_filemgr_next(tsf, &end, 0))) + tsi = TAILQ_FIRST(&tsf->iframes); + } + } + } + + /* End */ + if (!tsf || !tsi) + end = 1; + + /* Done */ + *new_file = tsf; + *iframe = end ? NULL : tsi; + return end; +} + /* ************************************************************************** * Thread * *************************************************************************/ @@ -176,12 +238,12 @@ void *timeshift_reader ( void *p ) { timeshift_t *ts = p; int efd, nfds, end, fd = -1, run = 1, wait = -1; + timeshift_file_t *cur_file = NULL; off_t cur_off = 0; int cur_speed = 100, keyframe_mode = 0; - int64_t pause_time = 0, play_time = 0, last_time = 0, tx_time = 0; - int64_t now, deliver; + int64_t pause_time = 0, play_time = 0, last_time = 0; + int64_t now, deliver, skip_time; streaming_message_t *sm = NULL, *ctrl; - timeshift_file_t *cur_file = NULL, *tsi_file = NULL; timeshift_index_iframe_t *tsi = NULL; /* Poll */ @@ -199,8 +261,10 @@ void *timeshift_reader ( void *p ) nfds = epoll_wait(efd, &ev, 1, wait); else nfds = 0; - wait = -1; - end = 0; + wait = -1; + end = 0; + skip_time = 0; + now = getmonoclock(); /* Control */ pthread_mutex_lock(&ts->state_mutex); @@ -216,7 +280,6 @@ void *timeshift_reader ( void *p ) streaming_msg_free(ctrl); /* Speed */ - // TODO: currently just pause } else if (ctrl->sm_type == SMT_SPEED) { int speed = ctrl->sm_code; int keyframe; @@ -270,8 +333,7 @@ void *timeshift_reader ( void *p ) keyframe ? "yes" : "no"); keyframe_mode = keyframe; if (keyframe) { - tsi = NULL; - tsi_file = cur_file; + tsi = NULL; } } @@ -288,7 +350,11 @@ void *timeshift_reader ( void *p ) ctrl->sm_code = speed; streaming_target_deliver2(ts->output, ctrl); - /* Skip */ + /* Skip/Seek */ + } else if (ctrl->sm_type == SMT_SKIP) { + // TODO: implement this + + /* Ignore */ } else { streaming_msg_free(ctrl); } @@ -303,165 +369,108 @@ void *timeshift_reader ( void *p ) continue; } + /* File processing lock */ + pthread_mutex_lock(&ts->rdwr_mutex); + /* Calculate delivery time */ - now = getmonoclock(); deliver = (now - play_time) + TS_PLAY_BUF; deliver = (deliver * cur_speed) / 100; deliver = (deliver + pause_time); - /* Rewind or Fast forward (i-frame only) */ - if (keyframe_mode) { - wait = 0; + /* Determine next packet */ + if (!sm) { - /* Find next index */ - if (cur_speed < 0) { - if (!tsi) { - TAILQ_FOREACH_REVERSE(tsi, &tsi_file->iframes, - timeshift_index_iframe_list, link) { - if (tsi->time < last_time) break; - } - } - } else { - if (!tsi) { - TAILQ_FOREACH(tsi, &tsi_file->iframes, link) { - if (tsi->time > last_time) break; - } - } - } + /* Rewind or Fast forward (i-frame only) */ + if (skip_time || keyframe_mode) { + timeshift_file_t *tsf = NULL; + time_t req_time; - /* Next file */ - if (!tsi) { - if (fd != -1) + /* Time */ + if (!skip_time) + req_time = last_time + ((cur_speed < 0) ? -1 : 1); + else + req_time = skip_time; + + /* Find */ + end = _timeshift_skip(ts, req_time, last_time, + cur_file, &tsf, &tsi); + + /* File changed (close) */ + if ((tsf != cur_file) && (fd != -1)) { close(fd); - wait = 0; // immediately cycle around - fd = -1; - if (cur_speed < 0) - tsi_file = timeshift_filemgr_prev(tsi_file, &end, 1); - else - tsi_file = timeshift_filemgr_next(tsi_file, &end, 0); - } - - /* Deliver */ - if (tsi && (((cur_speed < 0) && (tsi->time >= deliver)) || - ((cur_speed > 0) && (tsi->time <= deliver)))) { - - /* Keep delivery to 5fps max */ - if ((now - tx_time) >= 200000) { - - /* Open */ - if (fd == -1) { -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s", - ts->id, tsi_file->path); -#endif - fd = open(tsi_file->path, O_RDONLY); - } - - /* Read */ - off_t ret = lseek(fd, tsi->pos, SEEK_SET); - assert(ret == tsi->pos); - ssize_t r = _read_msg(fd, &sm); - - /* Send */ - if (r > 0) { -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t, - ts->id, sm->sm_time); -#endif - sm->sm_timeshift = now - sm->sm_time; - streaming_target_deliver2(ts->output, sm); - cur_file = tsi_file; - cur_off = tsi->pos + r; - last_time = sm->sm_time; - tx_time = now; - sm = NULL; - } else { - wait = -1; - close(fd); - fd = -1; - } + fd = -1; } - /* Next index */ - if (cur_speed < 0) - tsi = TAILQ_PREV(tsi, timeshift_index_iframe_list, link); - else - tsi = TAILQ_NEXT(tsi, link); - - /* Not yet! */ - } else if (tsi) { - if (cur_speed > 0) - wait = (tsi->time - deliver) / 1000; - else - wait = (deliver - tsi->time) / 1000; - if (wait == 0) wait = 1; + /* Position */ + cur_file = tsf; + if (tsi) + cur_off = tsi->pos; } - /* Full frame delivery */ - } else { + /* Find packet */ + if (cur_file && !end) { - /* Open file */ - if (fd == -1) { + /* Open file */ + if (fd == -1) { #ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s", - ts->id, cur_file->path); + tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s", + ts->id, cur_file->path); #endif - fd = open(cur_file->path, O_RDONLY); + fd = open(cur_file->path, O_RDONLY); + } if (cur_off) lseek(fd, cur_off, SEEK_SET); - } - - /* Process */ - pthread_mutex_lock(&ts->rdwr_mutex); - end = 1; - while (cur_file && cur_off < cur_file->size) { /* Read msg */ - if (!sm) { - ssize_t r = _read_msg(fd, &sm); - assert(r != -1); - - /* Incomplete */ - if (r == 0) { - lseek(fd, cur_off, SEEK_SET); - break; - } + ssize_t r = _read_msg(fd, &sm); + assert(r != -1); +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)", + ts->id, sm, r); +#endif + /* Incomplete */ + if (r == 0) + lseek(fd, cur_off, SEEK_SET); + else cur_off += r; - /* Special case - EOF */ - if (r == sizeof(size_t) || cur_off > cur_file->size) { - close(fd); - wait = 0; // immediately cycle around - cur_off = 0; // reset - fd = -1; - cur_file = timeshift_filemgr_next(cur_file, NULL, 0); - break; - } - } - - assert(sm); - end = 0; - - /* Deliver */ - if (sm->sm_time <= deliver) { -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t, - ts->id, sm->sm_time); -#endif - sm->sm_timeshift = now - sm->sm_time; - streaming_target_deliver2(ts->output, sm); - tx_time = now; - last_time = sm->sm_time; - sm = NULL; - wait = 0; - } else { - wait = (sm->sm_time - deliver) / 1000; - if (wait == 0) wait = 1; - break; + /* Special case - EOF */ + if (r == sizeof(size_t) || cur_off > cur_file->size) { + close(fd); + fd = -1; + cur_file = timeshift_filemgr_next(cur_file, NULL, 0); + cur_off = 0; // reset + wait = 0; } } } + /* Deliver */ + if (sm && (skip_time || + (((cur_speed < 0) && (sm->sm_time >= deliver)) || + ((cur_speed > 0) && (sm->sm_time <= deliver))))) { + +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t, + ts->id, sm->sm_time); +#endif + sm->sm_timeshift = now - sm->sm_time; + streaming_target_deliver2(ts->output, sm); + last_time = sm->sm_time; + sm = NULL; + wait = 0; + } else if (sm) { + if (cur_speed > 0) + wait = (sm->sm_time - deliver) / 1000; + else + wait = (deliver - sm->sm_time) / 1000; + if (wait == 0) wait = 1; +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d wait %d", + ts->id, wait); +#endif + } + /* Terminate */ if (!cur_file || end) { diff --git a/src/tvheadend.h b/src/tvheadend.h index 2f6adcf9..a6a9a0ab 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -223,7 +223,7 @@ typedef struct streaming_skip } type; union { off_t size; - time_t time; + int64_t time; }; } streaming_skip_t;