From daa9788c42e93c2b9e8b41cba415f1af612c9970 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Fri, 24 Oct 2014 16:01:33 +0200 Subject: [PATCH] profile: add profile work sharing --- src/epggrab/module/eit.c | 4 +- src/htsp_server.c | 7 +- src/profile.c | 379 +++++++++++++++++++++++++++++++++++---- src/profile.h | 51 ++++-- 4 files changed, 384 insertions(+), 57 deletions(-) diff --git a/src/epggrab/module/eit.c b/src/epggrab/module/eit.c index 12bcf5af..d21f5b27 100644 --- a/src/epggrab/module/eit.c +++ b/src/epggrab/module/eit.c @@ -615,8 +615,10 @@ _eit_callback /* Get service */ svc = mpegts_mux_find_service(mm, sid); - if (!svc) + if (!svc) { + tvhtrace("eit", "sid %i not found", sid); goto done; + } if (map->om_first) { map->om_tune_count++; diff --git a/src/htsp_server.c b/src/htsp_server.c index 4716edd7..9f14b0ba 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -1704,7 +1704,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) uint32_t chid, sid, weight, req90khz, timeshiftPeriod = 0; const char *str, *profile_id; channel_t *ch; - int pflags = 0; htsp_subscription_t *hs; profile_t *pro; @@ -1725,8 +1724,6 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) weight = htsmsg_get_u32_or_default(in, "weight", 150); req90khz = htsmsg_get_u32_or_default(in, "90khz", 0); - if (htsmsg_get_u32_or_default(in, "normts", 0)) - pflags |= PRCH_FLAG_TSFIX; profile_id = htsmsg_get_str(in, "profile"); @@ -1762,7 +1759,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp"); profile_chain_init(&hs->hs_prch, pro, ch); - if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) { + if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, 0)) { tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name); free(hs); return htsp_error("Stream setup error"); @@ -1779,7 +1776,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) htsmsg_t *rep = htsmsg_create_map(); if(req90khz) htsmsg_add_u32(rep, "90khz", 1); - if(hs->hs_prch.prch_tsfix) + if(hs->hs_prch.prch_sharer->prsh_tsfix) htsmsg_add_u32(rep, "normts", 1); #if ENABLE_TIMESHIFT diff --git a/src/profile.c b/src/profile.c index 28cc5942..59478330 100644 --- a/src/profile.c +++ b/src/profile.c @@ -35,6 +35,7 @@ profile_builders_queue profile_builders; struct profile_entry_queue profiles; +static LIST_HEAD(,profile_chain) profile_chains; static profile_t *profile_default; @@ -434,6 +435,218 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter) } } +/* + * + */ +static void +profile_deliver(profile_chain_t *prch, streaming_message_t *sm) +{ + if (prch->prch_start_pending) { + profile_sharer_t *prsh = prch->prch_sharer; + streaming_message_t *sm2; + if (!prsh->prsh_start_msg) { + streaming_msg_free(sm); + return; + } + sm2 = streaming_msg_create_data(SMT_START, + streaming_start_copy(prsh->prsh_start_msg)); + streaming_target_deliver(prch->prch_post_share, sm2); + prch->prch_start_pending = 0; + } + if (sm) + streaming_target_deliver(prch->prch_post_share, sm); +} + +/* + * + */ +static void +profile_input(void *opaque, streaming_message_t *sm) +{ + profile_chain_t *prch = opaque, *prch2; + profile_sharer_t *prsh = prch->prch_sharer; + + if (sm->sm_type == SMT_START) + prch->prch_stop = 0; + + if (prch == prsh->prsh_master) { + if (sm->sm_type == SMT_STOP) { + prch->prch_stop = 1; + /* elect new master */ + prsh->prsh_master = NULL; + LIST_FOREACH(prch2, &prsh->prsh_chains, prch_sharer_link) + if (!prch2->prch_stop) { + prsh->prsh_master = prch2; + break; + } + if (prsh->prsh_master) + goto direct; + } + streaming_target_deliver(prch->prch_share, sm); + return; + } + + if (sm->sm_type == SMT_STOP) { + prch->prch_stop = 1; + } else if (sm->sm_type == SMT_START) { + prch->prch_stop = 0; + prch->prch_start_pending = 1; + streaming_msg_free(sm); + sm = NULL; + } else if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) { + streaming_msg_free(sm); + return; + } + +direct: + profile_deliver(prch, sm); +} + +/* + * + */ +static void +profile_sharer_deliver(profile_chain_t *prch, streaming_message_t *sm) +{ + if (sm->sm_type == SMT_PACKET) { + if (!prch->prch_ts_delta) + goto deliver; + th_pkt_t *pkt = sm->sm_data; + if (prch->prch_ts_delta == PTS_UNSET) { + prch->prch_ts_delta = MAX(0, pkt->pkt_dts - 10000); + printf("ts delta: %li\n", (long)prch->prch_ts_delta); + } + /* + * time correction here + */ + if (pkt->pkt_pts >= prch->prch_ts_delta && + pkt->pkt_dts >= prch->prch_ts_delta) { + th_pkt_t *n = pkt_copy_shallow(pkt); + pkt_ref_dec(pkt); + n->pkt_pts -= prch->prch_ts_delta; + n->pkt_dts -= prch->prch_ts_delta; + sm->sm_data = n; + } else { + streaming_msg_free(sm); + return; + } + } +deliver: + profile_deliver(prch, sm); +} + +/* + * + */ +static void +profile_sharer_input(void *opaque, streaming_message_t *sm) +{ + profile_sharer_t *prsh = opaque; + profile_chain_t *prch, *next, *run = NULL; + + if (sm->sm_type == SMT_STOP) { + if (prsh->prsh_start_msg) + streaming_start_unref(prsh->prsh_start_msg); + prsh->prsh_start_msg = NULL; + } + for (prch = LIST_FIRST(&prsh->prsh_chains); prch; prch = next) { + next = LIST_NEXT(prch, prch_sharer_link); + if (prch == prsh->prsh_master) { + if (sm->sm_type == SMT_START) { + if (prsh->prsh_start_msg) + streaming_start_unref(prsh->prsh_start_msg); + prsh->prsh_start_msg = streaming_start_copy(sm->sm_data); + } + if (run) + profile_sharer_deliver(run, streaming_msg_clone(sm)); + run = prch; + continue; + } + if (sm->sm_type != SMT_PACKET && sm->sm_type != SMT_MPEGTS) + continue; + if (prch->prch_stop) + continue; + if (run) + profile_sharer_deliver(run, streaming_msg_clone(sm)); + run = prch; + } + + if (run) + profile_sharer_deliver(run, sm); + else + streaming_msg_free(sm); +} + +/* + * + */ +static profile_sharer_t * +profile_sharer_find(profile_chain_t *prch) +{ + profile_sharer_t *prsh = NULL; + profile_chain_t *prch2; + + LIST_FOREACH(prch2, &profile_chains, prch_link) { + if (prch2->prch_id != prch->prch_id) + continue; + if (prch2 == prch) + continue; + if (prch2->prch_can_share && prch2->prch_can_share(prch2, prch)) { + prsh = prch2->prch_sharer; + break; + } + } + if (!prsh) { + prsh = calloc(1, sizeof(*prsh)); + streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0); + LIST_INIT(&prsh->prsh_chains); + } + return prsh; +} + +/* + * + */ +static int +profile_sharer_create(profile_sharer_t *prsh, + profile_chain_t *prch, + streaming_target_t *dst) +{ + prch->prch_post_share = dst; + prch->prch_ts_delta = LIST_EMPTY(&prsh->prsh_chains) ? 0 : PTS_UNSET; + LIST_INSERT_HEAD(&prsh->prsh_chains, prch, prch_sharer_link); + prch->prch_sharer = prsh; + if (!prsh->prsh_master) + prsh->prsh_master = prch; + return 0; +} + +/* + * + */ +static void +profile_sharer_destroy(profile_chain_t *prch) +{ + profile_sharer_t *prsh = prch->prch_sharer; + + if (prsh == NULL) + return; + LIST_REMOVE(prch, prch_sharer_link); + prch->prch_sharer = NULL; + prch->prch_post_share = NULL; + if (LIST_EMPTY(&prsh->prsh_chains)) { + if (prsh->prsh_tsfix) + tsfix_destroy(prsh->prsh_tsfix); +#if ENABLE_LIBAV + if (prsh->prsh_transcoder) + transcoder_destroy(prsh->prsh_transcoder); +#endif + if (prsh->prsh_start_msg) + streaming_start_unref(prsh->prsh_start_msg); + free(prsh); + } +} + /* * */ @@ -446,6 +659,9 @@ profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id) prch->prch_pro = pro; prch->prch_id = id; streaming_queue_init(&prch->prch_sq, 0, 0); + LIST_INSERT_HEAD(&profile_chains, prch, prch_link); + prch->prch_linked = 1; + prch->prch_stop = 1; } /* @@ -499,24 +715,39 @@ profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize) void profile_chain_close(profile_chain_t *prch) { + profile_sharer_destroy(prch); + #if ENABLE_TIMESHIFT - if (prch->prch_timeshift) + if (prch->prch_timeshift) { timeshift_destroy(prch->prch_timeshift); + prch->prch_timeshift = NULL; + } #endif - if (prch->prch_tsfix) - tsfix_destroy(prch->prch_tsfix); - if (prch->prch_gh) + if (prch->prch_gh) { globalheaders_destroy(prch->prch_gh); -#if ENABLE_LIBAV - if (prch->prch_transcoder) - transcoder_destroy(prch->prch_transcoder); -#endif - if (prch->prch_muxer) + prch->prch_gh = NULL; + } + if (prch->prch_tsfix) { + globalheaders_destroy(prch->prch_tsfix); + prch->prch_tsfix = NULL; + } + if (prch->prch_muxer) { muxer_destroy(prch->prch_muxer); - streaming_queue_deinit(&prch->prch_sq); + prch->prch_muxer = NULL; + } + prch->prch_st = NULL; - if (prch->prch_pro) + + if (prch->prch_linked) { + streaming_queue_deinit(&prch->prch_sq); + LIST_REMOVE(prch, prch_link); + prch->prch_linked = 0; + } + + if (prch->prch_pro) { profile_release(prch->prch_pro); + prch->prch_pro = NULL; + } } /* @@ -538,19 +769,31 @@ profile_htsp_work(profile_chain_t *prch, streaming_target_t *dst, uint32_t timeshift_period, int flags) { + profile_sharer_t *prsh; + + prsh = profile_sharer_find(prch); + if (!prsh) + goto fail; + #if ENABLE_TIMESHIFT - if (timeshift_period > 0) { + if (timeshift_period > 0) dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period); - flags |= PRCH_FLAG_TSFIX; - } #endif - if (flags & PRCH_FLAG_TSFIX) - dst = prch->prch_tsfix = tsfix_create(dst); + if (profile_sharer_create(prsh, prch, dst)) + goto fail; - prch->prch_st = dst; + if (!prsh->prsh_tsfix) + prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input); + prch->prch_share = prsh->prsh_tsfix; + streaming_target_init(&prch->prch_input, profile_input, prch, 0); + prch->prch_st = &prch->prch_input; return 0; + +fail: + profile_chain_close(prch); + return -1; } static muxer_container_type_t @@ -618,7 +861,10 @@ profile_mpegts_pass_open(profile_chain_t *prch, c.m_rewrite_pmt = pro->pro_rewrite_pmt; prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS; - streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize); + + prch->prch_sq.sq_st.st_reject_filter = SMT_PACKET; + prch->prch_sq.sq_maxsize = qsize; + prch->prch_muxer = muxer_create(&c); prch->prch_st = &prch->prch_sq.sq_st; return 0; @@ -669,6 +915,7 @@ profile_matroska_open(profile_chain_t *prch, muxer_config_t *m_cfg, int flags, size_t qsize) { profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro; + streaming_target_t *dst; muxer_config_t c; if (m_cfg) @@ -680,11 +927,12 @@ profile_matroska_open(profile_chain_t *prch, if (pro->pro_webm) c.m_type = MC_WEBM; - streaming_queue_init(&prch->prch_sq, 0, qsize); - prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st); - prch->prch_tsfix = tsfix_create(prch->prch_gh); + prch->prch_sq.sq_maxsize = qsize; + + dst = prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st); + dst = prch->prch_tsfix = tsfix_create(dst); + prch->prch_st = dst; prch->prch_muxer = muxer_create(&c); - prch->prch_st = prch->prch_tsfix; return 0; } @@ -934,38 +1182,92 @@ const idclass_t profile_transcode_class = } }; +static int +profile_transcode_resolution(profile_transcode_t *pro) +{ + return pro->pro_resolution >= 240 ? pro->pro_resolution : 240; +} + +static int +profile_transcode_bandwidth(profile_transcode_t *pro) +{ + return pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64; +} + +static int +profile_transcode_can_share(profile_chain_t *prch, + profile_chain_t *joiner) +{ + profile_transcode_t *pro1 = (profile_transcode_t *)prch->prch_pro; + profile_transcode_t *pro2 = (profile_transcode_t *)joiner->prch_pro; + if (pro1 == pro2) + return 1; + if (!idnode_is_instance(&pro2->pro_id, &profile_transcode_class)) + return 0; + /* + * Do full params check here, note that profiles might differ + * only in the muxer setup. + */ + if (strcmp(pro1->pro_vcodec ?: "", pro2->pro_vcodec ?: "")) + return 0; + if (strcmp(pro1->pro_acodec ?: "", pro2->pro_acodec ?: "")) + return 0; + if (strcmp(pro1->pro_scodec ?: "", pro2->pro_scodec ?: "")) + return 0; + if (profile_transcode_resolution(pro1) != profile_transcode_resolution(pro2)) + return 0; + if (profile_transcode_bandwidth(pro1) != profile_transcode_bandwidth(pro2)) + return 0; + if (strcmp(pro1->pro_language ?: "", pro2->pro_language ?: "")) + return 0; + return 1; +} + static int profile_transcode_work(profile_chain_t *prch, streaming_target_t *dst, uint32_t timeshift_period, int flags) { + profile_sharer_t *prsh; profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro; transcoder_props_t props; + prsh = profile_sharer_find(prch); + if (!prsh) + goto fail; + + prch->prch_can_share = profile_transcode_can_share; + memset(&props, 0, sizeof(props)); strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1); strncpy(props.tp_acodec, pro->pro_acodec ?: "", sizeof(props.tp_acodec)-1); strncpy(props.tp_scodec, pro->pro_scodec ?: "", sizeof(props.tp_scodec)-1); - props.tp_resolution = pro->pro_resolution >= 240 ? pro->pro_resolution : 240; + props.tp_resolution = profile_transcode_resolution(pro); props.tp_channels = pro->pro_channels; - props.tp_bandwidth = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64; + props.tp_bandwidth = profile_transcode_bandwidth(pro); strncpy(props.tp_language, pro->pro_language ?: "", 3); #if ENABLE_TIMESHIFT if (timeshift_period > 0) dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period); #endif - dst = prch->prch_transcoder = transcoder_create(dst); - if (!dst) { - profile_chain_close(prch); - return -1; + if (profile_sharer_create(prsh, prch, dst)) + goto fail; + if (!prsh->prsh_transcoder) { + assert(!prsh->prsh_tsfix); + dst = prsh->prsh_transcoder = transcoder_create(&prsh->prsh_input); + if (!dst) + goto fail; + transcoder_set_properties(dst, &props); + prsh->prsh_tsfix = tsfix_create(dst); } - transcoder_set_properties(dst, &props); - prch->prch_tsfix = tsfix_create(dst); - - prch->prch_st = prch->prch_tsfix; - + prch->prch_share = prsh->prsh_tsfix; + streaming_target_init(&prch->prch_input, profile_input, prch, 0); + prch->prch_st = &prch->prch_input; return 0; +fail: + profile_chain_close(prch); + return -1; } static int @@ -1001,13 +1303,15 @@ profile_transcode_open(profile_chain_t *prch, c.m_type = MC_MATROSKA; } - streaming_queue_init(&prch->prch_sq, 0, qsize); + prch->prch_sq.sq_maxsize = qsize; + prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st); - r = profile_transcode_work(prch, prch->prch_gh, 0, - PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX); - if (r) + r = profile_transcode_work(prch, prch->prch_gh, 0, 0); + if (r) { + profile_chain_close(prch); return r; + } prch->prch_muxer = muxer_create(&c); return 0; @@ -1055,6 +1359,7 @@ profile_init(void) LIST_INIT(&profile_builders); TAILQ_INIT(&profiles); + LIST_INIT(&profile_chains); profile_register(&profile_mpegts_pass_class, profile_mpegts_pass_builder); profile_register(&profile_matroska_class, profile_matroska_builder); diff --git a/src/profile.h b/src/profile.h index 7be38cfd..e18701dd 100644 --- a/src/profile.h +++ b/src/profile.h @@ -26,6 +26,7 @@ struct profile; struct muxer; struct streaming_target; +struct streaming_start; extern const idclass_t profile_class; extern const idclass_t profile_mpegts_pass_class; @@ -47,24 +48,46 @@ typedef LIST_HEAD(, profile_build) profile_builders_queue; extern profile_builders_queue profile_builders; -#define PRCH_FLAG_SKIPZEROING (1<<0) -#define PRCH_FLAG_TSFIX (1<<1) +typedef struct profile_sharer { + streaming_target_t prsh_input; + LIST_HEAD(,profile_chain) prsh_chains; + struct profile_chain *prsh_master; + struct streaming_start *prsh_start_msg; + struct streaming_target *prsh_tsfix; +#if ENABLE_LIBAV + struct streaming_target *prsh_transcoder; +#endif +} profile_sharer_t; typedef struct profile_chain { - struct profile *prch_pro; - void *prch_id; - int prch_flags; - struct streaming_queue prch_sq; - struct streaming_target *prch_st; - struct muxer *prch_muxer; - struct streaming_target *prch_gh; - struct streaming_target *prch_tsfix; -#if ENABLE_LIBAV - struct streaming_target *prch_transcoder; -#endif + LIST_ENTRY(profile_chain) prch_link; + int prch_linked; + + struct profile_sharer *prch_sharer; + LIST_ENTRY(profile_chain) prch_sharer_link; + + struct profile *prch_pro; + void *prch_id; + + int64_t prch_ts_delta; + + int prch_flags; + int prch_stop; + int prch_start_pending; + struct streaming_queue prch_sq; + struct streaming_target *prch_post_share; + struct streaming_target *prch_st; + struct muxer *prch_muxer; + struct streaming_target *prch_gh; + struct streaming_target *prch_tsfix; #if ENABLE_TIMESHIFT - struct streaming_target *prch_timeshift; + struct streaming_target *prch_timeshift; #endif + struct streaming_target prch_input; + struct streaming_target *prch_share; + + int (*prch_can_share)(struct profile_chain *prch, + struct profile_chain *joiner); } profile_chain_t; typedef struct profile {