timeshift: implement timeshift to RAM, fixes #2626

This commit is contained in:
Jaroslav Kysela 2015-01-19 21:08:15 +01:00
parent ca0021e3b6
commit bc9874cc26
9 changed files with 312 additions and 131 deletions

View file

@ -36,6 +36,11 @@
specify an unlimited period its highly recommended you specifying a value specify an unlimited period its highly recommended you specifying a value
here. here.
<dt>Max. RAM Size (MegaBytes)
<dd>Specifies the maximum RAM (system memory) size for timeshift buffers.
When free RAM buffers are available, they are used instead storage to
save the timeshift data.
<dt>Unlimited: <dt>Unlimited:
<dd>If checked, this allows the combined size of all timeshift buffers to <dd>If checked, this allows the combined size of all timeshift buffers to
potentially grow unbounded until your storage media runs out of space potentially grow unbounded until your storage media runs out of space

View file

@ -41,6 +41,8 @@ int timeshift_unlimited_period;
uint32_t timeshift_max_period; uint32_t timeshift_max_period;
int timeshift_unlimited_size; int timeshift_unlimited_size;
uint64_t timeshift_max_size; uint64_t timeshift_max_size;
uint64_t timeshift_ram_size;
uint64_t timeshift_ram_segment_size;
/* /*
* Intialise global file manager * Intialise global file manager
@ -61,6 +63,8 @@ void timeshift_init ( void )
timeshift_max_period = 3600; // 1Hr timeshift_max_period = 3600; // 1Hr
timeshift_unlimited_size = 0; timeshift_unlimited_size = 0;
timeshift_max_size = 10000 * (size_t)1048576; // 10G timeshift_max_size = 10000 * (size_t)1048576; // 10G
timeshift_ram_size = 0;
timeshift_ram_segment_size = 0;
/* Load settings */ /* Load settings */
if ((m = hts_settings_load("timeshift/config"))) { if ((m = hts_settings_load("timeshift/config"))) {
@ -77,6 +81,10 @@ void timeshift_init ( void )
timeshift_unlimited_size = u32 ? 1 : 0; timeshift_unlimited_size = u32 ? 1 : 0;
if (!htsmsg_get_u32(m, "max_size", &u32)) if (!htsmsg_get_u32(m, "max_size", &u32))
timeshift_max_size = 1048576LL * u32; timeshift_max_size = 1048576LL * u32;
if (!htsmsg_get_u32(m, "ram_size", &u32)) {
timeshift_ram_size = 1048576LL * u32;
timeshift_ram_segment_size = timeshift_ram_size / 10;
}
htsmsg_destroy(m); htsmsg_destroy(m);
} }
} }
@ -107,6 +115,7 @@ void timeshift_save ( void )
htsmsg_add_u32(m, "max_period", timeshift_max_period); htsmsg_add_u32(m, "max_period", timeshift_max_period);
htsmsg_add_u32(m, "unlimited_size", timeshift_unlimited_size); htsmsg_add_u32(m, "unlimited_size", timeshift_unlimited_size);
htsmsg_add_u32(m, "max_size", timeshift_max_size / 1048576); htsmsg_add_u32(m, "max_size", timeshift_max_size / 1048576);
htsmsg_add_u32(m, "ram_size", timeshift_ram_size / 1048576);
hts_settings_save(m, "timeshift/config"); hts_settings_save(m, "timeshift/config");
} }

View file

@ -27,6 +27,9 @@ extern uint32_t timeshift_max_period;
extern int timeshift_unlimited_size; extern int timeshift_unlimited_size;
extern uint64_t timeshift_max_size; extern uint64_t timeshift_max_size;
extern uint64_t timeshift_total_size; extern uint64_t timeshift_total_size;
extern uint64_t timeshift_ram_size;
extern uint64_t timeshift_ram_segment_size;
extern uint64_t timeshift_total_ram_size;
typedef struct timeshift_status typedef struct timeshift_status
{ {

View file

@ -51,12 +51,18 @@ typedef TAILQ_HEAD(timeshift_index_data_list,timeshift_index_data) timeshift_ind
*/ */
typedef struct timeshift_file typedef struct timeshift_file
{ {
int fd; ///< Write descriptor int wfd; ///< Write descriptor
int rfd; ///< Read descriptor
char *path; ///< Full path to file char *path; ///< Full path to file
time_t time; ///< Files coarse timestamp time_t time; ///< Files coarse timestamp
size_t size; ///< Current file size; size_t size; ///< Current file size;
int64_t last; ///< Latest timestamp int64_t last; ///< Latest timestamp
off_t woff; ///< Write offset
off_t roff; ///< Read offset
uint8_t *ram; ///< RAM area
int64_t ram_size; ///< RAM area size in bytes
uint8_t bad; ///< File is broken uint8_t bad; ///< File is broken
@ -66,6 +72,8 @@ typedef struct timeshift_file
timeshift_index_data_list_t sstart; ///< Stream start messages timeshift_index_data_list_t sstart; ///< Stream start messages
TAILQ_ENTRY(timeshift_file) link; ///< List entry TAILQ_ENTRY(timeshift_file) link; ///< List entry
pthread_mutex_t ram_lock; ///< Mutex for the ram array access
} timeshift_file_t; } timeshift_file_t;
typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t; typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t;
@ -113,15 +121,15 @@ typedef struct timeshift {
/* /*
* Write functions * Write functions
*/ */
ssize_t timeshift_write_start ( int fd, int64_t time, streaming_start_t *ss ); ssize_t timeshift_write_start ( timeshift_file_t *tsf, int64_t time, streaming_start_t *ss );
ssize_t timeshift_write_sigstat ( int fd, int64_t time, signal_status_t *ss ); ssize_t timeshift_write_sigstat ( timeshift_file_t *tsf, int64_t time, signal_status_t *ss );
ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt ); ssize_t timeshift_write_packet ( timeshift_file_t *tsf, int64_t time, th_pkt_t *pkt );
ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data ); ssize_t timeshift_write_mpegts ( timeshift_file_t *tsf, int64_t time, void *data );
ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip ); ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip );
ssize_t timeshift_write_speed ( int fd, int speed ); ssize_t timeshift_write_speed ( int fd, int speed );
ssize_t timeshift_write_stop ( int fd, int code ); ssize_t timeshift_write_stop ( int fd, int code );
ssize_t timeshift_write_exit ( int fd ); ssize_t timeshift_write_exit ( int fd );
ssize_t timeshift_write_eof ( int fd ); ssize_t timeshift_write_eof ( timeshift_file_t *tsf );
void timeshift_writer_flush ( timeshift_t *ts ); void timeshift_writer_flush ( timeshift_t *ts );

View file

@ -39,6 +39,7 @@ static pthread_mutex_t timeshift_reaper_lock;
static pthread_cond_t timeshift_reaper_cond; static pthread_cond_t timeshift_reaper_cond;
uint64_t timeshift_total_size; uint64_t timeshift_total_size;
uint64_t timeshift_total_ram_size;
/* ************************************************************************** /* **************************************************************************
* File reaper thread * File reaper thread
@ -63,15 +64,19 @@ static void* timeshift_reaper_callback ( void *p )
TAILQ_REMOVE(&timeshift_reaper_list, tsf, link); TAILQ_REMOVE(&timeshift_reaper_list, tsf, link);
pthread_mutex_unlock(&timeshift_reaper_lock); pthread_mutex_unlock(&timeshift_reaper_lock);
tvhtrace("timeshift", "remove file %s", tsf->path); if (tsf->path) {
tvhtrace("timeshift", "remove file %s", tsf->path);
/* Remove */ /* Remove */
unlink(tsf->path); unlink(tsf->path);
dpath = dirname(tsf->path); dpath = dirname(tsf->path);
if (rmdir(dpath) == -1) if (rmdir(dpath) == -1)
if (errno != ENOTEMPTY) if (errno != ENOTEMPTY)
tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]", tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]",
dpath, strerror(errno)); dpath, strerror(errno));
} else {
tvhtrace("timeshift", "remove RAM segment (time %li)", (long)tsf->time);
}
/* Free memory */ /* Free memory */
while ((ti = TAILQ_FIRST(&tsf->iframes))) { while ((ti = TAILQ_FIRST(&tsf->iframes))) {
@ -85,6 +90,7 @@ static void* timeshift_reaper_callback ( void *p )
free(tid); free(tid);
} }
free(tsf->path); free(tsf->path);
free(tsf->ram);
free(tsf); free(tsf);
pthread_mutex_lock(&timeshift_reaper_lock); pthread_mutex_lock(&timeshift_reaper_lock);
@ -96,7 +102,12 @@ static void* timeshift_reaper_callback ( void *p )
static void timeshift_reaper_remove ( timeshift_file_t *tsf ) static void timeshift_reaper_remove ( timeshift_file_t *tsf )
{ {
tvhtrace("timeshift", "queue file for removal %s", tsf->path); #if ENABLE_TRACE
if (tsf->path)
tvhtrace("timeshift", "queue file for removal %s", tsf->path);
else
tvhtrace("timeshift", "queue file for removal - RAM segment time %li", (long)tsf->time);
#endif
pthread_mutex_lock(&timeshift_reaper_lock); pthread_mutex_lock(&timeshift_reaper_lock);
TAILQ_INSERT_TAIL(&timeshift_reaper_list, tsf, link); TAILQ_INSERT_TAIL(&timeshift_reaper_list, tsf, link);
pthread_cond_signal(&timeshift_reaper_cond); pthread_cond_signal(&timeshift_reaper_cond);
@ -140,14 +151,17 @@ int timeshift_filemgr_makedirs ( int index, char *buf, size_t len )
*/ */
void timeshift_filemgr_close ( timeshift_file_t *tsf ) void timeshift_filemgr_close ( timeshift_file_t *tsf )
{ {
ssize_t r = timeshift_write_eof(tsf->fd); ssize_t r = timeshift_write_eof(tsf);
if (r > 0) if (r > 0)
{ {
tsf->size += r; tsf->size += r;
atomic_add_u64(&timeshift_total_size, r); atomic_add_u64(&timeshift_total_size, r);
if (tsf->ram)
atomic_add_u64(&timeshift_total_ram_size, r);
} }
close(tsf->fd); if (tsf->wfd >= 0)
tsf->fd = -1; close(tsf->wfd);
tsf->wfd = -1;
} }
/* /*
@ -156,11 +170,19 @@ void timeshift_filemgr_close ( timeshift_file_t *tsf )
void timeshift_filemgr_remove void timeshift_filemgr_remove
( timeshift_t *ts, timeshift_file_t *tsf, int force ) ( timeshift_t *ts, timeshift_file_t *tsf, int force )
{ {
if (tsf->fd != -1) if (tsf->wfd >= 0)
close(tsf->fd); close(tsf->wfd);
tvhlog(LOG_DEBUG, "timeshift", "ts %d remove %s", ts->id, tsf->path); assert(tsf->rfd < 0);
#if ENABLE_TRACE
if (tsf->path)
tvhdebug("timeshift", "ts %d remove %s", ts->id, tsf->path);
else
tvhdebug("timeshift", "ts %d RAM segment remove time %li", ts->id, (long)tsf->time);
#endif
TAILQ_REMOVE(&ts->files, tsf, link); TAILQ_REMOVE(&ts->files, tsf, link);
atomic_add_u64(&timeshift_total_size, -tsf->size); atomic_add_u64(&timeshift_total_size, -tsf->size);
if (tsf->ram)
atomic_add_u64(&timeshift_total_ram_size, -tsf->size);
timeshift_reaper_remove(tsf); timeshift_reaper_remove(tsf);
} }
@ -176,6 +198,26 @@ void timeshift_filemgr_flush ( timeshift_t *ts, timeshift_file_t *end )
} }
} }
/*
*
*/
static timeshift_file_t * timeshift_filemgr_file_init
( timeshift_t *ts, time_t time )
{
timeshift_file_t *tsf;
tsf = calloc(1, sizeof(timeshift_file_t));
tsf->time = time;
tsf->last = getmonoclock();
tsf->wfd = -1;
tsf->rfd = -1;
TAILQ_INIT(&tsf->iframes);
TAILQ_INIT(&tsf->sstart);
TAILQ_INSERT_TAIL(&ts->files, tsf, link);
pthread_mutex_init(&tsf->ram_lock, NULL);
return tsf;
}
/* /*
* Get current / new file * Get current / new file
*/ */
@ -185,7 +227,7 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
struct timespec tp; struct timespec tp;
timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp; timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp;
timeshift_index_data_t *ti; timeshift_index_data_t *ti;
char path[512]; char path[PATH_MAX];
time_t time; time_t time;
/* Return last file */ /* Return last file */
@ -200,11 +242,12 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
clock_gettime(CLOCK_MONOTONIC_COARSE, &tp); clock_gettime(CLOCK_MONOTONIC_COARSE, &tp);
time = tp.tv_sec / TIMESHIFT_FILE_PERIOD; time = tp.tv_sec / TIMESHIFT_FILE_PERIOD;
tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list); tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list);
if (!tsf_tl || tsf_tl->time != time) { if (!tsf_tl || tsf_tl->time != time ||
(tsf_tl->ram && tsf_tl->woff >= timeshift_ram_segment_size)) {
tsf_hd = TAILQ_FIRST(&ts->files); tsf_hd = TAILQ_FIRST(&ts->files);
/* Close existing */ /* Close existing */
if (tsf_tl && tsf_tl->fd != -1) if (tsf_tl)
timeshift_filemgr_close(tsf_tl); timeshift_filemgr_close(tsf_tl);
/* Check period */ /* Check period */
@ -236,32 +279,48 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
ts->full = 1; ts->full = 1;
} }
} }
/* Create new file */ /* Create new file */
tsf_tmp = NULL; tsf_tmp = NULL;
if (!ts->full) { if (!ts->full) {
/* Create directories */ tvhtrace("timeshift", "ts %d RAM total %"PRId64" requested %"PRId64" segment %"PRId64,
if (!ts->path) { ts->id, atomic_pre_add_u64(&timeshift_total_ram_size, 0),
if (timeshift_filemgr_makedirs(ts->id, path, sizeof(path))) timeshift_ram_size, timeshift_ram_segment_size);
return NULL; if (timeshift_ram_size >= 8*1024*1024 &&
ts->path = strdup(path); atomic_pre_add_u64(&timeshift_total_ram_size, 0) <
timeshift_ram_size + (timeshift_ram_segment_size / 2)) {
tsf_tmp = timeshift_filemgr_file_init(ts, time);
tsf_tmp->ram_size = MIN(16*1024*1024, timeshift_ram_segment_size);
tsf_tmp->ram = malloc(tsf_tmp->ram_size);
if (!tsf_tmp->ram) {
free(tsf_tmp);
tsf_tmp = NULL;
} else {
tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %li)",
ts->id, tsf_tmp->ram_size, (long)time);
}
}
if (!tsf_tmp) {
/* Create directories */
if (!ts->path) {
if (timeshift_filemgr_makedirs(ts->id, path, sizeof(path)))
return NULL;
ts->path = strdup(path);
}
/* Create File */
snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time);
tvhtrace("timeshift", "ts %d create file %s", ts->id, path);
if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
tsf_tmp = timeshift_filemgr_file_init(ts, time);
tsf_tmp->wfd = fd;
tsf_tmp->path = strdup(path);
}
} }
/* Create File */ if (tsf_tmp) {
snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time);
tvhtrace("timeshift", "ts %d create file %s", ts->id, path);
if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
tsf_tmp = calloc(1, sizeof(timeshift_file_t));
tsf_tmp->time = time;
tsf_tmp->fd = fd;
tsf_tmp->path = strdup(path);
tsf_tmp->refcount = 0;
tsf_tmp->last = getmonoclock();
TAILQ_INIT(&tsf_tmp->iframes);
TAILQ_INIT(&tsf_tmp->sstart);
TAILQ_INSERT_TAIL(&ts->files, tsf_tmp, link);
/* Copy across last start message */ /* Copy across last start message */
if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_data_list))) { if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_data_list))) {
tvhtrace("timeshift", "ts %d copy smt_start to new file", tvhtrace("timeshift", "ts %d copy smt_start to new file",
@ -343,6 +402,7 @@ void timeshift_filemgr_init ( void )
/* Size processing */ /* Size processing */
timeshift_total_size = 0; timeshift_total_size = 0;
timeshift_ram_size = 0;
/* Start the reaper thread */ /* Start the reaper thread */
timeshift_reaper_run = 1; timeshift_reaper_run = 1;
@ -371,5 +431,3 @@ void timeshift_filemgr_term ( void )
if (!timeshift_filemgr_get_root(path, sizeof(path))) if (!timeshift_filemgr_get_root(path, sizeof(path)))
rmtree(path); rmtree(path);
} }

View file

@ -41,13 +41,30 @@
* File Reading * File Reading
* *************************************************************************/ * *************************************************************************/
static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf ) static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size )
{
if (tsf && tsf->ram) {
if (tsf->roff + size > tsf->woff) return -1;
pthread_mutex_lock(&tsf->ram_lock);
memcpy(buf, tsf->ram + tsf->roff, size);
tsf->roff += size;
pthread_mutex_unlock(&tsf->ram_lock);
return size;
} else {
size = read(tsf ? tsf->rfd : fd, buf, size);
if (size > 0 && tsf)
tsf->roff += size;
return size;
}
}
static ssize_t _read_pktbuf ( timeshift_file_t *tsf, int fd, pktbuf_t **pktbuf )
{ {
ssize_t r, cnt = 0; ssize_t r, cnt = 0;
size_t sz; size_t sz;
/* Size */ /* Size */
r = read(fd, &sz, sizeof(sz)); r = _read_buf(tsf, fd, &sz, sizeof(sz));
if (r < 0) return -1; if (r < 0) return -1;
if (r != sizeof(sz)) return 0; if (r != sizeof(sz)) return 0;
cnt += r; cnt += r;
@ -60,7 +77,7 @@ static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
/* Data */ /* Data */
*pktbuf = pktbuf_alloc(NULL, sz); *pktbuf = pktbuf_alloc(NULL, sz);
r = read(fd, (*pktbuf)->pb_data, sz); r = _read_buf(tsf, fd, (*pktbuf)->pb_data, sz);
if (r != sz) { if (r != sz) {
free((*pktbuf)->pb_data); free((*pktbuf)->pb_data);
free(*pktbuf); free(*pktbuf);
@ -72,7 +89,7 @@ static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
} }
static ssize_t _read_msg ( int fd, streaming_message_t **sm ) static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t **sm )
{ {
ssize_t r, cnt = 0; ssize_t r, cnt = 0;
size_t sz; size_t sz;
@ -85,7 +102,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
*sm = NULL; *sm = NULL;
/* Size */ /* Size */
r = read(fd, &sz, sizeof(sz)); r = _read_buf(tsf, fd, &sz, sizeof(sz));
if (r < 0) return -1; if (r < 0) return -1;
if (r != sizeof(sz)) return 0; if (r != sizeof(sz)) return 0;
cnt += r; cnt += r;
@ -97,13 +114,13 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
if (sz > 1024 * 1024) return -1; if (sz > 1024 * 1024) return -1;
/* Type */ /* Type */
r = read(fd, &type, sizeof(type)); r = _read_buf(tsf, fd, &type, sizeof(type));
if (r < 0) return -1; if (r < 0) return -1;
if (r != sizeof(type)) return 0; if (r != sizeof(type)) return 0;
cnt += r; cnt += r;
/* Time */ /* Time */
r = read(fd, &time, sizeof(time)); r = _read_buf(tsf, fd, &time, sizeof(time));
if (r < 0) return -1; if (r < 0) return -1;
if (r != sizeof(time)) return 0; if (r != sizeof(time)) return 0;
cnt += r; cnt += r;
@ -127,7 +144,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
case SMT_EXIT: case SMT_EXIT:
case SMT_SPEED: case SMT_SPEED:
if (sz != sizeof(code)) return -1; if (sz != sizeof(code)) return -1;
r = read(fd, &code, sz); r = _read_buf(tsf, fd, &code, sz);
if (r != sz) { if (r != sz) {
if (r < 0) return -1; if (r < 0) return -1;
return 0; return 0;
@ -141,7 +158,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
case SMT_MPEGTS: case SMT_MPEGTS:
case SMT_PACKET: case SMT_PACKET:
data = malloc(sz); data = malloc(sz);
r = read(fd, data, sz); r = _read_buf(tsf, fd, data, sz);
if (r != sz) { if (r != sz) {
free(data); free(data);
if (r < 0) return -1; if (r < 0) return -1;
@ -152,13 +169,13 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm )
pkt->pkt_payload = pkt->pkt_meta = NULL; pkt->pkt_payload = pkt->pkt_meta = NULL;
pkt->pkt_refcount = 0; pkt->pkt_refcount = 0;
*sm = streaming_msg_create_pkt(pkt); *sm = streaming_msg_create_pkt(pkt);
r = _read_pktbuf(fd, &pkt->pkt_meta); r = _read_pktbuf(tsf, fd, &pkt->pkt_meta);
if (r < 0) { if (r < 0) {
streaming_msg_free(*sm); streaming_msg_free(*sm);
return r; return r;
} }
cnt += r; cnt += r;
r = _read_pktbuf(fd, &pkt->pkt_payload); r = _read_pktbuf(tsf, fd, &pkt->pkt_payload);
if (r < 0) { if (r < 0) {
streaming_msg_free(*sm); streaming_msg_free(*sm);
return r; return r;
@ -317,24 +334,31 @@ static int _timeshift_skip
* Output packet * Output packet
*/ */
static int _timeshift_read static int _timeshift_read
( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd, ( timeshift_t *ts, timeshift_file_t **cur_file,
streaming_message_t **sm, int *wait ) streaming_message_t **sm, int *wait )
{ {
if (*cur_file) { timeshift_file_t *tsf = *cur_file;
ssize_t r;
off_t off, ooff;
if (tsf) {
/* Open file */ /* Open file */
if (*fd < 0) { if (tsf->rfd < 0 && !tsf->ram) {
tvhtrace("timeshift", "ts %d open file %s", tsf->rfd = open(tsf->path, O_RDONLY);
ts->id, (*cur_file)->path); tvhtrace("timeshift", "ts %d open file %s (fd %i)", ts->id, tsf->path, tsf->rfd);
*fd = open((*cur_file)->path, O_RDONLY); if (tsf->rfd < 0)
if (*fd < 0)
return -1; return -1;
} }
tvhtrace("timeshift", "ts %d seek to %jd", ts->id, (intmax_t)*cur_off); tvhtrace("timeshift", "ts %d seek to %jd (fd %i)", ts->id, tsf->roff, tsf->rfd);
lseek(*fd, *cur_off, SEEK_SET); if (tsf->rfd >= 0)
if ((off = lseek(tsf->rfd, tsf->roff, SEEK_SET)) != tsf->roff)
tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s",
tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno));
/* Read msg */ /* Read msg */
ssize_t r = _read_msg(*fd, sm); ooff = tsf->roff;
r = _read_msg(tsf, -1, sm);
if (r < 0) { if (r < 0) {
streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
streaming_target_deliver2(ts->output, e); streaming_target_deliver2(ts->output, e);
@ -349,21 +373,25 @@ static int _timeshift_read
/* Incomplete */ /* Incomplete */
if (r == 0) { if (r == 0) {
lseek(*fd, *cur_off, SEEK_SET); if (tsf->rfd >= 0) {
tvhtrace("timeshift", "ts %d seek to %jd (fd %i) (incomplete)", ts->id, tsf->roff, tsf->rfd);
if ((off = lseek(tsf->rfd, ooff, SEEK_SET)) != ooff)
tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s",
tsf->path, (int64_t)ooff, (int64_t)off, strerror(errno));
}
tsf->roff = ooff;
return 0; return 0;
} }
/* Update */
*cur_off += r;
/* Special case - EOF */ /* Special case - EOF */
if (r == sizeof(size_t) || *cur_off > (*cur_file)->size) { if (r == sizeof(size_t) || tsf->roff > tsf->size) {
close(*fd); if (tsf->rfd >= 0)
*fd = -1; close(tsf->rfd);
tsf->rfd = -1;
pthread_mutex_lock(&ts->rdwr_mutex); pthread_mutex_lock(&ts->rdwr_mutex);
*cur_file = timeshift_filemgr_next(*cur_file, NULL, 0); *cur_file = timeshift_filemgr_next(tsf, NULL, 0);
pthread_mutex_unlock(&ts->rdwr_mutex); pthread_mutex_unlock(&ts->rdwr_mutex);
*cur_off = 0; // reset tsf->roff = 0; // reset
*wait = 0; *wait = 0;
/* Check SMT_START index */ /* Check SMT_START index */
@ -386,12 +414,12 @@ static int _timeshift_read
* Flush all data to live * Flush all data to live
*/ */
static int _timeshift_flush_to_live static int _timeshift_flush_to_live
( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd, ( timeshift_t *ts, timeshift_file_t **cur_file,
streaming_message_t **sm, int *wait ) streaming_message_t **sm, int *wait )
{ {
time_t pts = 0; time_t pts = 0;
while (*cur_file) { while (*cur_file) {
if (_timeshift_read(ts, cur_file, cur_off, fd, sm, wait) == -1) if (_timeshift_read(ts, cur_file, sm, wait) == -1)
return -1; return -1;
if (!*sm) break; if (!*sm) break;
if ((*sm)->sm_type == SMT_PACKET) { if ((*sm)->sm_type == SMT_PACKET) {
@ -409,15 +437,15 @@ static int _timeshift_flush_to_live
* Thread * Thread
* *************************************************************************/ * *************************************************************************/
/* /*
* Timeshift thread * Timeshift thread
*/ */
void *timeshift_reader ( void *p ) void *timeshift_reader ( void *p )
{ {
timeshift_t *ts = p; timeshift_t *ts = p;
int nfds, end, fd = -1, run = 1, wait = -1; int nfds, end, run = 1, wait = -1;
timeshift_file_t *cur_file = NULL; timeshift_file_t *cur_file = NULL;
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0; int cur_speed = 100, keyframe_mode = 0;
int64_t pause_time = 0, play_time = 0, last_time = 0; int64_t pause_time = 0, play_time = 0, last_time = 0;
int64_t now, deliver, skip_time = 0; int64_t now, deliver, skip_time = 0;
@ -454,7 +482,7 @@ void *timeshift_reader ( void *p )
/* Control */ /* Control */
pthread_mutex_lock(&ts->state_mutex); pthread_mutex_lock(&ts->state_mutex);
if (nfds == 1) { if (nfds == 1) {
if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) { if (_read_msg(NULL, ts->rd_pipe.rd, &ctrl) > 0) {
/* Exit */ /* Exit */
if (ctrl->sm_type == SMT_EXIT) { if (ctrl->sm_type == SMT_EXIT) {
@ -494,10 +522,10 @@ void *timeshift_reader ( void *p )
ts->id); ts->id);
timeshift_writer_flush(ts); timeshift_writer_flush(ts);
pthread_mutex_lock(&ts->rdwr_mutex); pthread_mutex_lock(&ts->rdwr_mutex);
if ((cur_file = timeshift_filemgr_get(ts, 1))) { if ((cur_file = timeshift_filemgr_get(ts, 1))) {
cur_off = cur_file->size; cur_file->roff = cur_file->size;
pause_time = cur_file->last; pause_time = cur_file->last;
last_time = pause_time; last_time = pause_time;
} }
pthread_mutex_unlock(&ts->rdwr_mutex); pthread_mutex_unlock(&ts->rdwr_mutex);
} }
@ -576,9 +604,9 @@ void *timeshift_reader ( void *p )
/* Live playback (stage1) */ /* Live playback (stage1) */
if (ts->state == TS_LIVE) { if (ts->state == TS_LIVE) {
pthread_mutex_lock(&ts->rdwr_mutex); pthread_mutex_lock(&ts->rdwr_mutex);
if ((cur_file = timeshift_filemgr_get(ts, !ts->ondemand))) { if ((cur_file = timeshift_filemgr_get(ts, !ts->ondemand))) {
cur_off = cur_file->size; cur_file->roff = cur_file->size;
last_time = cur_file->last; last_time = cur_file->last;
} else { } else {
tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id); tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id);
skip = NULL; skip = NULL;
@ -695,23 +723,24 @@ void *timeshift_reader ( void *p )
tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64, ts->id, tsi->time); tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64, ts->id, tsi->time);
/* File changed (close) */ /* File changed (close) */
if ((tsf != cur_file) && (fd != -1)) { if ((tsf != cur_file) && cur_file && cur_file->rfd >= 0) {
close(fd); close(cur_file->rfd);
fd = -1; cur_file->rfd = -1;
} }
/* Position */ /* Position */
if (cur_file) if (cur_file)
cur_file->refcount--; cur_file->refcount--;
cur_file = tsf; if ((cur_file = tsf) != NULL) {
if (tsi) if (tsi)
cur_off = tsi->pos; cur_file->roff = tsi->pos;
else else
cur_off = 0; cur_file->roff = 0;
}
} }
/* Find packet */ /* Find packet */
if (_timeshift_read(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) { if (_timeshift_read(ts, &cur_file, &sm, &wait) == -1) {
pthread_mutex_unlock(&ts->state_mutex); pthread_mutex_unlock(&ts->state_mutex);
break; break;
} }
@ -782,13 +811,13 @@ void *timeshift_reader ( void *p )
streaming_target_deliver2(ts->output, ctrl); streaming_target_deliver2(ts->output, ctrl);
/* Flush timeshift buffer to live */ /* Flush timeshift buffer to live */
if (_timeshift_flush_to_live(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1)
break; break;
/* Close file (if open) */ /* Close file (if open) */
if (fd != -1) { if (cur_file && cur_file->rfd >= 0) {
close(fd); close(cur_file->rfd);
fd = -1; cur_file->rfd = -1;
} }
/* Flush ALL files */ /* Flush ALL files */
@ -823,7 +852,10 @@ void *timeshift_reader ( void *p )
/* Cleanup */ /* Cleanup */
tvhpoll_destroy(pd); tvhpoll_destroy(pd);
if (fd != -1) close(fd); if (cur_file && cur_file->rfd >= 0) {
close(cur_file->rfd);
cur_file->rfd = -1;
}
if (sm) streaming_msg_free(sm); if (sm) streaming_msg_free(sm);
if (ctrl) streaming_msg_free(ctrl); if (ctrl) streaming_msg_free(ctrl);
tvhtrace("timeshift", "ts %d exit reader thread", ts->id); tvhtrace("timeshift", "ts %d exit reader thread", ts->id);

View file

@ -36,7 +36,7 @@
/* /*
* Write data (retry on EAGAIN) * Write data (retry on EAGAIN)
*/ */
static ssize_t _write static ssize_t _write_fd
( int fd, const void *buf, size_t count ) ( int fd, const void *buf, size_t count )
{ {
ssize_t r; ssize_t r;
@ -54,25 +54,76 @@ static ssize_t _write
return count == n ? n : -1; return count == n ? n : -1;
} }
static ssize_t _write
( timeshift_file_t *tsf, const void *buf, size_t count )
{
uint8_t *ram;
size_t alloc;
if (tsf->ram) {
pthread_mutex_lock(&tsf->ram_lock);
if (tsf->ram_size < tsf->woff + count) {
if (tsf->ram_size >= timeshift_ram_segment_size)
alloc = MAX(count, 64*1024);
else
alloc = MAX(count, 4*1024*1024);
ram = realloc(tsf->ram, tsf->ram_size + alloc);
if (ram == NULL) {
tvhwarn("timeshift", "RAM timeshift memalloc failed");
pthread_mutex_unlock(&tsf->ram_lock);
return -1;
}
tsf->ram = ram;
tsf->ram_size += alloc;
}
memcpy(tsf->ram + tsf->woff, buf, count);
tsf->woff += count;
pthread_mutex_unlock(&tsf->ram_lock);
return count;
}
return _write_fd(tsf->wfd, buf, count);
}
/* /*
* Write message * Write message
*/ */
static ssize_t _write_msg static ssize_t _write_msg
( timeshift_file_t *tsf, streaming_message_type_t type, int64_t time,
const void *buf, size_t len )
{
size_t len2 = len + sizeof(type) + sizeof(time);
ssize_t err, ret;
ret = err = _write(tsf, &len2, sizeof(len2));
if (err < 0) return err;
err = _write(tsf, &type, sizeof(type));
if (err < 0) return err;
ret += err;
err = _write(tsf, &time, sizeof(time));
if (err < 0) return err;
ret += err;
if (len) {
err = _write(tsf, buf, len);
if (err < 0) return err;
ret += err;
}
return ret;
}
static ssize_t _write_msg_fd
( int fd, streaming_message_type_t type, int64_t time, ( int fd, streaming_message_type_t type, int64_t time,
const void *buf, size_t len ) const void *buf, size_t len )
{ {
size_t len2 = len + sizeof(type) + sizeof(time); size_t len2 = len + sizeof(type) + sizeof(time);
ssize_t err, ret; ssize_t err, ret;
ret = err = _write(fd, &len2, sizeof(len2)); ret = err = _write_fd(fd, &len2, sizeof(len2));
if (err < 0) return err; if (err < 0) return err;
err = _write(fd, &type, sizeof(type)); err = _write_fd(fd, &type, sizeof(type));
if (err < 0) return err; if (err < 0) return err;
ret += err; ret += err;
err = _write(fd, &time, sizeof(time)); err = _write_fd(fd, &time, sizeof(time));
if (err < 0) return err; if (err < 0) return err;
ret += err; ret += err;
if (len) { if (len) {
err = _write(fd, buf, len); err = _write_fd(fd, buf, len);
if (err < 0) return err; if (err < 0) return err;
ret += err; ret += err;
} }
@ -82,18 +133,18 @@ static ssize_t _write_msg
/* /*
* Write packet buffer * Write packet buffer
*/ */
static int _write_pktbuf ( int fd, pktbuf_t *pktbuf ) static int _write_pktbuf ( timeshift_file_t *tsf, pktbuf_t *pktbuf )
{ {
ssize_t ret, err; ssize_t ret, err;
if (pktbuf) { if (pktbuf) {
ret = err = _write(fd, &pktbuf->pb_size, sizeof(pktbuf->pb_size)); ret = err = _write(tsf, &pktbuf->pb_size, sizeof(pktbuf->pb_size));
if (err < 0) return err; if (err < 0) return err;
err = _write(fd, pktbuf->pb_data, pktbuf->pb_size); err = _write(tsf, pktbuf->pb_data, pktbuf->pb_size);
if (err < 0) return err; if (err < 0) return err;
ret += err; ret += err;
} else { } else {
size_t sz = 0; size_t sz = 0;
ret = _write(fd, &sz, sizeof(sz)); ret = _write(tsf, &sz, sizeof(sz));
} }
return ret; return ret;
} }
@ -102,24 +153,24 @@ static int _write_pktbuf ( int fd, pktbuf_t *pktbuf )
* Write signal status * Write signal status
*/ */
ssize_t timeshift_write_sigstat ssize_t timeshift_write_sigstat
( int fd, int64_t time, signal_status_t *sigstat ) ( timeshift_file_t *tsf, int64_t time, signal_status_t *sigstat )
{ {
return _write_msg(fd, SMT_SIGNAL_STATUS, time, sigstat, return _write_msg(tsf, SMT_SIGNAL_STATUS, time, sigstat,
sizeof(signal_status_t)); sizeof(signal_status_t));
} }
/* /*
* Write packet * Write packet
*/ */
ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt ) ssize_t timeshift_write_packet ( timeshift_file_t *tsf, int64_t time, th_pkt_t *pkt )
{ {
ssize_t ret = 0, err; ssize_t ret = 0, err;
ret = err = _write_msg(fd, SMT_PACKET, time, pkt, sizeof(th_pkt_t)); ret = err = _write_msg(tsf, SMT_PACKET, time, pkt, sizeof(th_pkt_t));
if (err <= 0) return err; if (err <= 0) return err;
err = _write_pktbuf(fd, pkt->pkt_meta); err = _write_pktbuf(tsf, pkt->pkt_meta);
if (err <= 0) return err; if (err <= 0) return err;
ret += err; ret += err;
err = _write_pktbuf(fd, pkt->pkt_payload); err = _write_pktbuf(tsf, pkt->pkt_payload);
if (err <= 0) return err; if (err <= 0) return err;
ret += err; ret += err;
return ret; return ret;
@ -128,9 +179,9 @@ ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt )
/* /*
* Write MPEGTS data * Write MPEGTS data
*/ */
ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data ) ssize_t timeshift_write_mpegts ( timeshift_file_t *tsf, int64_t time, void *data )
{ {
return _write_msg(fd, SMT_MPEGTS, time, data, 188); return _write_msg(tsf, SMT_MPEGTS, time, data, 188);
} }
/* /*
@ -138,7 +189,7 @@ ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data )
*/ */
ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip ) ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
{ {
return _write_msg(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t)); return _write_msg_fd(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t));
} }
/* /*
@ -146,7 +197,7 @@ ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
*/ */
ssize_t timeshift_write_speed ( int fd, int speed ) ssize_t timeshift_write_speed ( int fd, int speed )
{ {
return _write_msg(fd, SMT_SPEED, 0, &speed, sizeof(speed)); return _write_msg_fd(fd, SMT_SPEED, 0, &speed, sizeof(speed));
} }
/* /*
@ -154,7 +205,7 @@ ssize_t timeshift_write_speed ( int fd, int speed )
*/ */
ssize_t timeshift_write_stop ( int fd, int code ) ssize_t timeshift_write_stop ( int fd, int code )
{ {
return _write_msg(fd, SMT_STOP, 0, &code, sizeof(code)); return _write_msg_fd(fd, SMT_STOP, 0, &code, sizeof(code));
} }
/* /*
@ -163,16 +214,16 @@ ssize_t timeshift_write_stop ( int fd, int code )
ssize_t timeshift_write_exit ( int fd ) ssize_t timeshift_write_exit ( int fd )
{ {
int code = 0; int code = 0;
return _write_msg(fd, SMT_EXIT, 0, &code, sizeof(code)); return _write_msg_fd(fd, SMT_EXIT, 0, &code, sizeof(code));
} }
/* /*
* Write end of file (special internal message) * Write end of file (special internal message)
*/ */
ssize_t timeshift_write_eof ( int fd ) ssize_t timeshift_write_eof ( timeshift_file_t *tsf )
{ {
size_t sz = 0; size_t sz = 0;
return _write(fd, &sz, sizeof(sz)); return _write(tsf, &sz, sizeof(sz));
} }
/* ************************************************************************** /* **************************************************************************
@ -200,9 +251,9 @@ static inline ssize_t _process_msg0
if (SCT_ISVIDEO(ss->ss_components[i].ssc_type)) if (SCT_ISVIDEO(ss->ss_components[i].ssc_type))
ts->vididx = ss->ss_components[i].ssc_index; ts->vididx = ss->ss_components[i].ssc_index;
} else if (sm->sm_type == SMT_SIGNAL_STATUS) } else if (sm->sm_type == SMT_SIGNAL_STATUS)
err = timeshift_write_sigstat(tsf->fd, sm->sm_time, sm->sm_data); err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
else if (sm->sm_type == SMT_PACKET) { else if (sm->sm_type == SMT_PACKET) {
err = timeshift_write_packet(tsf->fd, sm->sm_time, sm->sm_data); err = timeshift_write_packet(tsf, sm->sm_time, sm->sm_data);
if (err > 0) { if (err > 0) {
th_pkt_t *pkt = sm->sm_data; th_pkt_t *pkt = sm->sm_data;
@ -216,7 +267,7 @@ static inline ssize_t _process_msg0
} }
} }
} else if (sm->sm_type == SMT_MPEGTS) } else if (sm->sm_type == SMT_MPEGTS)
err = timeshift_write_mpegts(tsf->fd, sm->sm_time, sm->sm_data); err = timeshift_write_mpegts(tsf, sm->sm_time, sm->sm_data);
else else
err = 0; err = 0;
@ -225,6 +276,8 @@ static inline ssize_t _process_msg0
tsf->last = sm->sm_time; tsf->last = sm->sm_time;
tsf->size += err; tsf->size += err;
atomic_add_u64(&timeshift_total_size, err); atomic_add_u64(&timeshift_total_size, err);
if (tsf->ram)
atomic_add_u64(&timeshift_total_ram_size, err);
} }
return err; return err;
} }
@ -265,7 +318,7 @@ static void _process_msg
case SMT_MPEGTS: case SMT_MPEGTS:
case SMT_PACKET: case SMT_PACKET:
pthread_mutex_lock(&ts->rdwr_mutex); pthread_mutex_lock(&ts->rdwr_mutex);
if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->fd != -1)) { if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->wfd >= 0 || tsf->ram)) {
if ((err = _process_msg0(ts, tsf, &sm)) < 0) { if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
timeshift_filemgr_close(tsf); timeshift_filemgr_close(tsf);
tsf->bad = 1; tsf->bad = 1;

View file

@ -677,6 +677,7 @@ extjs_timeshift(http_connection_t *hc, const char *remain, void *opaque)
htsmsg_add_u32(m, "timeshift_max_period", timeshift_max_period / 60); htsmsg_add_u32(m, "timeshift_max_period", timeshift_max_period / 60);
htsmsg_add_u32(m, "timeshift_unlimited_size", timeshift_unlimited_size); htsmsg_add_u32(m, "timeshift_unlimited_size", timeshift_unlimited_size);
htsmsg_add_u32(m, "timeshift_max_size", timeshift_max_size / 1048576); htsmsg_add_u32(m, "timeshift_max_size", timeshift_max_size / 1048576);
htsmsg_add_u32(m, "timeshift_ram_size", timeshift_ram_size / 1048576);
pthread_mutex_unlock(&global_lock); pthread_mutex_unlock(&global_lock);
out = json_single_record(m, "config"); out = json_single_record(m, "config");
@ -696,6 +697,10 @@ extjs_timeshift(http_connection_t *hc, const char *remain, void *opaque)
timeshift_unlimited_size = http_arg_get(&hc->hc_req_args, "timeshift_unlimited_size") ? 1 : 0; timeshift_unlimited_size = http_arg_get(&hc->hc_req_args, "timeshift_unlimited_size") ? 1 : 0;
if ((str = http_arg_get(&hc->hc_req_args, "timeshift_max_size"))) if ((str = http_arg_get(&hc->hc_req_args, "timeshift_max_size")))
timeshift_max_size = atol(str) * 1048576LL; timeshift_max_size = atol(str) * 1048576LL;
if ((str = http_arg_get(&hc->hc_req_args, "timeshift_ram_size"))) {
timeshift_ram_size = atol(str) * 1048576LL;
timeshift_ram_segment_size = timeshift_ram_size / 10;
}
timeshift_save(); timeshift_save();
pthread_mutex_unlock(&global_lock); pthread_mutex_unlock(&global_lock);

View file

@ -12,7 +12,8 @@ tvheadend.timeshift = function(panel, index) {
'timeshift_enabled', 'timeshift_ondemand', 'timeshift_enabled', 'timeshift_ondemand',
'timeshift_path', 'timeshift_path',
'timeshift_unlimited_period', 'timeshift_max_period', 'timeshift_unlimited_period', 'timeshift_max_period',
'timeshift_unlimited_size', 'timeshift_max_size' 'timeshift_unlimited_size', 'timeshift_max_size',
'timeshift_ram_size'
] ]
); );
@ -59,6 +60,13 @@ tvheadend.timeshift = function(panel, index) {
width: 300 width: 300
}); });
var timeshiftRamSize = new Ext.form.NumberField({
fieldLabel: 'Max. RAM Size (MB)',
name: 'timeshift_ram_size',
allowBlank: false,
width: 300
});
var timeshiftUnlSize = new Ext.form.Checkbox({ var timeshiftUnlSize = new Ext.form.Checkbox({
fieldLabel: 'Unlimited size', fieldLabel: 'Unlimited size',
name: 'timeshift_unlimited_size', name: 'timeshift_unlimited_size',
@ -100,7 +108,7 @@ tvheadend.timeshift = function(panel, index) {
width: 500, width: 500,
autoHeight: true, autoHeight: true,
border: false, border: false,
items : [timeshiftMaxPeriod, timeshiftMaxSize] items : [timeshiftMaxPeriod, timeshiftMaxSize, timeshiftRamSize]
}); });
var timeshiftPanelB = new Ext.form.FieldSet({ var timeshiftPanelB = new Ext.form.FieldSet({