timeshift: add periodic status message indicating buffer state etc.

This commit is contained in:
Adam Sutton 2013-01-15 11:01:21 +00:00
parent aff8b05c28
commit a3ff11f782
10 changed files with 115 additions and 22 deletions

View file

@ -541,6 +541,7 @@ dvr_thread(void *aux)
case SMT_SPEED:
case SMT_SKIP:
case SMT_SIGNAL_STATUS:
case SMT_TIMESHIFT_STATUS:
break;
case SMT_EXIT:

View file

@ -2169,11 +2169,7 @@ const static char frametypearray[PKT_NTYPES] = {
* Build a htsmsg from a th_pkt and enqueue it on our HTSP service
*/
static void
#if ENABLE_TIMESHIFT
htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt, uint64_t timeshift)
#else
htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
#endif
{
htsmsg_t *m;
htsp_msg_t *hm;
@ -2201,12 +2197,6 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
htsmsg_add_u32(m, "stream", pkt->pkt_componentindex);
htsmsg_add_u32(m, "com", pkt->pkt_commercial);
#if ENABLE_TIMESHIFT
if (timeshift)
htsmsg_add_s64(m, "timeshift", timeshift);
#endif
if(pkt->pkt_pts != PTS_UNSET) {
int64_t pts = hs->hs_90khz ? pkt->pkt_pts : ts_rescale(pkt->pkt_pts, 1000000);
htsmsg_add_s64(m, "pts", pts);
@ -2451,6 +2441,25 @@ htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
/**
*
*/
#if ENABLE_TIMESHIFT
static void
htsp_subscription_timeshift_status(htsp_subscription_t *hs, timeshift_status_t *status)
{
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "timeshiftStatus");
htsmsg_add_u32(m, "full", status->full);
htsmsg_add_s64(m, "shift", hs->hs_90khz ? status->shift : ts_rescale(status->shift, 1000000));
if (status->pts_start != PTS_UNSET)
htsmsg_add_s64(m, "start", hs->hs_90khz ? status->pts_start : ts_rescale(status->pts_start, 1000000)) ;
if (status->pts_end != PTS_UNSET)
htsmsg_add_s64(m, "end", hs->hs_90khz ? status->pts_end : ts_rescale(status->pts_end, 1000000)) ;
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
#endif
/**
*
*/
@ -2461,11 +2470,7 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
switch(sm->sm_type) {
case SMT_PACKET:
#if ENABLE_TIMESHIFT
htsp_stream_deliver(hs, sm->sm_data, sm->sm_timeshift);
#else
htsp_stream_deliver(hs, sm->sm_data);
#endif
// reference is transfered
sm->sm_data = NULL;
break;
@ -2503,6 +2508,12 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
case SMT_SPEED:
htsp_subscription_speed(hs, sm->sm_code);
break;
case SMT_TIMESHIFT_STATUS:
#if ENABLE_TIMESHIFT
htsp_subscription_timeshift_status(hs, sm->sm_data);
#endif
break;
}
streaming_msg_free(sm);
}

View file

@ -257,6 +257,7 @@ gh_hold(globalheaders_t *gh, streaming_message_t *sm)
case SMT_MPEGTS:
case SMT_SPEED:
case SMT_SKIP:
case SMT_TIMESHIFT_STATUS:
streaming_target_deliver2(gh->gh_output, sm);
break;
}
@ -287,6 +288,7 @@ gh_pass(globalheaders_t *gh, streaming_message_t *sm)
case SMT_MPEGTS:
case SMT_SKIP:
case SMT_SPEED:
case SMT_TIMESHIFT_STATUS:
streaming_target_deliver2(gh->gh_output, sm);
break;

View file

@ -371,6 +371,7 @@ tsfix_input(void *opaque, streaming_message_t *sm)
case SMT_MPEGTS:
case SMT_SPEED:
case SMT_SKIP:
case SMT_TIMESHIFT_STATUS:
break;
}

View file

@ -23,6 +23,7 @@
#include "packet.h"
#include "atomic.h"
#include "service.h"
#include "timeshift.h"
void
streaming_pad_init(streaming_pad_t *sp)
@ -139,7 +140,6 @@ streaming_msg_create(streaming_message_type_t type)
sm->sm_type = type;
#if ENABLE_TIMESHIFT
sm->sm_time = 0;
sm->sm_timeshift = 0;
#endif
return sm;
}
@ -195,7 +195,6 @@ streaming_msg_clone(streaming_message_t *src)
dst->sm_type = src->sm_type;
#if ENABLE_TIMESHIFT
dst->sm_time = src->sm_time;
dst->sm_timeshift = src->sm_timeshift;
#endif
switch(src->sm_type) {
@ -220,6 +219,11 @@ streaming_msg_clone(streaming_message_t *src)
memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t));
break;
case SMT_TIMESHIFT_STATUS:
dst->sm_data = malloc(sizeof(timeshift_status_t));
memcpy(dst->sm_data, src->sm_data, sizeof(timeshift_status_t));
break;
case SMT_SPEED:
case SMT_STOP:
case SMT_SERVICE_STATUS:
@ -286,6 +290,9 @@ streaming_msg_free(streaming_message_t *sm)
case SMT_SKIP:
case SMT_SIGNAL_STATUS:
#if ENABLE_TIMESHIFT
case SMT_TIMESHIFT_STATUS:
#endif
free(sm->sm_data);
break;

View file

@ -30,6 +30,14 @@ extern size_t timeshift_max_size;
extern size_t timeshift_total_size;
extern pthread_mutex_t timeshift_size_lock;
typedef struct timeshift_status
{
int full;
int64_t shift;
int64_t pts_start;
int64_t pts_end;
} timeshift_status_t;
void timeshift_init ( void );
void timeshift_term ( void );
void timeshift_save ( void );

View file

@ -187,6 +187,36 @@ static streaming_message_t *_timeshift_find_sstart
return ti ? ti->data : NULL;
}
static timeshift_index_iframe_t *_timeshift_first_frame
( timeshift_t *ts )
{
int end;
timeshift_index_iframe_t *tsi = NULL;
timeshift_file_t *tsf = timeshift_filemgr_last(ts);
while (tsf && !tsi) {
if (!(tsi = TAILQ_FIRST(&tsf->iframes)))
tsf = timeshift_filemgr_next(tsf, &end, 0);
}
if (tsf)
tsf->refcount--;
return tsi;
}
static timeshift_index_iframe_t *_timeshift_last_frame
( timeshift_t *ts )
{
int end;
timeshift_index_iframe_t *tsi = NULL;
timeshift_file_t *tsf = timeshift_filemgr_get(ts, ts->ondemand);
while (tsf && !tsi) {
if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list)))
tsf = timeshift_filemgr_prev(tsf, &end, 0);
}
if (tsf)
tsf->refcount--;
return tsi;
}
static int _timeshift_skip
( timeshift_t *ts, int64_t req_time, int64_t cur_time,
timeshift_file_t *cur_file, timeshift_file_t **new_file,
@ -351,8 +381,8 @@ static int _timeshift_flush_to_live
if (!*sm) break;
if ((*sm)->sm_type == SMT_PACKET) {
pts = ((th_pkt_t*)(*sm)->sm_data)->pkt_pts;
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64,
ts->id, (*sm)->sm_time, pts, (*sm)->sm_timeshift );
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t,
ts->id, (*sm)->sm_time, pts);
}
streaming_target_deliver2(ts->output, *sm);
*sm = NULL;
@ -360,7 +390,6 @@ static int _timeshift_flush_to_live
return 0;
}
/* **************************************************************************
* Thread
* *************************************************************************/
@ -380,6 +409,7 @@ void *timeshift_reader ( void *p )
streaming_message_t *sm = NULL, *ctrl = NULL;
timeshift_index_iframe_t *tsi = NULL;
streaming_skip_t *skip = NULL;
time_t last_status = 0;
/* Poll */
struct epoll_event ev = { 0 };
@ -391,6 +421,11 @@ void *timeshift_reader ( void *p )
/* Output */
while (run) {
// Note: Previously we allowed unlimited wait, but we now must wake periodically
// to output status message
if (wait < 0 || wait > 1000)
wait = 1000;
/* Wait for data */
if(wait)
nfds = epoll_wait(efd, &ev, 1, wait);
@ -565,6 +600,28 @@ void *timeshift_reader ( void *p )
}
}
/* Status message */
if (now >= (last_status + 1000000)) {
streaming_message_t *tsm;
timeshift_status_t *status;
timeshift_index_iframe_t *fst, *lst;
status = calloc(1, sizeof(timeshift_status_t));
fst = _timeshift_first_frame(ts);
lst = _timeshift_last_frame(ts);
status->full = ts->full;
status->shift = ts->state <= TS_LIVE ? 0 : ts_rescale_i(now - last_time, 1000000);
if (lst && fst && lst != fst && ts->pts_delta != PTS_UNSET) {
status->pts_start = ts_rescale_i(fst->time - ts->pts_delta, 1000000);
status->pts_end = ts_rescale_i(lst->time - ts->pts_delta, 1000000);
} else {
status->pts_start = PTS_UNSET;
status->pts_end = PTS_UNSET;
}
tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status);
streaming_target_deliver2(ts->output, tsm);
last_status = now;
}
/* Done */
if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) {
pthread_mutex_unlock(&ts->state_mutex);
@ -644,16 +701,16 @@ void *timeshift_reader ( void *p )
(((cur_speed < 0) && (sm->sm_time >= deliver)) ||
((cur_speed > 0) && (sm->sm_time <= deliver))))) {
sm->sm_timeshift = now - sm->sm_time;
#ifndef TSHFT_TRACE
if (skip)
#endif
{
time_t pts = 0;
int64_t delta = now - sm->sm_time;
if (sm->sm_type == SMT_PACKET)
pts = ((th_pkt_t*)sm->sm_data)->pkt_pts;
tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64,
ts->id, sm->sm_time, pts, sm->sm_timeshift );
ts->id, sm->sm_time, pts, delta);
}
streaming_target_deliver2(ts->output, sm);
last_time = sm->sm_time;

View file

@ -257,6 +257,7 @@ static void _process_msg
/* Status */
case SMT_NOSTART:
case SMT_SERVICE_STATUS:
case SMT_TIMESHIFT_STATUS:
break;
/* Store */

View file

@ -322,6 +322,11 @@ typedef enum {
*/
SMT_SKIP,
/**
* Timeshift status
*/
SMT_TIMESHIFT_STATUS,
} streaming_message_type_t;
#define SMT_TO_MASK(x) (1 << ((unsigned int)x))
@ -359,7 +364,6 @@ typedef struct streaming_message {
streaming_message_type_t sm_type;
#if ENABLE_TIMESHIFT
int64_t sm_time;
uint64_t sm_timeshift;
#endif
union {
void *sm_data;

View file

@ -247,6 +247,7 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
case SMT_SKIP:
case SMT_SPEED:
case SMT_SIGNAL_STATUS:
case SMT_TIMESHIFT_STATUS:
break;
case SMT_NOSTART: