timeshift: finish timeshift skip support.

I now have a more complete skip support implemented. This includes
properly handling buffer ends. I have tested with a custom pvr.hts
in XBMC, but it does need plenty more.
This commit is contained in:
Adam Sutton 2013-01-09 12:30:11 +00:00
parent 2235b869ae
commit 86c4a969f7
5 changed files with 176 additions and 45 deletions

View file

@ -1300,8 +1300,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
streaming_target_t *st = &hs->hs_input;
if(normts)
st = hs->hs_tsfix = tsfix_create(st);
#if ENABLE_TIMESHIFT
if (timeshiftPeriod != 0) {
if (timeshiftPeriod == ~0)
@ -1309,8 +1307,11 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
else
tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod);
normts = 1;
}
#endif
if(normts)
st = hs->hs_tsfix = tsfix_create(st);
hs->hs_s = subscription_create_from_channel(ch, weight,
htsp->htsp_logname,
@ -1619,6 +1620,7 @@ struct {
{ "subscribe", htsp_method_subscribe, ACCESS_STREAMING},
{ "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING},
{ "subscriptionChangeWeight", htsp_method_change_weight, ACCESS_STREAMING},
{ "subscriptionSeek", htsp_method_skip, ACCESS_STREAMING},
{ "subscriptionSkip", htsp_method_skip, ACCESS_STREAMING},
{ "subscriptionSpeed", htsp_method_speed, ACCESS_STREAMING},
{ "fileOpen", htsp_method_file_open, ACCESS_RECORDER},
@ -2419,6 +2421,26 @@ htsp_subscription_speed(htsp_subscription_t *hs, int speed)
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
/**
*
*/
static void
htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "subscriptionSkip");
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE)
htsmsg_add_u32(m, "absolute", 1);
if (skip->type == SMT_SKIP_ERROR)
htsmsg_add_u32(m, "error", 1);
else if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_REL_TIME)
htsmsg_add_s64(m, "time", skip->time);
else if (skip->type == SMT_SKIP_ABS_SIZE || skip->type == SMT_SKIP_REL_SIZE)
htsmsg_add_s64(m, "size", skip->size);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
/**
*
*/
@ -2465,6 +2487,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
abort();
case SMT_SKIP:
htsp_subscription_skip(hs, sm->sm_data);
break;
case SMT_SPEED:

View file

@ -133,6 +133,8 @@ void timeshift_filemgr_init ( void );
void timeshift_filemgr_term ( void );
int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len );
timeshift_file_t *timeshift_filemgr_last
( timeshift_t *ts );
timeshift_file_t *timeshift_filemgr_get
( timeshift_t *ts, int create );
timeshift_file_t *timeshift_filemgr_prev

View file

@ -278,6 +278,15 @@ timeshift_file_t *timeshift_filemgr_prev
return nxt;
}
/*
* Get the oldest file
*/
timeshift_file_t *timeshift_filemgr_last ( timeshift_t *ts )
{
return TAILQ_FIRST(&ts->files);
}
/* **************************************************************************
* Setup / Teardown
* *************************************************************************/

View file

@ -110,6 +110,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
case SMT_START:
case SMT_NOSTART:
case SMT_SERVICE_STATUS:
return -1;
break;
/* Code */
@ -159,6 +160,9 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
}
(*sm)->sm_time = time;
break;
default:
return -1;
}
/* OK */
@ -221,9 +225,30 @@ static int _timeshift_skip
if (!tsf || !tsi)
end = 1;
/* Find start/end of buffer */
if (end) {
if (back) {
tsf = timeshift_filemgr_last(ts);
tsi = NULL;
while (tsf && !tsi) {
if (!(tsi = TAILQ_FIRST(&tsf->iframes)))
tsf = timeshift_filemgr_next(tsf, &end, 0);
}
end = -1;
} else {
tsf = timeshift_filemgr_get(ts, ts->ondemand);
tsi = NULL;
while (tsf && !tsi) {
if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list)))
tsf = timeshift_filemgr_prev(tsf, &end, 0);
}
end = 1;
}
}
/* Done */
*new_file = tsf;
*iframe = end ? NULL : tsi;
*iframe = tsi;
return end;
}
@ -242,14 +267,14 @@ void *timeshift_reader ( void *p )
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0;
int64_t pause_time = 0, play_time = 0, last_time = 0;
int64_t now, deliver, skip_time;
streaming_message_t *sm = NULL, *ctrl;
int64_t now, deliver, skip_time = 0;
streaming_message_t *sm = NULL, *ctrl = NULL;
timeshift_index_iframe_t *tsi = NULL;
streaming_skip_t *skip;
streaming_skip_t *skip = NULL;
/* Poll */
struct epoll_event ev;
efd = epoll_create(1);
struct epoll_event ev = { 0 };
efd = epoll_create(1);
ev.events = EPOLLIN;
ev.data.fd = ts->rd_pipe.rd;
epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
@ -264,7 +289,6 @@ void *timeshift_reader ( void *p )
nfds = 0;
wait = -1;
end = 0;
skip_time = 0;
skip = NULL;
now = getmonoclock();
@ -280,6 +304,7 @@ void *timeshift_reader ( void *p )
#endif
run = 0;
streaming_msg_free(ctrl);
ctrl = NULL;
/* Speed */
} else if (ctrl->sm_type == SMT_SPEED) {
@ -351,35 +376,71 @@ void *timeshift_reader ( void *p )
/* Send on the message */
ctrl->sm_code = speed;
streaming_target_deliver2(ts->output, ctrl);
ctrl = NULL;
/* Skip/Seek */
} else if (ctrl->sm_type == SMT_SKIP) {
skip = (streaming_skip_t *) ctrl->sm_data;
skip = ctrl->sm_data;
switch (skip->type) {
case SMT_SKIP_REL_TIME:
skip_time = last_time + skip->time;
break;
case SMT_SKIP_ABS_TIME:
skip_time = skip->time; // Wrong - need to use starttime of video too
break;
case SMT_SKIP_REL_SIZE:
case SMT_SKIP_ABS_SIZE:
tvhlog(LOG_DEBUG, "timeshift", "unsupported skip type: %d", skip->type);
tvhlog(LOG_DEBUG, "timeshift", "ts %d skip %"PRId64" requested", ts->id, skip->time);
/* Must handle live playback case */
if (ts->state == TS_LIVE) {
if (skip->time < 0) {
pthread_mutex_lock(&ts->rdwr_mutex);
if ((cur_file = timeshift_filemgr_get(ts, ts->ondemand))) {
ts->state = TS_PLAY;
cur_off = cur_file->size;
last_time = cur_file->last;
} else {
tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id);
skip = NULL;
break;
}
pthread_mutex_unlock(&ts->rdwr_mutex);
} else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id);
skip = NULL;
}
}
/* OK */
if (skip) {
/* Adjust time */
play_time = now;
pause_time = skip_time = last_time + skip->time;
tsi = NULL;
/* Clear existing packet */
if (sm)
streaming_msg_free(sm);
sm = NULL;
}
break;
default:
tvhlog(LOG_ERR, "timeshift", "invalid skip type: %d", skip->type);
tvhlog(LOG_ERR, "timeshift", "ts %d invalid/unsupported skip type: %d", ts->id, skip->type);
skip = NULL;
break;
}
if (!skip_time)
streaming_msg_free(ctrl);
/* Error */
if (!skip) {
((streaming_skip_t*)ctrl->sm_data)->type = SMT_SKIP_ERROR;
streaming_target_deliver2(ts->output, ctrl);
ctrl = NULL;
}
/* Ignore */
} else {
streaming_msg_free(ctrl);
ctrl = NULL;
}
}
}
/* Done */
if (!run || ts->state != TS_PLAY || !cur_file) {
if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) {
pthread_mutex_unlock(&ts->state_mutex);
continue;
}
@ -396,12 +457,12 @@ void *timeshift_reader ( void *p )
if (!sm) {
/* Rewind or Fast forward (i-frame only) */
if (skip_time || keyframe_mode) {
if (skip || keyframe_mode) {
timeshift_file_t *tsf = NULL;
time_t req_time;
/* Time */
if (!skip_time)
if (!skip)
req_time = last_time + ((cur_speed < 0) ? -1 : 1);
else
req_time = skip_time;
@ -410,12 +471,6 @@ void *timeshift_reader ( void *p )
end = _timeshift_skip(ts, req_time, last_time,
cur_file, &tsf, &tsi);
/* Adjust skip time to actual */
if (skip_time) {
skip->time += (tsi->time - skip_time);
streaming_target_deliver2(ts->output, ctrl);
}
/* File changed (close) */
if ((tsf != cur_file) && (fd != -1)) {
close(fd);
@ -426,10 +481,12 @@ void *timeshift_reader ( void *p )
cur_file = tsf;
if (tsi)
cur_off = tsi->pos;
else
cur_off = 0;
}
/* Find packet */
if (cur_file && !end) {
if (cur_file) {
/* Open file */
if (fd == -1) {
@ -439,11 +496,23 @@ void *timeshift_reader ( void *p )
#endif
fd = open(cur_file->path, O_RDONLY);
}
if (cur_off) lseek(fd, cur_off, SEEK_SET);
if (cur_off) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, cur_off);
#endif
lseek(fd, cur_off, SEEK_SET);
}
/* Read msg */
ssize_t r = _read_msg(fd, &sm);
assert(r != -1);
if (r < 0) {
streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
streaming_target_deliver2(ts->output, e);
tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id);
pthread_mutex_unlock(&ts->rdwr_mutex);
pthread_mutex_unlock(&ts->state_mutex);
break;
}
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
ts->id, sm, r);
@ -466,16 +535,38 @@ void *timeshift_reader ( void *p )
}
}
/* Send skip response */
if (skip) {
if (sm && sm->sm_type == SMT_PACKET) {
th_pkt_t *pkt = sm->sm_data;
skip->time = pkt->pkt_pts;
skip->type = SMT_SKIP_ABS_TIME;
tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to %"PRId64" ok", ts->id, skip->time);
} else {
/* Report error */
skip->type = SMT_SKIP_ERROR;
skip = NULL;
tvhlog(LOG_DEBUG, "timeshift", "ts %d skip failed", ts->id);
}
streaming_target_deliver2(ts->output, ctrl);
ctrl = NULL;
}
/* Deliver */
if (sm && (skip_time ||
if (sm && (skip ||
(((cur_speed < 0) && (sm->sm_time >= deliver)) ||
((cur_speed > 0) && (sm->sm_time <= deliver))))) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
ts->id, sm->sm_time);
#endif
sm->sm_timeshift = now - sm->sm_time;
#ifdef TSHFT_TRACE
{
time_t pts = 0;
if (sm->sm_type == SMT_PACKET)
pts = ((th_pkt_t*)sm->sm_data)->pkt_pts;
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t" pts=%"PRItime_t " shift=%"PRIu64,
ts->id, sm->sm_time, pts, sm->sm_timeshift );
}
#endif
streaming_target_deliver2(ts->output, sm);
last_time = sm->sm_time;
sm = NULL;
@ -493,10 +584,12 @@ void *timeshift_reader ( void *p )
}
/* Terminate */
if (!cur_file || end) {
if (!cur_file || end != 0) {
if (!end)
end = (cur_file > 0) ? 1 : -1;
/* Back to live */
if (cur_speed > 0) {
if (end == 1) {
tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id);
ts->state = TS_LIVE;
cur_speed = 100;
@ -508,13 +601,15 @@ void *timeshift_reader ( void *p )
timeshift_filemgr_flush(ts, NULL);
/* Pause */
} else if (cur_speed < 0) {
} else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
cur_speed = 0;
ts->state = TS_PAUSE;
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
cur_speed = 0;
ts->state = TS_PAUSE;
pause_time = now;
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
}
ctrl = NULL;
/* Flush unwanted */
} else if (ts->ondemand && cur_file) {
@ -526,7 +621,8 @@ void *timeshift_reader ( void *p )
}
/* Cleanup */
if (sm) streaming_msg_free(sm);
if (sm) streaming_msg_free(sm);
if (ctrl) streaming_msg_free(ctrl);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
#endif

View file

@ -216,6 +216,7 @@ typedef struct signal_status {
typedef struct streaming_skip
{
enum {
SMT_SKIP_ERROR,
SMT_SKIP_REL_TIME,
SMT_SKIP_ABS_TIME,
SMT_SKIP_REL_SIZE,