timeshift: Reworked the reader engine to common up things and prepare for seek support.

This commit is contained in:
Adam Sutton 2012-12-28 22:14:06 +00:00
parent acebb13f15
commit 88a9d96039
2 changed files with 153 additions and 144 deletions

View file

@ -165,6 +165,68 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
return cnt;
}
/* **************************************************************************
* Utilities
* *************************************************************************/
static int _timeshift_skip
( timeshift_t *ts, int64_t req_time, int64_t cur_time,
timeshift_file_t *cur_file, timeshift_file_t **new_file,
timeshift_index_iframe_t **iframe )
{
timeshift_index_iframe_t *tsi = *iframe;
timeshift_file_t *tsf = cur_file;
int64_t sec = req_time / 1000000;
int back = (req_time < cur_time) ? 1 : 0;
int end = 0;
/* Coarse search */
if (!tsi) {
while (tsf && !end) {
if (back) {
if ((tsf->time <= sec) &&
(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list)))
break;
tsf = timeshift_filemgr_prev(tsf, &end, 1);
} else {
if ((tsf->time >= sec) &&
(tsi = TAILQ_FIRST(&tsf->iframes)))
break;
tsf = timeshift_filemgr_next(tsf, &end, 0);
}
tsi = NULL;
}
}
/* Fine search */
if (back) {
while (!end && tsf && tsi && (tsi->time > req_time)) {
tsi = TAILQ_PREV(tsi, timeshift_index_iframe_list, link);
while (!end && tsf && !tsi) {
if ((tsf = timeshift_filemgr_prev(tsf, &end, 1)))
tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list);
}
}
} else {
while (!end && tsf && tsi && (tsi->time < req_time)) {
tsi = TAILQ_NEXT(tsi, link);
while (!end && tsf && !tsi) {
if ((tsf = timeshift_filemgr_next(tsf, &end, 0)))
tsi = TAILQ_FIRST(&tsf->iframes);
}
}
}
/* End */
if (!tsf || !tsi)
end = 1;
/* Done */
*new_file = tsf;
*iframe = end ? NULL : tsi;
return end;
}
/* **************************************************************************
* Thread
* *************************************************************************/
@ -176,12 +238,12 @@ void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
int efd, nfds, end, fd = -1, run = 1, wait = -1;
timeshift_file_t *cur_file = NULL;
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0;
int64_t pause_time = 0, play_time = 0, last_time = 0, tx_time = 0;
int64_t now, deliver;
int64_t pause_time = 0, play_time = 0, last_time = 0;
int64_t now, deliver, skip_time;
streaming_message_t *sm = NULL, *ctrl;
timeshift_file_t *cur_file = NULL, *tsi_file = NULL;
timeshift_index_iframe_t *tsi = NULL;
/* Poll */
@ -199,8 +261,10 @@ void *timeshift_reader ( void *p )
nfds = epoll_wait(efd, &ev, 1, wait);
else
nfds = 0;
wait = -1;
end = 0;
wait = -1;
end = 0;
skip_time = 0;
now = getmonoclock();
/* Control */
pthread_mutex_lock(&ts->state_mutex);
@ -216,7 +280,6 @@ void *timeshift_reader ( void *p )
streaming_msg_free(ctrl);
/* Speed */
// TODO: currently just pause
} else if (ctrl->sm_type == SMT_SPEED) {
int speed = ctrl->sm_code;
int keyframe;
@ -270,8 +333,7 @@ void *timeshift_reader ( void *p )
keyframe ? "yes" : "no");
keyframe_mode = keyframe;
if (keyframe) {
tsi = NULL;
tsi_file = cur_file;
tsi = NULL;
}
}
@ -288,7 +350,11 @@ void *timeshift_reader ( void *p )
ctrl->sm_code = speed;
streaming_target_deliver2(ts->output, ctrl);
/* Skip */
/* Skip/Seek */
} else if (ctrl->sm_type == SMT_SKIP) {
// TODO: implement this
/* Ignore */
} else {
streaming_msg_free(ctrl);
}
@ -303,165 +369,108 @@ void *timeshift_reader ( void *p )
continue;
}
/* File processing lock */
pthread_mutex_lock(&ts->rdwr_mutex);
/* Calculate delivery time */
now = getmonoclock();
deliver = (now - play_time) + TS_PLAY_BUF;
deliver = (deliver * cur_speed) / 100;
deliver = (deliver + pause_time);
/* Rewind or Fast forward (i-frame only) */
if (keyframe_mode) {
wait = 0;
/* Determine next packet */
if (!sm) {
/* Find next index */
if (cur_speed < 0) {
if (!tsi) {
TAILQ_FOREACH_REVERSE(tsi, &tsi_file->iframes,
timeshift_index_iframe_list, link) {
if (tsi->time < last_time) break;
}
}
} else {
if (!tsi) {
TAILQ_FOREACH(tsi, &tsi_file->iframes, link) {
if (tsi->time > last_time) break;
}
}
}
/* Rewind or Fast forward (i-frame only) */
if (skip_time || keyframe_mode) {
timeshift_file_t *tsf = NULL;
time_t req_time;
/* Next file */
if (!tsi) {
if (fd != -1)
/* Time */
if (!skip_time)
req_time = last_time + ((cur_speed < 0) ? -1 : 1);
else
req_time = skip_time;
/* Find */
end = _timeshift_skip(ts, req_time, last_time,
cur_file, &tsf, &tsi);
/* File changed (close) */
if ((tsf != cur_file) && (fd != -1)) {
close(fd);
wait = 0; // immediately cycle around
fd = -1;
if (cur_speed < 0)
tsi_file = timeshift_filemgr_prev(tsi_file, &end, 1);
else
tsi_file = timeshift_filemgr_next(tsi_file, &end, 0);
}
/* Deliver */
if (tsi && (((cur_speed < 0) && (tsi->time >= deliver)) ||
((cur_speed > 0) && (tsi->time <= deliver)))) {
/* Keep delivery to 5fps max */
if ((now - tx_time) >= 200000) {
/* Open */
if (fd == -1) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, tsi_file->path);
#endif
fd = open(tsi_file->path, O_RDONLY);
}
/* Read */
off_t ret = lseek(fd, tsi->pos, SEEK_SET);
assert(ret == tsi->pos);
ssize_t r = _read_msg(fd, &sm);
/* Send */
if (r > 0) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
ts->id, sm->sm_time);
#endif
sm->sm_timeshift = now - sm->sm_time;
streaming_target_deliver2(ts->output, sm);
cur_file = tsi_file;
cur_off = tsi->pos + r;
last_time = sm->sm_time;
tx_time = now;
sm = NULL;
} else {
wait = -1;
close(fd);
fd = -1;
}
fd = -1;
}
/* Next index */
if (cur_speed < 0)
tsi = TAILQ_PREV(tsi, timeshift_index_iframe_list, link);
else
tsi = TAILQ_NEXT(tsi, link);
/* Not yet! */
} else if (tsi) {
if (cur_speed > 0)
wait = (tsi->time - deliver) / 1000;
else
wait = (deliver - tsi->time) / 1000;
if (wait == 0) wait = 1;
/* Position */
cur_file = tsf;
if (tsi)
cur_off = tsi->pos;
}
/* Full frame delivery */
} else {
/* Find packet */
if (cur_file && !end) {
/* Open file */
if (fd == -1) {
/* Open file */
if (fd == -1) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, cur_file->path);
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, cur_file->path);
#endif
fd = open(cur_file->path, O_RDONLY);
fd = open(cur_file->path, O_RDONLY);
}
if (cur_off) lseek(fd, cur_off, SEEK_SET);
}
/* Process */
pthread_mutex_lock(&ts->rdwr_mutex);
end = 1;
while (cur_file && cur_off < cur_file->size) {
/* Read msg */
if (!sm) {
ssize_t r = _read_msg(fd, &sm);
assert(r != -1);
/* Incomplete */
if (r == 0) {
lseek(fd, cur_off, SEEK_SET);
break;
}
ssize_t r = _read_msg(fd, &sm);
assert(r != -1);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
ts->id, sm, r);
#endif
/* Incomplete */
if (r == 0)
lseek(fd, cur_off, SEEK_SET);
else
cur_off += r;
/* Special case - EOF */
if (r == sizeof(size_t) || cur_off > cur_file->size) {
close(fd);
wait = 0; // immediately cycle around
cur_off = 0; // reset
fd = -1;
cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
break;
}
}
assert(sm);
end = 0;
/* Deliver */
if (sm->sm_time <= deliver) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
ts->id, sm->sm_time);
#endif
sm->sm_timeshift = now - sm->sm_time;
streaming_target_deliver2(ts->output, sm);
tx_time = now;
last_time = sm->sm_time;
sm = NULL;
wait = 0;
} else {
wait = (sm->sm_time - deliver) / 1000;
if (wait == 0) wait = 1;
break;
/* Special case - EOF */
if (r == sizeof(size_t) || cur_off > cur_file->size) {
close(fd);
fd = -1;
cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
cur_off = 0; // reset
wait = 0;
}
}
}
/* Deliver */
if (sm && (skip_time ||
(((cur_speed < 0) && (sm->sm_time >= deliver)) ||
((cur_speed > 0) && (sm->sm_time <= deliver))))) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
ts->id, sm->sm_time);
#endif
sm->sm_timeshift = now - sm->sm_time;
streaming_target_deliver2(ts->output, sm);
last_time = sm->sm_time;
sm = NULL;
wait = 0;
} else if (sm) {
if (cur_speed > 0)
wait = (sm->sm_time - deliver) / 1000;
else
wait = (deliver - sm->sm_time) / 1000;
if (wait == 0) wait = 1;
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d wait %d",
ts->id, wait);
#endif
}
/* Terminate */
if (!cur_file || end) {

View file

@ -223,7 +223,7 @@ typedef struct streaming_skip
} type;
union {
off_t size;
time_t time;
int64_t time;
};
} streaming_skip_t;