timeshift: fix some sob/eob problems.

This commit is contained in:
Adam Sutton 2013-01-14 13:07:28 +00:00
parent eb6b33922e
commit 6077ac3fbe

View file

@ -252,6 +252,86 @@ static int _timeshift_skip
return end;
}
/*
* Output packet
*/
static int _timeshift_read
( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd,
streaming_message_t **sm, int *wait )
{
if (*cur_file) {
/* Open file */
if (*fd == -1) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, (*cur_file)->path);
#endif
*fd = open((*cur_file)->path, O_RDONLY);
}
if (*cur_off) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, *cur_off);
#endif
lseek(*fd, *cur_off, SEEK_SET);
}
/* Read msg */
ssize_t r = _read_msg(*fd, sm);
if (r < 0) {
streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
streaming_target_deliver2(ts->output, e);
tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id);
return -1;
}
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
ts->id, *sm, r);
#endif
/* Incomplete */
if (r == 0)
lseek(*fd, *cur_off, SEEK_SET);
else
*cur_off += r;
/* Special case - EOF */
if (r == sizeof(size_t) || *cur_off > (*cur_file)->size) {
close(*fd);
*fd = -1;
*cur_file = timeshift_filemgr_next(*cur_file, NULL, 0);
*cur_off = 0; // reset
*wait = 0;
}
}
return 0;
}
/*
* Flush all data to live
*/
static int _timeshift_flush_to_live
( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd,
streaming_message_t **sm, int *wait )
{
time_t pts = 0;
while (*cur_file) {
if (_timeshift_read(ts, cur_file, cur_off, fd, sm, wait) == -1)
return -1;
if (!*sm) break;
if ((*sm)->sm_type == SMT_PACKET) {
pts = ((th_pkt_t*)(*sm)->sm_data)->pkt_pts;
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64,
ts->id, (*sm)->sm_time, pts, (*sm)->sm_timeshift );
}
streaming_target_deliver2(ts->output, *sm);
*sm = NULL;
}
return 0;
}
/* **************************************************************************
* Thread
* *************************************************************************/
@ -502,52 +582,10 @@ void *timeshift_reader ( void *p )
}
/* Find packet */
if (cur_file) {
/* Open file */
if (fd == -1) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, cur_file->path);
#endif
fd = open(cur_file->path, O_RDONLY);
}
if (cur_off) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, cur_off);
#endif
lseek(fd, cur_off, SEEK_SET);
}
/* Read msg */
ssize_t r = _read_msg(fd, &sm);
if (r < 0) {
streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
streaming_target_deliver2(ts->output, e);
tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id);
pthread_mutex_unlock(&ts->rdwr_mutex);
pthread_mutex_unlock(&ts->state_mutex);
break;
}
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
ts->id, sm, r);
#endif
/* Incomplete */
if (r == 0)
lseek(fd, cur_off, SEEK_SET);
else
cur_off += r;
/* Special case - EOF */
if (r == sizeof(size_t) || cur_off > cur_file->size) {
close(fd);
fd = -1;
cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
cur_off = 0; // reset
wait = 0;
}
if (_timeshift_read(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) {
pthread_mutex_unlock(&ts->rdwr_mutex);
pthread_mutex_unlock(&ts->state_mutex);
break;
}
}
@ -603,7 +641,7 @@ void *timeshift_reader ( void *p )
/* Terminate */
if (!cur_file || end != 0) {
if (!end)
end = (cur_file > 0) ? 1 : -1;
end = (cur_speed > 0) ? 1 : -1;
/* Back to live */
if (end == 1) {
@ -613,15 +651,30 @@ void *timeshift_reader ( void *p )
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
/* Flush timeshift buffer to live */
if (_timeshift_flush_to_live(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1)
break;
/* Close file (if open) */
if (fd != -1) {
close(fd);
fd = -1;
}
/* Flush ALL files */
if (ts->ondemand)
timeshift_filemgr_flush(ts, NULL);
/* Pause */
} else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
cur_speed = 0;
ts->state = TS_PAUSE;
if (cur_speed <= 0) {
cur_speed = 0;
ts->state = TS_PAUSE;
} else {
ts->state = TS_PLAY;
play_time = now;
}
tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d", ts->id, cur_speed);
pause_time = last_time;
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
@ -638,8 +691,9 @@ void *timeshift_reader ( void *p )
}
/* Cleanup */
if (sm) streaming_msg_free(sm);
if (ctrl) streaming_msg_free(ctrl);
if (fd != -1) close(fd);
if (sm) streaming_msg_free(sm);
if (ctrl) streaming_msg_free(ctrl);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
#endif