profile: add profile work sharing

This commit is contained in:
Jaroslav Kysela 2014-10-24 16:01:33 +02:00
parent 596b4e0b29
commit daa9788c42
4 changed files with 384 additions and 57 deletions

View file

@ -615,8 +615,10 @@ _eit_callback
/* Get service */
svc = mpegts_mux_find_service(mm, sid);
if (!svc)
if (!svc) {
tvhtrace("eit", "sid %i not found", sid);
goto done;
}
if (map->om_first) {
map->om_tune_count++;

View file

@ -1704,7 +1704,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
uint32_t chid, sid, weight, req90khz, timeshiftPeriod = 0;
const char *str, *profile_id;
channel_t *ch;
int pflags = 0;
htsp_subscription_t *hs;
profile_t *pro;
@ -1725,8 +1724,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
weight = htsmsg_get_u32_or_default(in, "weight", 150);
req90khz = htsmsg_get_u32_or_default(in, "90khz", 0);
if (htsmsg_get_u32_or_default(in, "normts", 0))
pflags |= PRCH_FLAG_TSFIX;
profile_id = htsmsg_get_str(in, "profile");
@ -1762,7 +1759,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
profile_chain_init(&hs->hs_prch, pro, ch);
if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, 0)) {
tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
free(hs);
return htsp_error("Stream setup error");
@ -1779,7 +1776,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
htsmsg_t *rep = htsmsg_create_map();
if(req90khz)
htsmsg_add_u32(rep, "90khz", 1);
if(hs->hs_prch.prch_tsfix)
if(hs->hs_prch.prch_sharer->prsh_tsfix)
htsmsg_add_u32(rep, "normts", 1);
#if ENABLE_TIMESHIFT

View file

@ -35,6 +35,7 @@
profile_builders_queue profile_builders;
struct profile_entry_queue profiles;
static LIST_HEAD(,profile_chain) profile_chains;
static profile_t *profile_default;
@ -434,6 +435,218 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter)
}
}
/*
*
*/
static void
profile_deliver(profile_chain_t *prch, streaming_message_t *sm)
{
if (prch->prch_start_pending) {
profile_sharer_t *prsh = prch->prch_sharer;
streaming_message_t *sm2;
if (!prsh->prsh_start_msg) {
streaming_msg_free(sm);
return;
}
sm2 = streaming_msg_create_data(SMT_START,
streaming_start_copy(prsh->prsh_start_msg));
streaming_target_deliver(prch->prch_post_share, sm2);
prch->prch_start_pending = 0;
}
if (sm)
streaming_target_deliver(prch->prch_post_share, sm);
}
/*
*
*/
static void
profile_input(void *opaque, streaming_message_t *sm)
{
profile_chain_t *prch = opaque, *prch2;
profile_sharer_t *prsh = prch->prch_sharer;
if (sm->sm_type == SMT_START)
prch->prch_stop = 0;
if (prch == prsh->prsh_master) {
if (sm->sm_type == SMT_STOP) {
prch->prch_stop = 1;
/* elect new master */
prsh->prsh_master = NULL;
LIST_FOREACH(prch2, &prsh->prsh_chains, prch_sharer_link)
if (!prch2->prch_stop) {
prsh->prsh_master = prch2;
break;
}
if (prsh->prsh_master)
goto direct;
}
streaming_target_deliver(prch->prch_share, sm);
return;
}
if (sm->sm_type == SMT_STOP) {
prch->prch_stop = 1;
} else if (sm->sm_type == SMT_START) {
prch->prch_stop = 0;
prch->prch_start_pending = 1;
streaming_msg_free(sm);
sm = NULL;
} else if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) {
streaming_msg_free(sm);
return;
}
direct:
profile_deliver(prch, sm);
}
/*
*
*/
static void
profile_sharer_deliver(profile_chain_t *prch, streaming_message_t *sm)
{
if (sm->sm_type == SMT_PACKET) {
if (!prch->prch_ts_delta)
goto deliver;
th_pkt_t *pkt = sm->sm_data;
if (prch->prch_ts_delta == PTS_UNSET) {
prch->prch_ts_delta = MAX(0, pkt->pkt_dts - 10000);
printf("ts delta: %li\n", (long)prch->prch_ts_delta);
}
/*
* time correction here
*/
if (pkt->pkt_pts >= prch->prch_ts_delta &&
pkt->pkt_dts >= prch->prch_ts_delta) {
th_pkt_t *n = pkt_copy_shallow(pkt);
pkt_ref_dec(pkt);
n->pkt_pts -= prch->prch_ts_delta;
n->pkt_dts -= prch->prch_ts_delta;
sm->sm_data = n;
} else {
streaming_msg_free(sm);
return;
}
}
deliver:
profile_deliver(prch, sm);
}
/*
*
*/
static void
profile_sharer_input(void *opaque, streaming_message_t *sm)
{
profile_sharer_t *prsh = opaque;
profile_chain_t *prch, *next, *run = NULL;
if (sm->sm_type == SMT_STOP) {
if (prsh->prsh_start_msg)
streaming_start_unref(prsh->prsh_start_msg);
prsh->prsh_start_msg = NULL;
}
for (prch = LIST_FIRST(&prsh->prsh_chains); prch; prch = next) {
next = LIST_NEXT(prch, prch_sharer_link);
if (prch == prsh->prsh_master) {
if (sm->sm_type == SMT_START) {
if (prsh->prsh_start_msg)
streaming_start_unref(prsh->prsh_start_msg);
prsh->prsh_start_msg = streaming_start_copy(sm->sm_data);
}
if (run)
profile_sharer_deliver(run, streaming_msg_clone(sm));
run = prch;
continue;
}
if (sm->sm_type != SMT_PACKET && sm->sm_type != SMT_MPEGTS)
continue;
if (prch->prch_stop)
continue;
if (run)
profile_sharer_deliver(run, streaming_msg_clone(sm));
run = prch;
}
if (run)
profile_sharer_deliver(run, sm);
else
streaming_msg_free(sm);
}
/*
*
*/
static profile_sharer_t *
profile_sharer_find(profile_chain_t *prch)
{
profile_sharer_t *prsh = NULL;
profile_chain_t *prch2;
LIST_FOREACH(prch2, &profile_chains, prch_link) {
if (prch2->prch_id != prch->prch_id)
continue;
if (prch2 == prch)
continue;
if (prch2->prch_can_share && prch2->prch_can_share(prch2, prch)) {
prsh = prch2->prch_sharer;
break;
}
}
if (!prsh) {
prsh = calloc(1, sizeof(*prsh));
streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0);
LIST_INIT(&prsh->prsh_chains);
}
return prsh;
}
/*
*
*/
static int
profile_sharer_create(profile_sharer_t *prsh,
profile_chain_t *prch,
streaming_target_t *dst)
{
prch->prch_post_share = dst;
prch->prch_ts_delta = LIST_EMPTY(&prsh->prsh_chains) ? 0 : PTS_UNSET;
LIST_INSERT_HEAD(&prsh->prsh_chains, prch, prch_sharer_link);
prch->prch_sharer = prsh;
if (!prsh->prsh_master)
prsh->prsh_master = prch;
return 0;
}
/*
*
*/
static void
profile_sharer_destroy(profile_chain_t *prch)
{
profile_sharer_t *prsh = prch->prch_sharer;
if (prsh == NULL)
return;
LIST_REMOVE(prch, prch_sharer_link);
prch->prch_sharer = NULL;
prch->prch_post_share = NULL;
if (LIST_EMPTY(&prsh->prsh_chains)) {
if (prsh->prsh_tsfix)
tsfix_destroy(prsh->prsh_tsfix);
#if ENABLE_LIBAV
if (prsh->prsh_transcoder)
transcoder_destroy(prsh->prsh_transcoder);
#endif
if (prsh->prsh_start_msg)
streaming_start_unref(prsh->prsh_start_msg);
free(prsh);
}
}
/*
*
*/
@ -446,6 +659,9 @@ profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id)
prch->prch_pro = pro;
prch->prch_id = id;
streaming_queue_init(&prch->prch_sq, 0, 0);
LIST_INSERT_HEAD(&profile_chains, prch, prch_link);
prch->prch_linked = 1;
prch->prch_stop = 1;
}
/*
@ -499,24 +715,39 @@ profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize)
void
profile_chain_close(profile_chain_t *prch)
{
profile_sharer_destroy(prch);
#if ENABLE_TIMESHIFT
if (prch->prch_timeshift)
if (prch->prch_timeshift) {
timeshift_destroy(prch->prch_timeshift);
prch->prch_timeshift = NULL;
}
#endif
if (prch->prch_tsfix)
tsfix_destroy(prch->prch_tsfix);
if (prch->prch_gh)
if (prch->prch_gh) {
globalheaders_destroy(prch->prch_gh);
#if ENABLE_LIBAV
if (prch->prch_transcoder)
transcoder_destroy(prch->prch_transcoder);
#endif
if (prch->prch_muxer)
prch->prch_gh = NULL;
}
if (prch->prch_tsfix) {
globalheaders_destroy(prch->prch_tsfix);
prch->prch_tsfix = NULL;
}
if (prch->prch_muxer) {
muxer_destroy(prch->prch_muxer);
streaming_queue_deinit(&prch->prch_sq);
prch->prch_muxer = NULL;
}
prch->prch_st = NULL;
if (prch->prch_pro)
if (prch->prch_linked) {
streaming_queue_deinit(&prch->prch_sq);
LIST_REMOVE(prch, prch_link);
prch->prch_linked = 0;
}
if (prch->prch_pro) {
profile_release(prch->prch_pro);
prch->prch_pro = NULL;
}
}
/*
@ -538,19 +769,31 @@ profile_htsp_work(profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
profile_sharer_t *prsh;
prsh = profile_sharer_find(prch);
if (!prsh)
goto fail;
#if ENABLE_TIMESHIFT
if (timeshift_period > 0) {
if (timeshift_period > 0)
dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
flags |= PRCH_FLAG_TSFIX;
}
#endif
if (flags & PRCH_FLAG_TSFIX)
dst = prch->prch_tsfix = tsfix_create(dst);
if (profile_sharer_create(prsh, prch, dst))
goto fail;
prch->prch_st = dst;
if (!prsh->prsh_tsfix)
prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input);
prch->prch_share = prsh->prsh_tsfix;
streaming_target_init(&prch->prch_input, profile_input, prch, 0);
prch->prch_st = &prch->prch_input;
return 0;
fail:
profile_chain_close(prch);
return -1;
}
static muxer_container_type_t
@ -618,7 +861,10 @@ profile_mpegts_pass_open(profile_chain_t *prch,
c.m_rewrite_pmt = pro->pro_rewrite_pmt;
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_sq.sq_st.st_reject_filter = SMT_PACKET;
prch->prch_sq.sq_maxsize = qsize;
prch->prch_muxer = muxer_create(&c);
prch->prch_st = &prch->prch_sq.sq_st;
return 0;
@ -669,6 +915,7 @@ profile_matroska_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro;
streaming_target_t *dst;
muxer_config_t c;
if (m_cfg)
@ -680,11 +927,12 @@ profile_matroska_open(profile_chain_t *prch,
if (pro->pro_webm)
c.m_type = MC_WEBM;
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_tsfix = tsfix_create(prch->prch_gh);
prch->prch_sq.sq_maxsize = qsize;
dst = prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
dst = prch->prch_tsfix = tsfix_create(dst);
prch->prch_st = dst;
prch->prch_muxer = muxer_create(&c);
prch->prch_st = prch->prch_tsfix;
return 0;
}
@ -934,38 +1182,92 @@ const idclass_t profile_transcode_class =
}
};
static int
profile_transcode_resolution(profile_transcode_t *pro)
{
return pro->pro_resolution >= 240 ? pro->pro_resolution : 240;
}
static int
profile_transcode_bandwidth(profile_transcode_t *pro)
{
return pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
}
static int
profile_transcode_can_share(profile_chain_t *prch,
profile_chain_t *joiner)
{
profile_transcode_t *pro1 = (profile_transcode_t *)prch->prch_pro;
profile_transcode_t *pro2 = (profile_transcode_t *)joiner->prch_pro;
if (pro1 == pro2)
return 1;
if (!idnode_is_instance(&pro2->pro_id, &profile_transcode_class))
return 0;
/*
* Do full params check here, note that profiles might differ
* only in the muxer setup.
*/
if (strcmp(pro1->pro_vcodec ?: "", pro2->pro_vcodec ?: ""))
return 0;
if (strcmp(pro1->pro_acodec ?: "", pro2->pro_acodec ?: ""))
return 0;
if (strcmp(pro1->pro_scodec ?: "", pro2->pro_scodec ?: ""))
return 0;
if (profile_transcode_resolution(pro1) != profile_transcode_resolution(pro2))
return 0;
if (profile_transcode_bandwidth(pro1) != profile_transcode_bandwidth(pro2))
return 0;
if (strcmp(pro1->pro_language ?: "", pro2->pro_language ?: ""))
return 0;
return 1;
}
static int
profile_transcode_work(profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
profile_sharer_t *prsh;
profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
transcoder_props_t props;
prsh = profile_sharer_find(prch);
if (!prsh)
goto fail;
prch->prch_can_share = profile_transcode_can_share;
memset(&props, 0, sizeof(props));
strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1);
strncpy(props.tp_acodec, pro->pro_acodec ?: "", sizeof(props.tp_acodec)-1);
strncpy(props.tp_scodec, pro->pro_scodec ?: "", sizeof(props.tp_scodec)-1);
props.tp_resolution = pro->pro_resolution >= 240 ? pro->pro_resolution : 240;
props.tp_resolution = profile_transcode_resolution(pro);
props.tp_channels = pro->pro_channels;
props.tp_bandwidth = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
props.tp_bandwidth = profile_transcode_bandwidth(pro);
strncpy(props.tp_language, pro->pro_language ?: "", 3);
#if ENABLE_TIMESHIFT
if (timeshift_period > 0)
dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
#endif
dst = prch->prch_transcoder = transcoder_create(dst);
if (!dst) {
profile_chain_close(prch);
return -1;
if (profile_sharer_create(prsh, prch, dst))
goto fail;
if (!prsh->prsh_transcoder) {
assert(!prsh->prsh_tsfix);
dst = prsh->prsh_transcoder = transcoder_create(&prsh->prsh_input);
if (!dst)
goto fail;
transcoder_set_properties(dst, &props);
prsh->prsh_tsfix = tsfix_create(dst);
}
transcoder_set_properties(dst, &props);
prch->prch_tsfix = tsfix_create(dst);
prch->prch_st = prch->prch_tsfix;
prch->prch_share = prsh->prsh_tsfix;
streaming_target_init(&prch->prch_input, profile_input, prch, 0);
prch->prch_st = &prch->prch_input;
return 0;
fail:
profile_chain_close(prch);
return -1;
}
static int
@ -1001,13 +1303,15 @@ profile_transcode_open(profile_chain_t *prch,
c.m_type = MC_MATROSKA;
}
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_sq.sq_maxsize = qsize;
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
r = profile_transcode_work(prch, prch->prch_gh, 0,
PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
if (r)
r = profile_transcode_work(prch, prch->prch_gh, 0, 0);
if (r) {
profile_chain_close(prch);
return r;
}
prch->prch_muxer = muxer_create(&c);
return 0;
@ -1055,6 +1359,7 @@ profile_init(void)
LIST_INIT(&profile_builders);
TAILQ_INIT(&profiles);
LIST_INIT(&profile_chains);
profile_register(&profile_mpegts_pass_class, profile_mpegts_pass_builder);
profile_register(&profile_matroska_class, profile_matroska_builder);

View file

@ -26,6 +26,7 @@
struct profile;
struct muxer;
struct streaming_target;
struct streaming_start;
extern const idclass_t profile_class;
extern const idclass_t profile_mpegts_pass_class;
@ -47,24 +48,46 @@ typedef LIST_HEAD(, profile_build) profile_builders_queue;
extern profile_builders_queue profile_builders;
#define PRCH_FLAG_SKIPZEROING (1<<0)
#define PRCH_FLAG_TSFIX (1<<1)
typedef struct profile_sharer {
streaming_target_t prsh_input;
LIST_HEAD(,profile_chain) prsh_chains;
struct profile_chain *prsh_master;
struct streaming_start *prsh_start_msg;
struct streaming_target *prsh_tsfix;
#if ENABLE_LIBAV
struct streaming_target *prsh_transcoder;
#endif
} profile_sharer_t;
typedef struct profile_chain {
struct profile *prch_pro;
void *prch_id;
int prch_flags;
struct streaming_queue prch_sq;
struct streaming_target *prch_st;
struct muxer *prch_muxer;
struct streaming_target *prch_gh;
struct streaming_target *prch_tsfix;
#if ENABLE_LIBAV
struct streaming_target *prch_transcoder;
#endif
LIST_ENTRY(profile_chain) prch_link;
int prch_linked;
struct profile_sharer *prch_sharer;
LIST_ENTRY(profile_chain) prch_sharer_link;
struct profile *prch_pro;
void *prch_id;
int64_t prch_ts_delta;
int prch_flags;
int prch_stop;
int prch_start_pending;
struct streaming_queue prch_sq;
struct streaming_target *prch_post_share;
struct streaming_target *prch_st;
struct muxer *prch_muxer;
struct streaming_target *prch_gh;
struct streaming_target *prch_tsfix;
#if ENABLE_TIMESHIFT
struct streaming_target *prch_timeshift;
struct streaming_target *prch_timeshift;
#endif
struct streaming_target prch_input;
struct streaming_target *prch_share;
int (*prch_can_share)(struct profile_chain *prch,
struct profile_chain *joiner);
} profile_chain_t;
typedef struct profile {