timeshift: early prototype of the new timeshift feature.

Currently this supports pause/resume, and speed control. FF up to 4x uses
full frame output, faster than that or reverse uses i-frame only output.
This causes problems with some players and needs work.

Also buffers are done at the subscription level which means the disk space
is not shared even if it holds the same content. And more importantly
this means you cannot yet record the timeshift buffer like on a standard
PVR.
This commit is contained in:
Adam Sutton 2012-10-12 17:24:27 +01:00
parent 78df091e41
commit 9180546fc6
22 changed files with 1873 additions and 32 deletions

View file

@ -122,7 +122,7 @@ SRCS-$(CONFIG_LINUXDVB) += src/epggrab/otamux.c\
src/epggrab/support/freesat_huffman.c \
SRCS += src/plumbing/tsfix.c \
src/plumbing/globalheaders.c \
src/plumbing/globalheaders.c
SRCS += src/dvr/dvr_db.c \
src/dvr/dvr_rec.c \
@ -145,6 +145,13 @@ SRCS += src/muxer.c \
# Optional code
#
# Timeshift
SRCS-${CONFIG_TIMESHIFT} += \
src/timeshift.c \
src/timeshift/timeshift_filemgr.c \
src/timeshift/timeshift_writer.c \
src/timeshift/timeshift_reader.c \
# DVB
SRCS-${CONFIG_LINUXDVB} += \
src/dvb/dvb.c \

1
configure vendored
View file

@ -24,6 +24,7 @@ OPTIONS=(
"avahi:auto"
"zlib:auto"
"libav:auto"
"timeshift:no"
"bundle:no"
"dvbcsa:no"
)

View file

@ -26,11 +26,24 @@ static htsmsg_t *config;
void config_init ( void )
{
int save = 0;
uint32_t u32;
config = hts_settings_load("config");
if (!config) {
tvhlog(LOG_DEBUG, "config", "no configuration, loading defaults");
config = htsmsg_create_map();
}
/* Defaults */
if (htsmsg_get_u32(config, "timeshiftperiod", &u32))
save |= config_set_timeshift_period(0);
if (htsmsg_get_u32(config, "timeshiftsize", &u32))
save |= config_set_timeshift_size(0);
/* Save defaults */
if (save)
config_save();
}
void config_save ( void )
@ -43,6 +56,29 @@ htsmsg_t *config_get_all ( void )
return htsmsg_copy(config);
}
static int _config_set_str ( const char *fld, const char *val )
{
const char *c = htsmsg_get_str(config, fld);
if (!c || strcmp(c, val)) {
if (c) htsmsg_delete_field(config, fld);
htsmsg_add_str(config, fld, val);
return 1;
}
return 0;
}
static int _config_set_u32 ( const char *fld, uint32_t val )
{
uint32_t u32;
int ret = htsmsg_get_u32(config, fld, &u32);
if (ret || (u32 != val)) {
if (!ret) htsmsg_delete_field(config, fld);
htsmsg_add_u32(config, fld, val);
return 1;
}
return 0;
}
const char *config_get_language ( void )
{
return htsmsg_get_str(config, "language");
@ -50,13 +86,7 @@ const char *config_get_language ( void )
int config_set_language ( const char *lang )
{
const char *c = config_get_language();
if (!c || strcmp(c, lang)) {
if (c) htsmsg_delete_field(config, "language");
htsmsg_add_str(config, "language", lang);
return 1;
}
return 0;
return _config_set_str("language", lang);
}
const char *config_get_muxconfpath ( void )
@ -66,11 +96,35 @@ const char *config_get_muxconfpath ( void )
int config_set_muxconfpath ( const char *path )
{
const char *c = config_get_muxconfpath();
if (!c || strcmp(c, path)) {
if (c) htsmsg_delete_field(config, "muxconfpath");
htsmsg_add_str(config, "muxconfpath", path);
return 1;
}
return 0;
return _config_set_str("muxconfpath", path);
}
const char *config_get_timeshift_path ( void )
{
return htsmsg_get_str(config, "timeshiftpath");
}
int config_set_timeshift_path ( const char *path )
{
return _config_set_str("timeshiftpath", path);
}
uint32_t config_get_timeshift_period ( void )
{
return htsmsg_get_u32_or_default(config, "timeshiftperiod", 0);
}
int config_set_timeshift_period ( uint32_t period )
{
return _config_set_u32("timeshiftperiod", period);
}
uint32_t config_get_timeshift_size ( void )
{
return htsmsg_get_u32_or_default(config, "timeshiftsize", 0);
}
int config_set_timeshift_size ( uint32_t size )
{
return _config_set_u32("timeshiftsize", size);
}

View file

@ -36,4 +36,16 @@ const char *config_get_language ( void );
int config_set_language ( const char *str )
__attribute__((warn_unused_result));
const char *config_get_timeshift_path ( void );
int config_set_timeshift_path ( const char *str )
__attribute__((warn_unused_result));
uint32_t config_get_timeshift_period ( void );
int config_set_timeshift_period ( uint32_t val )
__attribute__((warn_unused_result));
uint32_t config_get_timeshift_size ( void );
int config_set_timeshift_size ( uint32_t val )
__attribute__((warn_unused_result));
#endif /* __TVH_CONFIG__H__ */

View file

@ -538,6 +538,8 @@ dvr_thread(void *aux)
}
break;
case SMT_SPEED:
case SMT_SKIP:
case SMT_SIGNAL_STATUS:
break;

View file

@ -43,6 +43,9 @@
#include "epg.h"
#include "plumbing/tsfix.h"
#include "imagecache.h"
#if ENABLE_TIMESHIFT
#include "timeshift.h"
#endif
#include <sys/statvfs.h>
#include "settings.h"
@ -170,6 +173,10 @@ typedef struct htsp_subscription {
streaming_target_t hs_input;
streaming_target_t *hs_tsfix;
#if ENABLE_TIMESHIFT
streaming_target_t *hs_tshift;
#endif
htsp_msg_q_t hs_q;
time_t hs_last_report; /* Last queue status report sent */
@ -274,6 +281,9 @@ htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
if(hs->hs_tsfix != NULL)
tsfix_destroy(hs->hs_tsfix);
htsp_flush_queue(htsp, &hs->hs_q);
#if ENABLE_TIMESHIFT
if(hs->hs_tshift) timeshift_destroy(hs->hs_tshift);
#endif
free(hs);
}
@ -1228,6 +1238,9 @@ static htsmsg_t *
htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
uint32_t chid, sid, weight, req90khz, normts;
#if ENABLE_TIMESHIFT
uint32_t timeshiftPeriod;
#endif
channel_t *ch;
htsp_subscription_t *hs;
const char *str;
@ -1249,6 +1262,11 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
req90khz = htsmsg_get_u32_or_default(in, "90khz", 0);
normts = htsmsg_get_u32_or_default(in, "normts", 0);
#if ENABLE_TIMESHIFT
timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0);
timeshiftPeriod = MIN(timeshiftPeriod, config_get_timeshift_period());
#endif
/*
* We send the reply now to avoid the user getting the 'subscriptionStart'
* async message before the reply to 'subscribe'.
@ -1279,14 +1297,19 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
streaming_target_t *st;
streaming_target_t *st = &hs->hs_input;
if(normts) {
hs->hs_tsfix = tsfix_create(&hs->hs_input);
st = hs->hs_tsfix;
} else {
st = &hs->hs_input;
if(normts)
st = hs->hs_tsfix = tsfix_create(st);
#if ENABLE_TIMESHIFT
if (timeshiftPeriod != 0) {
if (timeshiftPeriod == ~0)
tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (unlimited)");
else
tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod);
}
#endif
hs->hs_s = subscription_create_from_channel(ch, weight,
htsp->htsp_logname,
@ -1353,6 +1376,72 @@ htsp_method_change_weight(htsp_connection_t *htsp, htsmsg_t *in)
return NULL;
}
/**
* Skip stream
*/
static htsmsg_t *
htsp_method_skip(htsp_connection_t *htsp, htsmsg_t *in)
{
htsp_subscription_t *hs;
uint32_t sid, abs;
int64_t s64;
streaming_skip_t skip;
if(htsmsg_get_u32(in, "subscriptionId", &sid))
return htsp_error("Missing argument 'subscriptionId'");
abs = htsmsg_get_u32_or_default(in, "absolute", 0);
if(!htsmsg_get_s64(in, "time", &s64)) {
skip.type = abs ? SMT_SKIP_ABS_TIME : SMT_SKIP_REL_TIME;
skip.time = s64;
} else if (!htsmsg_get_s64(in, "size", &s64)) {
skip.type = abs ? SMT_SKIP_ABS_SIZE : SMT_SKIP_REL_SIZE;
skip.size = s64;
} else {
return htsp_error("Missing argument 'time' or 'size'");
}
LIST_FOREACH(hs, &htsp->htsp_subscriptions, hs_link)
if(hs->hs_sid == sid)
break;
if(hs == NULL)
return htsp_error("Requested subscription does not exist");
subscription_set_skip(hs->hs_s, &skip);
htsp_reply(htsp, in, htsmsg_create_map());
return NULL;
}
/*
* Set stream speed
*/
static htsmsg_t *
htsp_method_speed(htsp_connection_t *htsp, htsmsg_t *in)
{
htsp_subscription_t *hs;
uint32_t sid;
int32_t speed;
if(htsmsg_get_u32(in, "subscriptionId", &sid))
return htsp_error("Missing argument 'subscriptionId'");
if(htsmsg_get_s32(in, "speed", &speed))
return htsp_error("Missing argument 'speed'");
LIST_FOREACH(hs, &htsp->htsp_subscriptions, hs_link)
if(hs->hs_sid == sid)
break;
if(hs == NULL)
return htsp_error("Requested subscription does not exist");
subscription_set_speed(hs->hs_s, speed);
htsp_reply(htsp, in, htsmsg_create_map());
return NULL;
}
/**
* Open file
@ -1529,6 +1618,8 @@ struct {
{ "subscribe", htsp_method_subscribe, ACCESS_STREAMING},
{ "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING},
{ "subscriptionChangeWeight", htsp_method_change_weight, ACCESS_STREAMING},
{ "subscriptionSkip", htsp_method_skip, ACCESS_STREAMING},
{ "subscriptionSpeed", htsp_method_speed, ACCESS_STREAMING},
{ "fileOpen", htsp_method_file_open, ACCESS_RECORDER},
{ "fileRead", htsp_method_file_read, ACCESS_RECORDER},
{ "fileClose", htsp_method_file_close, ACCESS_RECORDER},
@ -2065,7 +2156,11 @@ const static char frametypearray[PKT_NTYPES] = {
* Build a htsmsg from a th_pkt and enqueue it on our HTSP service
*/
static void
#if ENABLE_TIMESHIFT
htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt, uint64_t timeshift)
#else
htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
#endif
{
htsmsg_t *m;
htsp_msg_t *hm;
@ -2093,6 +2188,11 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
htsmsg_add_u32(m, "stream", pkt->pkt_componentindex);
htsmsg_add_u32(m, "com", pkt->pkt_commercial);
#if ENABLE_TIMESHIFT
if (timeshift)
htsmsg_add_s64(m, "timeshift", timeshift);
#endif
if(pkt->pkt_pts != PTS_UNSET) {
int64_t pts = hs->hs_90khz ? pkt->pkt_pts : ts_rescale(pkt->pkt_pts, 1000000);
@ -2305,6 +2405,19 @@ htsp_subscription_signal_status(htsp_subscription_t *hs, signal_status_t *sig)
htsp_send_message(hs->hs_htsp, m, &hs->hs_htsp->htsp_hmq_qstatus);
}
/**
*
*/
static void
htsp_subscription_speed(htsp_subscription_t *hs, int speed)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "subscriptionSpeed");
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_u32(m, "speed", speed);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
/**
*
*/
@ -2315,7 +2428,12 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
switch(sm->sm_type) {
case SMT_PACKET:
htsp_stream_deliver(hs, sm->sm_data); // reference is transfered
#if ENABLE_TIMESHIFT
htsp_stream_deliver(hs, sm->sm_data, sm->sm_timeshift);
#else
htsp_stream_deliver(hs, sm->sm_data);
#endif
// reference is transfered
sm->sm_data = NULL;
break;
@ -2344,6 +2462,13 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
case SMT_EXIT:
abort();
case SMT_SKIP:
break;
case SMT_SPEED:
htsp_subscription_speed(hs, sm->sm_code);
break;
}
streaming_msg_free(sm);
}

View file

@ -61,6 +61,7 @@
#include "muxes.h"
#include "config2.h"
#include "imagecache.h"
#include "timeshift.h"
#if ENABLE_LIBAV
#include "libav.h"
#endif
@ -598,6 +599,10 @@ main(int argc, char **argv)
v4l_init();
#endif
#if ENABLE_TIMESHIFT
timeshift_init();
#endif
tcp_server_init();
http_server_init();
webui_init();
@ -659,6 +664,10 @@ main(int argc, char **argv)
epg_save();
#if ENABLE_TIMESHIFT
timeshift_term();
#endif
tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend");
if(opt_fork)

View file

@ -36,6 +36,14 @@ typedef struct pktbuf {
#define PKT_B_FRAME 3
#define PKT_NTYPES 4
static inline char pkt_frametype_to_char ( int frametype )
{
if (frametype == PKT_I_FRAME) return 'I';
if (frametype == PKT_P_FRAME) return 'P';
if (frametype == PKT_B_FRAME) return 'B';
return ' ';
}
typedef struct th_pkt {
int64_t pkt_dts;
int64_t pkt_pts;

View file

@ -255,6 +255,8 @@ gh_hold(globalheaders_t *gh, streaming_message_t *sm)
case SMT_SIGNAL_STATUS:
case SMT_NOSTART:
case SMT_MPEGTS:
case SMT_SPEED:
case SMT_SKIP:
streaming_target_deliver2(gh->gh_output, sm);
break;
}
@ -283,6 +285,8 @@ gh_pass(globalheaders_t *gh, streaming_message_t *sm)
case SMT_SIGNAL_STATUS:
case SMT_NOSTART:
case SMT_MPEGTS:
case SMT_SKIP:
case SMT_SPEED:
streaming_target_deliver2(gh->gh_output, sm);
break;

View file

@ -369,6 +369,8 @@ tsfix_input(void *opaque, streaming_message_t *sm)
case SMT_SIGNAL_STATUS:
case SMT_NOSTART:
case SMT_MPEGTS:
case SMT_SPEED:
case SMT_SKIP:
break;
}

View file

@ -137,6 +137,10 @@ streaming_msg_create(streaming_message_type_t type)
{
streaming_message_t *sm = malloc(sizeof(streaming_message_t));
sm->sm_type = type;
#if ENABLE_TIMESHIFT
sm->sm_time = 0;
sm->sm_timeshift = 0;
#endif
return sm;
}
@ -188,7 +192,11 @@ streaming_msg_clone(streaming_message_t *src)
streaming_message_t *dst = malloc(sizeof(streaming_message_t));
streaming_start_t *ss;
dst->sm_type = src->sm_type;
dst->sm_type = src->sm_type;
#if ENABLE_TIMESHIFT
dst->sm_time = src->sm_time;
dst->sm_timeshift = src->sm_timeshift;
#endif
switch(src->sm_type) {
@ -202,11 +210,17 @@ streaming_msg_clone(streaming_message_t *src)
atomic_add(&ss->ss_refcount, 1);
break;
case SMT_SKIP:
dst->sm_data = malloc(sizeof(streaming_skip_t));
memcpy(dst->sm_data, src->sm_data, sizeof(streaming_skip_t));
break;
case SMT_SIGNAL_STATUS:
dst->sm_data = malloc(sizeof(signal_status_t));
memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t));
break;
case SMT_SPEED:
case SMT_STOP:
case SMT_SERVICE_STATUS:
case SMT_NOSTART:
@ -264,17 +278,13 @@ streaming_msg_free(streaming_message_t *sm)
break;
case SMT_STOP:
break;
case SMT_EXIT:
break;
case SMT_SERVICE_STATUS:
break;
case SMT_NOSTART:
case SMT_SPEED:
break;
case SMT_SKIP:
case SMT_SIGNAL_STATUS:
free(sm->sm_data);
break;

View file

@ -545,8 +545,6 @@ subscription_dummy_join(const char *id, int first)
"Dummy join %s ok", id);
}
/**
*
*/
@ -635,3 +633,41 @@ subscription_init(void)
{
gtimer_arm(&every_sec, every_sec_cb, NULL, 1);
}
/**
* Set speed
*/
void
subscription_set_speed ( th_subscription_t *s, int speed )
{
streaming_message_t *sm;
service_t *t = s->ths_service;
pthread_mutex_lock(&t->s_stream_mutex);
sm = streaming_msg_create_code(SMT_SPEED, speed);
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&t->s_stream_mutex);
}
/**
* Set skip
*/
void
subscription_set_skip ( th_subscription_t *s, const streaming_skip_t *skip )
{
streaming_message_t *sm;
service_t *t = s->ths_service;
pthread_mutex_lock(&t->s_stream_mutex);
sm = streaming_msg_create(SMT_SKIP);
sm->sm_data = malloc(sizeof(streaming_skip_t));
memcpy(sm->sm_data, skip, sizeof(streaming_skip_t));
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&t->s_stream_mutex);
}

View file

@ -111,6 +111,14 @@ th_subscription_t *subscription_create(int weight, const char *name,
void subscription_change_weight(th_subscription_t *s, int weight);
void subscription_set_speed
(th_subscription_t *s, int32_t speed );
void subscription_set_skip
(th_subscription_t *s, const streaming_skip_t *skip);
void subscription_stop(th_subscription_t *s);
void subscription_unlink_service(th_subscription_t *s, int reason);
void subscription_dummy_join(const char *id, int first);

184
src/timeshift.c Normal file
View file

@ -0,0 +1,184 @@
/**
* TV headend - Timeshift
* Copyright (C) 2012 Adam Sutton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tvheadend.h"
#include "streaming.h"
#include "timeshift.h"
#include "timeshift/private.h"
#include "config2.h"
#include "settings.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
static int timeshift_index = 0;
/*
* Intialise global file manager
*/
void timeshift_init ( void )
{
timeshift_filemgr_init();
}
/*
* Terminate global file manager
*/
void timeshift_term ( void )
{
timeshift_filemgr_term();
}
/*
* Receive data
*/
static void timeshift_input
( void *opaque, streaming_message_t *sm )
{
timeshift_t *ts = opaque;
pthread_mutex_lock(&ts->state_mutex);
/* Control */
if (sm->sm_type == SMT_SKIP) {
if (ts->state >= TS_LIVE)
timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
} else if (sm->sm_type == SMT_SPEED) {
if (ts->state >= TS_LIVE)
timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
}
else {
/* Start */
if (sm->sm_type == SMT_START && ts->state == TS_INIT) {
ts->state = TS_LIVE;
}
/* Pass-thru */
if (ts->state <= TS_LIVE) {
streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
}
/* Buffer to disk */
if (ts->state >= TS_LIVE) {
sm->sm_time = getmonoclock();
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else
streaming_msg_free(sm);
/* Exit/Stop */
if (sm->sm_type == SMT_EXIT ||
(sm->sm_type == SMT_STOP && sm->sm_code == 0)) {
timeshift_write_exit(ts->rd_pipe.wr);
ts->state = TS_EXIT;
}
}
pthread_mutex_unlock(&ts->state_mutex);
}
/**
*
*/
void
timeshift_destroy(streaming_target_t *pad)
{
timeshift_t *ts = (timeshift_t*)pad;
timeshift_file_t *tsf;
streaming_message_t *sm;
/* Must hold global lock */
lock_assert(&global_lock);
/* Ensure the thread exits */
// Note: this is a workaround for the fact the Q might have been flushed
// in reader thread (VERY unlikely)
sm = streaming_msg_create(SMT_EXIT);
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
/* Wait for all threads */
pthread_join(ts->rd_thread, NULL);
pthread_join(ts->wr_thread, NULL);
pthread_join(ts->rm_thread, NULL);
/* Shut stuff down */
streaming_queue_deinit(&ts->wr_queue);
close(ts->rd_pipe.rd);
close(ts->rd_pipe.wr);
/* Flush files */
while ((tsf = TAILQ_FIRST(&ts->files)))
timeshift_filemgr_remove(ts, tsf, 1);
free(ts->path);
free(ts);
}
/**
* Create timeshift buffer
*
* max_period of buffer in seconds (0 = unlimited)
* max_size of buffer in bytes (0 = unlimited)
*/
streaming_target_t *timeshift_create
(streaming_target_t *out, time_t max_time)
{
char buf[512];
timeshift_t *ts = calloc(1, sizeof(timeshift_t));
/* Must hold global lock */
lock_assert(&global_lock);
/* Create directories */
if (timeshift_filemgr_makedirs(timeshift_index, buf, sizeof(buf)))
return NULL;
/* Setup structure */
TAILQ_INIT(&ts->files);
ts->output = out;
ts->path = strdup(buf);
ts->max_time = max_time;
ts->state = TS_INIT;
ts->full = 0;
ts->vididx = -1;
ts->id = timeshift_index;
pthread_mutex_init(&ts->rdwr_mutex, NULL);
pthread_mutex_init(&ts->state_mutex, NULL);
/* Initialise output */
tvh_pipe(O_NONBLOCK, &ts->rd_pipe);
/* Initialise input */
streaming_queue_init(&ts->wr_queue, 0);
streaming_target_init(&ts->input, timeshift_input, ts, 0);
pthread_create(&ts->wr_thread, NULL, timeshift_writer, ts);
pthread_create(&ts->rd_thread, NULL, timeshift_reader, ts);
/* Update index */
timeshift_index++;
return &ts->input;
}

30
src/timeshift.h Normal file
View file

@ -0,0 +1,30 @@
/*
* TV headend - Timeshift
* Copyright (C) 2012 Adam Sutton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TVH_TIMESHIFT_H__
#define __TVH_TIMESHIFT_H__
void timeshift_init ( void );
void timeshift_term ( void );
streaming_target_t *timeshift_create
(streaming_target_t *out, time_t max_period);
void timeshift_destroy(streaming_target_t *pad);
#endif /* __TVH_TIMESHIFT_H__ */

139
src/timeshift/private.h Normal file
View file

@ -0,0 +1,139 @@
/*
* TV headend - Timeshift
* Copyright (C) 2012 Adam Sutton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TVH_TIMESHIFT_PRIVATE_H__
#define __TVH_TIMESHIFT_PRIVATE_H__
#define TS_PLAY_BUF 100000 // us to buffer in TX
/**
* Indexes of import data in the stream
*/
typedef struct timeshift_index
{
off_t pos; ///< Position in the file
union {
int64_t time; ///< Packet time
void *data; ///< Associated data
};
TAILQ_ENTRY(timeshift_index) link; ///< List entry
} timeshift_index_t;
typedef TAILQ_HEAD(timeshift_index_list,timeshift_index) timeshift_index_list_t;
/**
* Timeshift file
*/
typedef struct timeshift_file
{
int fd; ///< Write descriptor
char *path; ///< Full path to file
time_t time; ///< Files coarse timestamp
size_t size; ///< Current file size;
int64_t last; ///< Latest timestamp
uint8_t bad; ///< File is broken
int refcount; ///< Reader ref count
timeshift_index_list_t iframes; ///< I-frame indexing
timeshift_index_list_t sstart; ///< Stream start messages
TAILQ_ENTRY(timeshift_file) link; ///< List entry
} timeshift_file_t;
typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t;
/**
*
*/
typedef struct timeshift {
// Note: input MUST BE FIRST in struct
streaming_target_t input; ///< Input source
streaming_target_t *output; ///< Output dest
int id; ///< Reference number
char *path; ///< Directory containing buffer
time_t max_time; ///< Maximum period to shift
enum {
TS_INIT,
TS_EXIT,
TS_LIVE,
TS_PAUSE,
TS_PLAY,
} state; ///< Play state
pthread_mutex_t state_mutex; ///< Protect state changes
uint8_t full; ///< Buffer is full
streaming_queue_t wr_queue; ///< Writer queue
pthread_t wr_thread; ///< Writer thread
pthread_t rd_thread; ///< Reader thread
th_pipe_t rd_pipe; ///< Message passing to reader
pthread_t rm_thread; ///< Reaper thread
timeshift_file_list_t rm_list; ///< Remove files
pthread_mutex_t rdwr_mutex; ///< Buffer protection
timeshift_file_list_t files; ///< List of files
int vididx; ///< Index of (current) video stream
} timeshift_t;
/*
* Write functions
*/
ssize_t timeshift_write_start ( int fd, int64_t time, streaming_start_t *ss );
ssize_t timeshift_write_sigstat ( int fd, int64_t time, signal_status_t *ss );
ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt );
ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data );
ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip );
ssize_t timeshift_write_speed ( int fd, int speed );
ssize_t timeshift_write_stop ( int fd, int code );
ssize_t timeshift_write_exit ( int fd );
ssize_t timeshift_write_eof ( int fd );
void timeshift_writer_flush ( timeshift_t *ts );
/*
* Threads
*/
void *timeshift_reader ( void *p );
void *timeshift_writer ( void *p );
/*
* File management
*/
void timeshift_filemgr_init ( void );
void timeshift_filemgr_term ( void );
int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len );
timeshift_file_t *timeshift_filemgr_get
( timeshift_t *ts, int create );
timeshift_file_t *timeshift_filemgr_prev
( timeshift_file_t *ts, int *end, int keep );
timeshift_file_t *timeshift_filemgr_next
( timeshift_file_t *ts, int *end, int keep );
void timeshift_filemgr_remove
( timeshift_t *ts, timeshift_file_t *tsf, int force );
void timeshift_filemgr_close ( timeshift_file_t *tsf );
#endif /* __TVH_TIMESHIFT_PRIVATE_H__ */

View file

@ -0,0 +1,306 @@
/**
* TV headend - Timeshift File Manager
* Copyright (C) 2012 Adam Sutton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include "tvheadend.h"
#include "streaming.h"
#include "timeshift.h"
#include "timeshift/private.h"
#include "config2.h"
#include "settings.h"
static int timeshift_reaper_run;
static timeshift_file_list_t timeshift_reaper_list;
static pthread_t timeshift_reaper_thread;
static pthread_mutex_t timeshift_reaper_lock;
static pthread_cond_t timeshift_reaper_cond;
/* **************************************************************************
* File reaper thread
* *************************************************************************/
static void* timeshift_reaper_callback ( void *p )
{
char *dpath;
timeshift_file_t *tsf;
timeshift_index_t *ti;
streaming_message_t *sm;
pthread_mutex_lock(&timeshift_reaper_lock);
while (timeshift_reaper_run) {
/* Get next */
tsf = TAILQ_FIRST(&timeshift_reaper_list);
if (!tsf) {
pthread_cond_wait(&timeshift_reaper_cond, &timeshift_reaper_lock);
continue;
}
TAILQ_REMOVE(&timeshift_reaper_list, tsf, link);
pthread_mutex_unlock(&timeshift_reaper_lock);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "remove file %s", tsf->path);
#endif
/* Remove */
unlink(tsf->path);
dpath = dirname(tsf->path);
if (rmdir(dpath) == -1)
if (errno != ENOTEMPTY)
tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]",
dpath, strerror(errno));
/* Free memory */
while ((ti = TAILQ_FIRST(&tsf->iframes))) {
TAILQ_REMOVE(&tsf->iframes, ti, link);
free(ti);
}
while ((ti = TAILQ_FIRST(&tsf->sstart))) {
TAILQ_REMOVE(&tsf->sstart, ti, link);
sm = ti->data;
streaming_msg_free(sm);
free(ti);
}
free(tsf->path);
free(tsf);
}
pthread_mutex_unlock(&timeshift_reaper_lock);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "reaper thread exit");
#endif
return NULL;
}
static void timeshift_reaper_remove ( timeshift_file_t *tsf )
{
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "queue file for removal %s", tsf->path);
#endif
pthread_mutex_lock(&timeshift_reaper_lock);
TAILQ_INSERT_TAIL(&timeshift_reaper_list, tsf, link);
pthread_cond_signal(&timeshift_reaper_cond);
pthread_mutex_unlock(&timeshift_reaper_lock);
}
/* **************************************************************************
* File Handling
* *************************************************************************/
/*
* Get root directory
*/
static void timeshift_filemgr_get_root ( char *buf, size_t len )
{
const char *path = config_get_timeshift_path();
if (!path || !*path)
path = hts_settings_get_root();
snprintf(buf, len, "%s/timeshift", path);
}
/*
* Create timeshift directories (for a given instance)
*/
int timeshift_filemgr_makedirs ( int index, char *buf, size_t len )
{
timeshift_filemgr_get_root(buf, len);
snprintf(buf+strlen(buf), len-strlen(buf), "/%d", index);
return makedirs(buf, 0700);
}
/*
* Close file
*/
void timeshift_filemgr_close ( timeshift_file_t *tsf )
{
ssize_t r = timeshift_write_eof(tsf->fd);
if (r > 0)
tsf->size += r;
close(tsf->fd);
tsf->fd = -1;
}
/*
* Remove file
*/
void timeshift_filemgr_remove
( timeshift_t *ts, timeshift_file_t *tsf, int force )
{
if (tsf->fd != -1)
close(tsf->fd);
TAILQ_REMOVE(&ts->files, tsf, link);
timeshift_reaper_remove(tsf);
}
/*
* Get current / new file
*/
timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
{
int fd;
struct timespec tp;
timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp;
timeshift_index_t *ti;
char path[512];
/* Return last file */
if (!create)
return TAILQ_LAST(&ts->files, timeshift_file_list);
/* No space */
if (ts->full)
return NULL;
/* Store to file */
clock_gettime(CLOCK_MONOTONIC_COARSE, &tp);
tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list);
if (!tsf_tl || tsf_tl->time != tp.tv_sec) {
tsf_hd = TAILQ_FIRST(&ts->files);
/* Close existing */
if (tsf_tl && tsf_tl->fd != -1)
timeshift_filemgr_close(tsf_tl);
/* Check period */
if (ts->max_time && tsf_hd && tsf_tl) {
time_t d = tsf_tl->time - tsf_hd->time;
if (d > (ts->max_time+5)) {
if (!tsf_hd->refcount) {
timeshift_filemgr_remove(ts, tsf_hd, 0);
} else {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d buffer full", ts->id);
#endif
ts->full = 1;
}
}
}
/* Check size */
// TODO: need to implement this
/* Create new file */
tsf_tmp = NULL;
if (!ts->full) {
snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, tp.tv_sec);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d create file %s", ts->id, path);
#endif
if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
tsf_tmp = calloc(1, sizeof(timeshift_file_t));
tsf_tmp->time = tp.tv_sec;
tsf_tmp->fd = fd;
tsf_tmp->path = strdup(path);
tsf_tmp->refcount = 0;
TAILQ_INIT(&tsf_tmp->iframes);
TAILQ_INIT(&tsf_tmp->sstart);
TAILQ_INSERT_TAIL(&ts->files, tsf_tmp, link);
/* Copy across last start message */
if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_list))) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d copy smt_start to new file",
ts->id);
#endif
timeshift_index_t *ti2 = calloc(1, sizeof(timeshift_index_t));
ti2->pos = ti->pos;
ti2->data = streaming_msg_clone(ti->data);
TAILQ_INSERT_TAIL(&tsf_tmp->sstart, ti2, link);
}
}
}
tsf_tl = tsf_tmp;
}
return tsf_tl;
}
timeshift_file_t *timeshift_filemgr_next
( timeshift_file_t *tsf, int *end, int keep )
{
timeshift_file_t *nxt = TAILQ_NEXT(tsf, link);
if (!nxt && end) *end = 1;
if (!nxt && keep) return tsf;
tsf->refcount--;
if (nxt)
nxt->refcount++;
return nxt;
}
timeshift_file_t *timeshift_filemgr_prev
( timeshift_file_t *tsf, int *end, int keep )
{
timeshift_file_t *nxt = TAILQ_PREV(tsf, timeshift_file_list, link);
if (!nxt && end) *end = 1;
if (!nxt && keep) return tsf;
tsf->refcount--;
if (nxt)
nxt->refcount++;
return nxt;
}
/* **************************************************************************
* Setup / Teardown
* *************************************************************************/
/*
* Initialise global structures
*/
void timeshift_filemgr_init ( void )
{
char path[512];
/* Try to remove any rubbish left from last run */
timeshift_filemgr_get_root(path, sizeof(path));
rmtree(path);
/* Start the reaper thread */
timeshift_reaper_run = 1;
pthread_mutex_init(&timeshift_reaper_lock, NULL);
pthread_cond_init(&timeshift_reaper_cond, NULL);
TAILQ_INIT(&timeshift_reaper_list);
pthread_create(&timeshift_reaper_thread, NULL,
timeshift_reaper_callback, NULL);
}
/*
* Terminate
*/
void timeshift_filemgr_term ( void )
{
char path[512];
/* Wait for thread */
pthread_mutex_lock(&timeshift_reaper_lock);
timeshift_reaper_run = 0;
pthread_cond_signal(&timeshift_reaper_cond);
pthread_mutex_unlock(&timeshift_reaper_lock);
pthread_join(timeshift_reaper_thread, NULL);
/* Remove the lot */
timeshift_filemgr_get_root(path, sizeof(path));
rmtree(path);
}

View file

@ -0,0 +1,490 @@
/**
* TV headend - Timeshift Reader
* Copyright (C) 2012 Adam Sutton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tvheadend.h"
#include "streaming.h"
#include "timeshift.h"
#include "timeshift/private.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
/* **************************************************************************
* File Reading
* *************************************************************************/
static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
{
ssize_t r, cnt = 0;
size_t sz;
/* Size */
r = read(fd, &sz, sizeof(sz));
if (r < 0) return -1;
if (r != sizeof(sz)) return 0;
cnt += r;
/* Empty */
if (!sz) {
*pktbuf = NULL;
return cnt;
}
/* Data */
*pktbuf = pktbuf_alloc(NULL, sz);
r = read(fd, (*pktbuf)->pb_data, sz);
if (r != sz) {
free((*pktbuf)->pb_data);
free(*pktbuf);
return r < 0 ? -1 : 0;
}
cnt += r;
return cnt;
}
static ssize_t _read_msg ( int fd, streaming_message_t **sm )
{
ssize_t r, cnt = 0;
size_t sz;
streaming_message_type_t type;
int64_t time;
void *data;
int code;
/* Clear */
*sm = NULL;
/* Size */
r = read(fd, &sz, sizeof(sz));
if (r < 0) return -1;
if (r != sizeof(sz)) return 0;
cnt += r;
/* EOF */
if (sz == 0) return cnt;
/* Type */
r = read(fd, &type, sizeof(type));
if (r < 0) return -1;
if (r != sizeof(type)) return 0;
cnt += r;
/* Time */
r = read(fd, &time, sizeof(time));
if (r < 0) return -1;
if (r != sizeof(time)) return 0;
cnt += r;
/* Adjust size */
sz -= sizeof(type) + sizeof(time);
cnt += sz;
/* Standard messages */
switch (type) {
/* Unhandled */
case SMT_START:
case SMT_NOSTART:
case SMT_SERVICE_STATUS:
break;
/* Code */
case SMT_STOP:
case SMT_EXIT:
case SMT_SPEED:
if (sz != sizeof(code)) return -1;
r = read(fd, &code, sz);
if (r != sz) {
if (r < 0) return -1;
return 0;
}
*sm = streaming_msg_create_code(type, code);
break;
/* Data */
case SMT_SKIP:
case SMT_SIGNAL_STATUS:
case SMT_MPEGTS:
case SMT_PACKET:
data = malloc(sz);
r = read(fd, data, sz);
if (r != sz) {
free(data);
if (r < 0) return -1;
return 0;
}
if (type == SMT_PACKET) {
th_pkt_t *pkt = data;
pkt->pkt_payload = pkt->pkt_header = NULL;
*sm = streaming_msg_create_pkt(pkt);
r = _read_pktbuf(fd, &pkt->pkt_header);
if (r < 0) {
streaming_msg_free(*sm);
return r;
}
cnt += r;
r = _read_pktbuf(fd, &pkt->pkt_payload);
if (r < 0) {
streaming_msg_free(*sm);
return r;
}
cnt += r;
} else {
*sm = streaming_msg_create_data(type, data);
}
(*sm)->sm_time = time;
break;
}
/* OK */
return cnt;
}
/* **************************************************************************
* Thread
* *************************************************************************/
/*
* Timeshift thread
*/
void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
int efd, nfds, end, fd = -1, run = 1, wait = -1;
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0;
int64_t pause_time = 0, play_time = 0, last_time = 0, tx_time = 0;
int64_t now, deliver;
streaming_message_t *sm = NULL, *ctrl;
timeshift_file_t *cur_file = NULL, *tsi_file = NULL;
timeshift_index_t *tsi = NULL;
/* Poll */
struct epoll_event ev;
efd = epoll_create(1);
ev.events = EPOLLIN;
ev.data.fd = ts->rd_pipe.rd;
epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
/* Output */
while (run) {
/* Wait for data */
if(wait)
nfds = epoll_wait(efd, &ev, 1, wait);
else
nfds = 0;
wait = -1;
end = 0;
/* Control */
pthread_mutex_lock(&ts->state_mutex);
if (nfds == 1) {
if (_read_msg(ev.data.fd, &ctrl) > 0) {
/* Exit */
if (ctrl->sm_type == SMT_EXIT) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d read exit request", ts->id);
#endif
run = 0;
streaming_msg_free(ctrl);
/* Speed */
// TODO: currently just pause
} else if (ctrl->sm_type == SMT_SPEED) {
int speed = ctrl->sm_code;
int keyframe;
/* Bound it */
if (speed > 3200) speed = 3200;
if (speed < -3200) speed = -3200;
/* Process */
if (cur_speed != speed) {
/* Live playback */
if (ts->state == TS_LIVE) {
/* Reject */
if (speed >= 100) {
tvhlog(LOG_DEBUG, "timeshift", "ts %d reject 1x+ in live mode",
ts->id);
speed = 100;
/* Set position */
} else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode",
ts->id);
timeshift_writer_flush(ts);
pthread_mutex_lock(&ts->rdwr_mutex);
if ((cur_file = timeshift_filemgr_get(ts, 0))) {
cur_off = cur_file->size;
pause_time = cur_file->last;
last_time = pause_time;
}
pthread_mutex_unlock(&ts->rdwr_mutex);
}
/* Buffer playback */
} else if (ts->state == TS_PLAY) {
pause_time = last_time;
/* Paused */
} else {
}
/* Check keyframe mode */
keyframe = (speed < 0) || (speed > 400);
if (keyframe != keyframe_mode) {
tvhlog(LOG_DEBUG, "timeshift", "using keyframe mode? %s",
keyframe ? "yes" : "no");
keyframe_mode = keyframe;
if (keyframe) {
tsi = NULL;
tsi_file = cur_file;
}
}
/* Update */
play_time = getmonoclock();
cur_speed = speed;
if (speed != 100 || ts->state != TS_LIVE)
ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d",
ts->id, speed);
}
/* Send on the message */
ctrl->sm_code = speed;
streaming_target_deliver2(ts->output, ctrl);
/* Skip */
} else {
streaming_msg_free(ctrl);
}
ctrl = NULL;
}
}
/* Done */
if (!run || ts->state != TS_PLAY || !cur_file) {
pthread_mutex_unlock(&ts->state_mutex);
continue;
}
/* Calculate delivery time */
now = getmonoclock();
deliver = (now - play_time) + TS_PLAY_BUF;
deliver = (deliver * cur_speed) / 100;
deliver = (deliver + pause_time);
/* Rewind or Fast forward (i-frame only) */
if (keyframe_mode) {
wait = 0;
/* Find next index */
if (cur_speed < 0) {
if (!tsi) {
TAILQ_FOREACH_REVERSE(tsi, &tsi_file->iframes,
timeshift_index_list, link) {
if (tsi->time < last_time) break;
}
}
} else {
if (!tsi) {
TAILQ_FOREACH(tsi, &tsi_file->iframes, link) {
if (tsi->time > last_time) break;
}
}
}
/* Next file */
if (!tsi) {
if (fd != -1)
close(fd);
wait = 0; // immediately cycle around
fd = -1;
if (cur_speed < 0)
tsi_file = timeshift_filemgr_prev(tsi_file, &end, 1);
else
tsi_file = timeshift_filemgr_next(tsi_file, &end, 0);
}
/* Deliver */
if (tsi && (((cur_speed < 0) && (tsi->time >= deliver)) ||
((cur_speed > 0) && (tsi->time <= deliver)))) {
/* Keep delivery to 5fps max */
if ((now - tx_time) >= 200000) {
/* Open */
if (fd == -1) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, tsi_file->path);
#endif
fd = open(tsi_file->path, O_RDONLY);
}
/* Read */
off_t ret = lseek(fd, tsi->pos, SEEK_SET);
assert(ret == tsi->pos);
ssize_t r = _read_msg(fd, &sm);
/* Send */
if (r > 0) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
ts->id, sm->sm_time);
#endif
sm->sm_timeshift = now - sm->sm_time;
streaming_target_deliver2(ts->output, sm);
cur_file = tsi_file;
cur_off = tsi->pos + r;
last_time = sm->sm_time;
tx_time = now;
sm = NULL;
} else {
wait = -1;
close(fd);
fd = -1;
}
}
/* Next index */
if (cur_speed < 0)
tsi = TAILQ_PREV(tsi, timeshift_index_list, link);
else
tsi = TAILQ_NEXT(tsi, link);
/* Not yet! */
} else if (tsi) {
if (cur_speed > 0)
wait = (tsi->time - deliver) / 1000;
else
wait = (deliver - tsi->time) / 1000;
if (wait == 0) wait = 1;
}
/* Full frame delivery */
} else {
/* Open file */
if (fd == -1) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
ts->id, cur_file->path);
#endif
fd = open(cur_file->path, O_RDONLY);
if (cur_off) lseek(fd, cur_off, SEEK_SET);
}
/* Process */
pthread_mutex_lock(&ts->rdwr_mutex);
end = 1;
while (cur_file && cur_off < cur_file->size) {
/* Read msg */
if (!sm) {
ssize_t r = _read_msg(fd, &sm);
assert(r != -1);
/* Incomplete */
if (r == 0) {
lseek(fd, cur_off, SEEK_SET);
break;
}
cur_off += r;
/* Special case - EOF */
if (r == sizeof(size_t) || cur_off > cur_file->size) {
close(fd);
wait = 0; // immediately cycle around
cur_off = 0; // reset
fd = -1;
cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
break;
}
}
assert(sm);
end = 0;
/* Deliver */
if (sm->sm_time <= deliver) {
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
ts->id, sm->sm_time);
#endif
sm->sm_timeshift = now - sm->sm_time;
streaming_target_deliver2(ts->output, sm);
tx_time = now;
last_time = sm->sm_time;
sm = NULL;
wait = 0;
} else {
wait = (sm->sm_time - deliver) / 1000;
if (wait == 0) wait = 1;
break;
}
}
}
/* Terminate */
if (!cur_file || end) {
/* Back to live */
if (cur_speed > 0) {
tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id);
ts->state = TS_LIVE;
cur_speed = 100;
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
/* Pause */
} else if (cur_speed < 0) {
tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
cur_speed = 0;
ts->state = TS_PAUSE;
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
}
}
pthread_mutex_unlock(&ts->rdwr_mutex);
pthread_mutex_unlock(&ts->state_mutex);
}
/* Cleanup */
if (sm) streaming_msg_free(sm);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
#endif
return NULL;
}

View file

@ -0,0 +1,327 @@
/**
* TV headend - Timeshift Write Handler
* Copyright (C) 2012 Adam Sutton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tvheadend.h"
#include "streaming.h"
#include "timeshift.h"
#include "timeshift/private.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
/* **************************************************************************
* File Writing
* *************************************************************************/
/*
* Write data (retry on EAGAIN)
*/
static ssize_t _write
( int fd, const void *buf, size_t count )
{
ssize_t r;
size_t n = 0;
while ( n < count ) {
r = write(fd, buf+n, count-n);
if (r == -1) {
if (errno == EAGAIN)
continue;
else
return -1;
}
n += r;
}
return count == n ? n : -1;
}
/*
* Write message
*/
static ssize_t _write_msg
( int fd, streaming_message_type_t type, int64_t time,
const void *buf, size_t len )
{
size_t len2 = len + sizeof(type) + sizeof(time);
ssize_t err, ret;
ret = err = _write(fd, &len2, sizeof(len2));
if (err < 0) return err;
err = _write(fd, &type, sizeof(type));
if (err < 0) return err;
ret += err;
err = _write(fd, &time, sizeof(time));
if (err < 0) return err;
ret += err;
if (len) {
err = _write(fd, buf, len);
if (err < 0) return err;
ret += err;
}
return ret;
}
/*
* Write packet buffer
*/
static int _write_pktbuf ( int fd, pktbuf_t *pktbuf )
{
ssize_t ret, err;
if (pktbuf) {
ret = err = _write(fd, &pktbuf->pb_size, sizeof(pktbuf->pb_size));
if (err < 0) return err;
err = _write(fd, pktbuf->pb_data, pktbuf->pb_size);
if (err < 0) return err;
ret += err;
} else {
size_t sz = 0;
ret = _write(fd, &sz, sizeof(sz));
}
return ret;
}
/*
* Write signal status
*/
ssize_t timeshift_write_sigstat
( int fd, int64_t time, signal_status_t *sigstat )
{
return _write_msg(fd, SMT_SIGNAL_STATUS, time, sigstat,
sizeof(signal_status_t));
}
/*
* Write packet
*/
ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt )
{
ssize_t ret = 0, err;
ret = err = _write_msg(fd, SMT_PACKET, time, pkt, sizeof(th_pkt_t));
if (err <= 0) return err;
err = _write_pktbuf(fd, pkt->pkt_header);
if (err <= 0) return err;
ret += err;
err = _write_pktbuf(fd, pkt->pkt_payload);
if (err <= 0) return err;
ret += err;
return ret;
}
/*
* Write MPEGTS data
*/
ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data )
{
return _write_msg(fd, SMT_MPEGTS, time, data, 188);
}
/*
* Write skip message
*/
ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
{
return _write_msg(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t));
}
/*
* Write speed message
*/
ssize_t timeshift_write_speed ( int fd, int speed )
{
return _write_msg(fd, SMT_SPEED, 0, &speed, sizeof(speed));
}
/*
* Stop
*/
ssize_t timeshift_write_stop ( int fd, int code )
{
return _write_msg(fd, SMT_STOP, 0, &code, sizeof(code));
}
/*
* Exit
*/
ssize_t timeshift_write_exit ( int fd )
{
int code = 0;
return _write_msg(fd, SMT_EXIT, 0, &code, sizeof(code));
}
/*
* Write end of file (special internal message)
*/
ssize_t timeshift_write_eof ( int fd )
{
size_t sz = 0;
return _write(fd, &sz, sizeof(sz));
}
/* **************************************************************************
* Thread
* *************************************************************************/
static inline ssize_t _process_msg0
( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp )
{
int i;
ssize_t err;
streaming_start_t *ss;
streaming_message_t *sm = *smp;
if (sm->sm_type == SMT_START) {
err = 0;
timeshift_index_t *ti = calloc(1, sizeof(timeshift_index_t));
ti->pos = tsf->size;
ti->data = sm;
*smp = NULL;
TAILQ_INSERT_TAIL(&tsf->sstart, ti, link);
/* Update video index */
ss = sm->sm_data;
for (i = 0; i < ss->ss_num_components; i++)
if (SCT_ISVIDEO(ss->ss_components[i].ssc_type))
ts->vididx = ss->ss_components[i].ssc_index;
} else if (sm->sm_type == SMT_SIGNAL_STATUS)
err = timeshift_write_sigstat(tsf->fd, sm->sm_time, sm->sm_data);
else if (sm->sm_type == SMT_PACKET) {
err = timeshift_write_packet(tsf->fd, sm->sm_time, sm->sm_data);
if (err > 0) {
th_pkt_t *pkt = sm->sm_data;
/* Index video iframes */
if (pkt->pkt_componentindex == ts->vididx &&
pkt->pkt_frametype == PKT_I_FRAME) {
timeshift_index_t *ti = calloc(1, sizeof(timeshift_index_t));
ti->pos = tsf->size;
ti->time = sm->sm_time;
TAILQ_INSERT_TAIL(&tsf->iframes, ti, link);
}
}
} else if (sm->sm_type == SMT_MPEGTS)
err = timeshift_write_mpegts(tsf->fd, sm->sm_time, sm->sm_data);
else
err = 0;
/* OK */
if (err > 0) {
tsf->last = sm->sm_time;
tsf->size += err;
}
return err;
}
static void _process_msg
( timeshift_t *ts, streaming_message_t *sm, int *run )
{
int err;
timeshift_file_t *tsf;
/* Process */
switch (sm->sm_type) {
/* Terminate */
case SMT_EXIT:
if (run) *run = 0;
break;
case SMT_STOP:
if (sm->sm_code == 0 && run)
*run = 0;
break;
/* Timeshifting */
case SMT_SKIP:
case SMT_SPEED:
break;
/* Status */
case SMT_NOSTART:
case SMT_SERVICE_STATUS:
break;
/* Store */
case SMT_SIGNAL_STATUS:
case SMT_START:
case SMT_MPEGTS:
case SMT_PACKET:
pthread_mutex_lock(&ts->rdwr_mutex);
if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->fd != -1)) {
if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
timeshift_filemgr_close(tsf);
tsf->bad = 1;
ts->full = 1; ///< Stop any more writing
}
}
pthread_mutex_unlock(&ts->rdwr_mutex);
break;
}
/* Next */
if (sm)
streaming_msg_free(sm);
}
void *timeshift_writer ( void *aux )
{
int run = 1;
timeshift_t *ts = aux;
streaming_queue_t *sq = &ts->wr_queue;
streaming_message_t *sm;
pthread_mutex_lock(&sq->sq_mutex);
while (run) {
/* Get message */
sm = TAILQ_FIRST(&sq->sq_queue);
if (sm == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);
_process_msg(ts, sm, &run);
pthread_mutex_lock(&sq->sq_mutex);
}
pthread_mutex_unlock(&sq->sq_mutex);
return NULL;
}
/* **************************************************************************
* Utilities
* *************************************************************************/
void timeshift_writer_flush ( timeshift_t *ts )
{
streaming_message_t *sm;
streaming_queue_t *sq = &ts->wr_queue;
pthread_mutex_lock(&sq->sq_mutex);
while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
_process_msg(ts, sm, NULL);
}
pthread_mutex_unlock(&sq->sq_mutex);
}

View file

@ -210,6 +210,24 @@ typedef struct signal_status {
int unc; /* uncorrected blocks */
} signal_status_t;
/**
* Streaming skip
*/
typedef struct streaming_skip
{
enum {
SMT_SKIP_REL_TIME,
SMT_SKIP_ABS_TIME,
SMT_SKIP_REL_SIZE,
SMT_SKIP_ABS_SIZE
} type;
union {
off_t size;
time_t time;
};
} streaming_skip_t;
/**
* A streaming pad generates data.
* It has one or more streaming targets attached to it.
@ -234,6 +252,7 @@ TAILQ_HEAD(streaming_message_queue, streaming_message);
* Streaming messages types
*/
typedef enum {
/**
* Packet with data.
*
@ -291,6 +310,17 @@ typedef enum {
* Internal message to exit receiver
*/
SMT_EXIT,
/**
* Set stream speed
*/
SMT_SPEED,
/**
* Skip the stream
*/
SMT_SKIP,
} streaming_message_type_t;
#define SMT_TO_MASK(x) (1 << ((unsigned int)x))
@ -326,6 +356,10 @@ typedef enum {
typedef struct streaming_message {
TAILQ_ENTRY(streaming_message) sm_link;
streaming_message_type_t sm_type;
#if ENABLE_TIMESHIFT
int64_t sm_time;
uint64_t sm_timeshift;
#endif
union {
void *sm_data;
int sm_code;

View file

@ -103,6 +103,49 @@ tvheadend.miscconf = function() {
if (tvheadend.capabilities.indexOf('imagecache') == -1)
imagecachePanel.hide();
/* ****************************************************************
* Timeshift
* ***************************************************************/
var timeshiftPath = new Ext.form.TextField({
fieldLabel : 'Temp. storage path',
name : 'timeshiftpath',
allowBlank : true,
width : 400
});
var timeshiftPeriod = new Ext.form.NumberField({
fieldLabel : 'Max period (minutes, per stream)',
name : 'timeshiftperiod',
allowBlank : false,
width : 400
});
var timeshiftPeriodU = new Ext.form.Checkbox({
fieldLabel : '(unlimited)',
name : 'timeshiftperiod_unlimited',
allowBlank : false,
width : 400
});
timeshiftPeriodU.on('check', function(e, c) {
timeshiftPeriod.setDisabled(c);
});
var timeshiftSize = new Ext.form.NumberField({
fieldLabel : 'Max size (MB, global)',
name : 'timeshiftsize',
allowBlank : false,
width : 400
});
var timeshiftFields = new Ext.form.FieldSet({
title : 'Timeshift',
width : 700,
autoHeight : true,
collapsible : true,
items : [ timeshiftPath, timeshiftPeriod, timeshiftPeriodU ]//, timeshiftSize ]
});
/* ****************************************************************
* Form
* ***************************************************************/
@ -127,7 +170,7 @@ tvheadend.miscconf = function() {
border : false,
bodyStyle : 'padding:15px',
labelAlign : 'left',
labelWidth : 150,
labelWidth : 200,
waitMsgTarget : true,
reader : confreader,
layout : 'form',
@ -149,6 +192,14 @@ tvheadend.miscconf = function() {
op : 'loadSettings'
},
success : function(form, action) {
v = parseInt(timeshiftPeriod.getValue());
if (v == 4294967295) {
timeshiftPeriodU.setValue(true);
timeshiftPeriod.setValue("");
timeshiftPeriod.setDisabled(true); // TODO: this isn't working
} else {
timeshiftPeriod.setValue(v / 60);
}
confpanel.enable();
}
});

View file

@ -244,6 +244,8 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
}
break;
case SMT_SKIP:
case SMT_SPEED:
case SMT_SIGNAL_STATUS:
break;