diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 87a08205..a8503c0c 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -252,6 +252,86 @@ static int _timeshift_skip return end; } +/* + * Output packet + */ +static int _timeshift_read + ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd, + streaming_message_t **sm, int *wait ) +{ + if (*cur_file) { + + /* Open file */ + if (*fd == -1) { +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s", + ts->id, (*cur_file)->path); +#endif + *fd = open((*cur_file)->path, O_RDONLY); + } + if (*cur_off) { +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, *cur_off); +#endif + lseek(*fd, *cur_off, SEEK_SET); + } + + /* Read msg */ + ssize_t r = _read_msg(*fd, sm); + if (r < 0) { + streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); + streaming_target_deliver2(ts->output, e); + tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id); + return -1; + } +#ifdef TSHFT_TRACE + tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)", + ts->id, *sm, r); +#endif + + /* Incomplete */ + if (r == 0) + lseek(*fd, *cur_off, SEEK_SET); + else + *cur_off += r; + + /* Special case - EOF */ + if (r == sizeof(size_t) || *cur_off > (*cur_file)->size) { + close(*fd); + *fd = -1; + *cur_file = timeshift_filemgr_next(*cur_file, NULL, 0); + *cur_off = 0; // reset + *wait = 0; + } + } + return 0; +} + + +/* + * Flush all data to live + */ +static int _timeshift_flush_to_live + ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd, + streaming_message_t **sm, int *wait ) +{ + time_t pts = 0; + while (*cur_file) { + if (_timeshift_read(ts, cur_file, cur_off, fd, sm, wait) == -1) + return -1; + if (!*sm) break; + if ((*sm)->sm_type == SMT_PACKET) { + pts = ((th_pkt_t*)(*sm)->sm_data)->pkt_pts; + tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64, + ts->id, (*sm)->sm_time, pts, (*sm)->sm_timeshift ); + } + streaming_target_deliver2(ts->output, *sm); + *sm = NULL; + } + return 0; +} + + /* ************************************************************************** * Thread * *************************************************************************/ @@ -502,52 +582,10 @@ void *timeshift_reader ( void *p ) } /* Find packet */ - if (cur_file) { - - /* Open file */ - if (fd == -1) { -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s", - ts->id, cur_file->path); -#endif - fd = open(cur_file->path, O_RDONLY); - } - if (cur_off) { -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, cur_off); -#endif - lseek(fd, cur_off, SEEK_SET); - } - - /* Read msg */ - ssize_t r = _read_msg(fd, &sm); - if (r < 0) { - streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); - streaming_target_deliver2(ts->output, e); - tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id); - pthread_mutex_unlock(&ts->rdwr_mutex); - pthread_mutex_unlock(&ts->state_mutex); - break; - } -#ifdef TSHFT_TRACE - tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)", - ts->id, sm, r); -#endif - - /* Incomplete */ - if (r == 0) - lseek(fd, cur_off, SEEK_SET); - else - cur_off += r; - - /* Special case - EOF */ - if (r == sizeof(size_t) || cur_off > cur_file->size) { - close(fd); - fd = -1; - cur_file = timeshift_filemgr_next(cur_file, NULL, 0); - cur_off = 0; // reset - wait = 0; - } + if (_timeshift_read(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) { + pthread_mutex_unlock(&ts->rdwr_mutex); + pthread_mutex_unlock(&ts->state_mutex); + break; } } @@ -603,7 +641,7 @@ void *timeshift_reader ( void *p ) /* Terminate */ if (!cur_file || end != 0) { if (!end) - end = (cur_file > 0) ? 1 : -1; + end = (cur_speed > 0) ? 1 : -1; /* Back to live */ if (end == 1) { @@ -613,15 +651,30 @@ void *timeshift_reader ( void *p ) ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); streaming_target_deliver2(ts->output, ctrl); + /* Flush timeshift buffer to live */ + if (_timeshift_flush_to_live(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) + break; + + /* Close file (if open) */ + if (fd != -1) { + close(fd); + fd = -1; + } + /* Flush ALL files */ if (ts->ondemand) timeshift_filemgr_flush(ts, NULL); /* Pause */ } else { - tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id); - cur_speed = 0; - ts->state = TS_PAUSE; + if (cur_speed <= 0) { + cur_speed = 0; + ts->state = TS_PAUSE; + } else { + ts->state = TS_PLAY; + play_time = now; + } + tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d", ts->id, cur_speed); pause_time = last_time; ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); streaming_target_deliver2(ts->output, ctrl); @@ -638,8 +691,9 @@ void *timeshift_reader ( void *p ) } /* Cleanup */ - if (sm) streaming_msg_free(sm); - if (ctrl) streaming_msg_free(ctrl); + if (fd != -1) close(fd); + if (sm) streaming_msg_free(sm); + if (ctrl) streaming_msg_free(ctrl); #ifdef TSHFT_TRACE tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id); #endif