profile: move all stream chain code to profile.c from htsp_service.c

This commit is contained in:
Jaroslav Kysela 2014-10-22 13:22:08 +02:00
parent fcb7b833b0
commit aa53af579a
3 changed files with 132 additions and 114 deletions

View file

@ -184,14 +184,7 @@ typedef struct htsp_subscription {
th_subscription_t *hs_s; // Temporary
streaming_target_t hs_input;
streaming_target_t *hs_tsfix;
#if ENABLE_TIMESHIFT
streaming_target_t *hs_tshift;
#endif
streaming_target_t *hs_work;
void (*hs_work_destroy)(streaming_target_t *);
profile_chain_t hs_prch;
htsp_msg_q_t hs_q;
@ -335,16 +328,8 @@ htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
subscription_unsubscribe(hs->hs_s);
if(hs->hs_tsfix != NULL)
tsfix_destroy(hs->hs_tsfix);
if(hs->hs_work != NULL)
hs->hs_work_destroy(hs->hs_work);
#if ENABLE_TIMESHIFT
if(hs->hs_tshift)
timeshift_destroy(hs->hs_tshift);
#endif
if(hs->hs_prch.prch_st != NULL)
profile_chain_close(&hs->hs_prch);
htsp_flush_queue(htsp, &hs->hs_q, 1);
}
@ -1716,13 +1701,13 @@ htsp_method_getTicket(htsp_connection_t *htsp, htsmsg_t *in)
static htsmsg_t *
htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
uint32_t chid, sid, weight, req90khz, normts;
#if ENABLE_TIMESHIFT
uint32_t timeshiftPeriod = 0;
#endif
const char *str;
uint32_t chid, sid, weight, req90khz, timeshiftPeriod = 0;
const char *str, *profile_id;
channel_t *ch;
int pflags = 0;
htsp_subscription_t *hs;
profile_t *pro;
if(htsmsg_get_u32(in, "subscriptionId", &sid))
return htsp_error("Missing argument 'subscriptionId'");
@ -1740,7 +1725,10 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
weight = htsmsg_get_u32_or_default(in, "weight", 150);
req90khz = htsmsg_get_u32_or_default(in, "90khz", 0);
normts = htsmsg_get_u32_or_default(in, "normts", 0);
if (htsmsg_get_u32_or_default(in, "normts", 0))
pflags |= PRCH_FLAG_TSFIX;
profile_id = htsmsg_get_str(in, "profile");
#if ENABLE_TIMESHIFT
if (timeshift_enabled) {
@ -1750,27 +1738,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
}
#endif
/*
* We send the reply now to avoid the user getting the 'subscriptionStart'
* async message before the reply to 'subscribe'.
*
* Send some opiotanl boolean flags back to the subscriber so it can infer
* if we support those
*
*/
htsmsg_t *rep = htsmsg_create_map();
if(req90khz)
htsmsg_add_u32(rep, "90khz", 1);
if(normts)
htsmsg_add_u32(rep, "normts", 1);
#if ENABLE_TIMESHIFT
if(timeshiftPeriod)
htsmsg_add_u32(rep, "timeshiftPeriod", timeshiftPeriod);
#endif
htsp_reply(htsp, in, rep);
/* Initialize the HTSP subscription structure */
hs = calloc(1, sizeof(htsp_subscription_t));
@ -1782,43 +1749,55 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
htsp_init_queue(&hs->hs_q, 0);
hs->hs_sid = sid;
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
streaming_target_t *st = &hs->hs_input;
#if ENABLE_TIMESHIFT
if (timeshiftPeriod != 0) {
if (timeshiftPeriod == ~0)
tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (unlimited)");
else
tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod);
normts = 1;
pflags |= PRCH_FLAG_TSFIX;
}
#endif
profile_t *pro;
const char *profile_id = htsmsg_get_str(in, "profile");
if (profile_id) {
pro = profile_find_by_uuid(profile_id);
if (pro)
profile_id = pro->pro_name;
}
pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
hs->hs_work = profile_work(pro, st, &hs->hs_work_destroy);
if (hs->hs_work) {
st = hs->hs_work;
normts = 1;
if (!profile_work(pro, &hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
free(hs);
return htsp_error("Stream setup error");
}
if(normts)
st = hs->hs_tsfix = tsfix_create(st);
/*
* We send the reply now to avoid the user getting the 'subscriptionStart'
* async message before the reply to 'subscribe'.
*
* Send some optional boolean flags back to the subscriber so it can infer
* if we support those
*
*/
htsmsg_t *rep = htsmsg_create_map();
if(req90khz)
htsmsg_add_u32(rep, "90khz", 1);
if(hs->hs_prch.prch_tsfix)
htsmsg_add_u32(rep, "normts", 1);
#if ENABLE_TIMESHIFT
if(timeshiftPeriod)
htsmsg_add_u32(rep, "timeshiftPeriod", timeshiftPeriod);
#endif
htsp_reply(htsp, in, rep);
/*
* subscribe now...
*/
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
hs->hs_s = subscription_create_from_channel(ch, pro, weight,
htsp->htsp_logname,
st,
hs->hs_prch.prch_st,
SUBSCRIPTION_STREAMING,
htsp->htsp_peername,
htsp->htsp_username,

View file

@ -27,6 +27,9 @@
#include "lang_codes.h"
#include "plumbing/transcoding.h"
#endif
#if ENABLE_TIMESHIFT
#include "timeshift.h"
#endif
#include "dvr/dvr.h"
profile_builders_queue profile_builders;
@ -403,24 +406,24 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter)
const char *uuid, *s;
TAILQ_FOREACH(pro, &profiles, pro_link) {
if (pro->pro_work) {
uuid = idnode_uuid_as_str(&pro->pro_id);
if (filter) {
HTSMSG_FOREACH(f, filter) {
if (!(s = htsmsg_field_get_str(f)))
continue;
if (strcmp(s, uuid) == 0)
break;
}
if (f == NULL)
if (!pro->pro_work)
continue;
uuid = idnode_uuid_as_str(&pro->pro_id);
if (filter) {
HTSMSG_FOREACH(f, filter) {
if (!(s = htsmsg_field_get_str(f)))
continue;
if (strcmp(s, uuid) == 0)
break;
}
m = htsmsg_create_map();
htsmsg_add_str(m, "uuid", uuid);
htsmsg_add_str(m, "name", pro->pro_name ?: "");
htsmsg_add_str(m, "comment", pro->pro_comment ?: "");
htsmsg_add_msg(array, NULL, m);
if (f == NULL)
continue;
}
m = htsmsg_create_map();
htsmsg_add_str(m, "uuid", uuid);
htsmsg_add_str(m, "name", pro->pro_name ?: "");
htsmsg_add_str(m, "comment", pro->pro_comment ?: "");
htsmsg_add_msg(array, NULL, m);
}
}
@ -448,6 +451,10 @@ profile_chain_raw_open(profile_chain_t *prch, size_t qsize)
void
profile_chain_close(profile_chain_t *prch)
{
#if ENABLE_TIMESHIFT
if (prch->prch_timeshift)
timeshift_destroy(prch->prch_timeshift);
#endif
if (prch->prch_tsfix)
tsfix_destroy(prch->prch_tsfix);
if (prch->prch_gh)
@ -459,6 +466,7 @@ profile_chain_close(profile_chain_t *prch)
if (prch->prch_muxer)
muxer_destroy(prch->prch_muxer);
streaming_queue_deinit(&prch->prch_sq);
prch->prch_st = NULL;
}
/*
@ -475,6 +483,22 @@ const idclass_t profile_htsp_class =
}
};
static int
profile_htsp_work(profile_t *_pro, profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
if (!(flags & PRCH_FLAG_SKIPZEROING))
memset(prch, 0, sizeof(*prch));
if (flags & PRCH_FLAG_TSFIX)
dst = prch->prch_tsfix = tsfix_create(prch->prch_transcoder);
prch->prch_st = dst;
return 0;
}
static muxer_container_type_t
profile_htsp_get_mc(profile_t *_pro)
{
@ -485,7 +509,7 @@ static profile_t *
profile_htsp_builder(void)
{
profile_t *pro = calloc(1, sizeof(*pro));
pro->pro_open = NULL;
pro->pro_work = profile_htsp_work;
pro->pro_get_mc = profile_htsp_get_mc;
return pro;
}
@ -605,7 +629,7 @@ profile_matroska_open(profile_t *_pro, profile_chain_t *prch,
memset(prch, 0, sizeof(*prch));
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_tsfix = tsfix_create(prch->prch_gh);
prch->prch_muxer = muxer_create(&c);
prch->prch_st = prch->prch_tsfix;
@ -858,20 +882,13 @@ const idclass_t profile_transcode_class =
}
};
static void
profile_transcode_work_destroy(streaming_target_t *st)
{
if (st)
transcoder_destroy(st);
}
static streaming_target_t *
profile_transcode_work(profile_t *_pro, streaming_target_t *src,
void (**destroy)(streaming_target_t *st))
static int
profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
profile_transcode_t *pro = (profile_transcode_t *)_pro;
transcoder_props_t props;
streaming_target_t *st;
memset(&props, 0, sizeof(props));
strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1);
@ -882,13 +899,24 @@ profile_transcode_work(profile_t *_pro, streaming_target_t *src,
props.tp_bandwidth = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
strncpy(props.tp_language, pro->pro_language ?: "", 3);
if (destroy)
*destroy = profile_transcode_work_destroy;
if (!(flags & PRCH_FLAG_SKIPZEROING))
memset(prch, 0, sizeof(*prch));
st = transcoder_create(src);
if (st)
transcoder_set_properties(st, &props);
return st;
#if ENABLE_TIMESHIFT
if (timeshift_period > 0)
dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
#endif
dst = prch->prch_transcoder = transcoder_create(dst);
if (!dst) {
profile_chain_close(prch);
return -1;
}
transcoder_set_properties(dst, &props);
prch->prch_tsfix = tsfix_create(dst);
prch->prch_st = prch->prch_tsfix;
return 0;
}
static int
@ -912,6 +940,7 @@ profile_transcode_open(profile_t *_pro, profile_chain_t *prch,
{
profile_transcode_t *pro = (profile_transcode_t *)_pro;
muxer_config_t c;
int r;
if (m_cfg)
c = *m_cfg; /* do not alter the original parameter */
@ -924,12 +953,16 @@ profile_transcode_open(profile_t *_pro, profile_chain_t *prch,
}
memset(prch, 0, sizeof(*prch));
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_transcoder = profile_transcode_work(_pro, prch->prch_gh, NULL);
prch->prch_tsfix = tsfix_create(prch->prch_transcoder);
prch->prch_muxer = muxer_create(&c);
prch->prch_st = prch->prch_tsfix;
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
r = profile_transcode_work(_pro, prch, prch->prch_gh, 0,
PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
if (r)
return r;
prch->prch_muxer = muxer_create(&c);
return 0;
}

View file

@ -47,6 +47,9 @@ typedef LIST_HEAD(, profile_build) profile_builders_queue;
extern profile_builders_queue profile_builders;
#define PRCH_FLAG_SKIPZEROING (1<<0)
#define PRCH_FLAG_TSFIX (1<<1)
typedef struct profile_chain {
int prch_flags;
struct streaming_queue prch_sq;
@ -57,6 +60,9 @@ typedef struct profile_chain {
#if ENABLE_LIBAV
struct streaming_target *prch_transcoder;
#endif
#if ENABLE_TIMESHIFT
struct streaming_target *prch_timeshift;
#endif
} profile_chain_t;
typedef struct profile {
@ -77,11 +83,11 @@ typedef struct profile {
void (*pro_conf_changed)(struct profile *pro);
muxer_container_type_t (*pro_get_mc)(struct profile *pro);
struct streaming_target *(*pro_work)(struct profile *pro,
struct streaming_target *src,
void (**destroy)(struct streaming_target *));
int (*pro_open)(struct profile *pro, profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
int (*pro_work)(struct profile *pro, profile_chain_t *prch,
struct streaming_target *dst,
uint32_t timeshift_period, int flags);
int (*pro_open)(struct profile *pro, profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
} profile_t;
void profile_register(const idclass_t *clazz, profile_builder_t builder);
@ -89,11 +95,11 @@ void profile_register(const idclass_t *clazz, profile_builder_t builder);
profile_t *profile_create
(const char *uuid, htsmsg_t *conf, int save);
static inline struct streaming_target *
profile_work(profile_t *pro, struct streaming_target *src,
void (**destroy)(struct streaming_target *st))
{ return pro && pro->pro_work ? pro->pro_work(pro, src, destroy) : NULL; }
static inline int
profile_work(profile_t *pro, profile_chain_t *prch,
struct streaming_target *dst,
uint32_t timeshift_period, int flags)
{ return pro && pro->pro_work ? pro->pro_work(pro, prch, dst, timeshift_period, flags) : -1; }
static inline int
profile_chain_open(profile_t *pro, profile_chain_t *prch,