diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index 572d01bc..b8c51ad9 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -793,7 +793,7 @@ linuxdvb_frontend_monitor ( void *aux ) sm.sm_data = &sigstat; LIST_FOREACH(s, &lfe->mi_transports, s_active_link) { pthread_mutex_lock(&s->s_stream_mutex); - streaming_pad_deliver(&s->s_streaming_pad, &sm); + streaming_pad_deliver(&s->s_streaming_pad, streaming_msg_clone(&sm)); pthread_mutex_unlock(&s->s_stream_mutex); } } diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index cb52b919..bba65391 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -814,7 +814,7 @@ done: memset(&sm, 0, sizeof(sm)); sm.sm_type = SMT_MPEGTS; sm.sm_data = pb; - streaming_pad_deliver(&mmi->mmi_streaming_pad, &sm); + streaming_pad_deliver(&mmi->mmi_streaming_pad, streaming_msg_clone(&sm)); pktbuf_ref_dec(pb); } diff --git a/src/input/mpegts/satip/satip_frontend.c b/src/input/mpegts/satip/satip_frontend.c index 686ca576..56bab05d 100644 --- a/src/input/mpegts/satip/satip_frontend.c +++ b/src/input/mpegts/satip/satip_frontend.c @@ -1345,7 +1345,7 @@ satip_frontend_signal_cb( void *aux ) sm.sm_data = &sigstat; LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) { pthread_mutex_lock(&svc->s_stream_mutex); - streaming_pad_deliver(&svc->s_streaming_pad, &sm); + streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm)); pthread_mutex_unlock(&svc->s_stream_mutex); } gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 250); diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index 57883dad..2b4f7e07 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -283,7 +283,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src) sm.sm_type = SMT_MPEGTS; sm.sm_data = pb; - streaming_pad_deliver(&t->s_streaming_pad, &sm); + streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_clone(&sm)); pktbuf_ref_dec(pb); diff --git a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c index ace95234..07fcc894 100644 --- a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c +++ b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c @@ -333,7 +333,7 @@ tvhdhomerun_frontend_monitor_cb( void *aux ) LIST_FOREACH(svc, &hfe->mi_transports, s_active_link) { pthread_mutex_lock(&svc->s_stream_mutex); - streaming_pad_deliver(&svc->s_streaming_pad, &sm); + streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm)); pthread_mutex_unlock(&svc->s_stream_mutex); } } diff --git a/src/parsers/parser_teletext.c b/src/parsers/parser_teletext.c index a492ecc2..54ce573b 100644 --- a/src/parsers/parser_teletext.c +++ b/src/parsers/parser_teletext.c @@ -801,9 +801,7 @@ extract_subtitle(mpegts_service_t *t, elementary_stream_t *st, th_pkt_t *pkt = pkt_alloc(sub, off, pts, pts); pkt->pkt_componentindex = st->es_index; - streaming_message_t *sm = streaming_msg_create_pkt(pkt); - streaming_pad_deliver(&t->s_streaming_pad, sm); - streaming_msg_free(sm); + streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt)); /* Decrease our own reference to the packet */ pkt_ref_dec(pkt); diff --git a/src/parsers/parsers.c b/src/parsers/parsers.c index 77ed6e61..1a590f98 100644 --- a/src/parsers/parsers.c +++ b/src/parsers/parsers.c @@ -1408,10 +1408,7 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt, int error) /* Forward packet */ pkt->pkt_componentindex = st->es_index; - streaming_message_t *sm = streaming_msg_create_pkt(pkt); - - streaming_pad_deliver(&t->s_streaming_pad, sm); - streaming_msg_free(sm); + streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt)); /* Decrease our own reference to the packet */ pkt_ref_dec(pkt); diff --git a/src/service.c b/src/service.c index 7c51c811..0783cc78 100644 --- a/src/service.c +++ b/src/service.c @@ -1117,7 +1117,6 @@ service_servicetype_txt ( service_t *s ) void service_set_streaming_status_flags_(service_t *t, int set) { - streaming_message_t *sm; lock_assert(&t->s_stream_mutex); if(set == t->s_streaming_status) @@ -1136,10 +1135,9 @@ service_set_streaming_status_flags_(service_t *t, int set) set & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "", set & TSS_TIMEOUT ? "[Data timeout] " : ""); - sm = streaming_msg_create_code(SMT_SERVICE_STATUS, - t->s_streaming_status); - streaming_pad_deliver(&t->s_streaming_pad, sm); - streaming_msg_free(sm); + streaming_pad_deliver(&t->s_streaming_pad, + streaming_msg_create_code(SMT_SERVICE_STATUS, + t->s_streaming_status)); pthread_cond_broadcast(&t->s_tss_cond); } @@ -1153,22 +1151,20 @@ service_set_streaming_status_flags_(service_t *t, int set) void service_restart(service_t *t, int had_components) { - streaming_message_t *sm; pthread_mutex_lock(&t->s_stream_mutex); if(had_components) { - sm = streaming_msg_create_code(SMT_STOP, SM_CODE_SOURCE_RECONFIGURED); - streaming_pad_deliver(&t->s_streaming_pad, sm); - streaming_msg_free(sm); + streaming_pad_deliver(&t->s_streaming_pad, + streaming_msg_create_code(SMT_STOP, + SM_CODE_SOURCE_RECONFIGURED)); } service_build_filter(t); if(TAILQ_FIRST(&t->s_filt_components) != NULL) { - sm = streaming_msg_create_data(SMT_START, - service_build_stream_start(t)); - streaming_pad_deliver(&t->s_streaming_pad, sm); - streaming_msg_free(sm); + streaming_pad_deliver(&t->s_streaming_pad, + streaming_msg_create_data(SMT_START, + service_build_stream_start(t))); } pthread_mutex_unlock(&t->s_stream_mutex); diff --git a/src/streaming.c b/src/streaming.c index 3cabff9c..9204689f 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -316,10 +316,10 @@ streaming_msg_free(streaming_message_t *sm) void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm) { - if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type)) + if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type)) streaming_msg_free(sm); else - st->st_cb(st->st_opaque, sm); + streaming_target_deliver(st, sm); } /** @@ -328,18 +328,23 @@ streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm) void streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm) { - streaming_target_t *st, *next; + streaming_target_t *st, *next, *run = NULL; - for(st = LIST_FIRST(&sp->sp_targets);st; st = next) { + for (st = LIST_FIRST(&sp->sp_targets); st; st = next) { next = LIST_NEXT(st, st_link); assert(next != st); - if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type)) + if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type)) continue; - st->st_cb(st->st_opaque, streaming_msg_clone(sm)); + if (run) + streaming_target_deliver(run, streaming_msg_clone(sm)); + run = st; } + if (run) + streaming_target_deliver(run, sm); + else + streaming_msg_free(sm); } - /** * */ diff --git a/src/streaming.h b/src/streaming.h index 8f1cde80..c7c5f0e2 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -101,7 +101,9 @@ streaming_message_t *streaming_msg_create_code(streaming_message_type_t type, streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt); -#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm))) +static inline void +streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm) + { st->st_cb(st->st_opaque, sm); } void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm); diff --git a/src/subscriptions.c b/src/subscriptions.c index 99bfa48a..23ee942f 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -89,9 +89,10 @@ subscription_link_service(th_subscription_t *s, service_t *t) // Link to service output streaming_target_connect(&t->s_streaming_pad, &s->ths_input); - sm = streaming_msg_create_code(SMT_GRACE, s->ths_postpone + t->s_grace_delay); - streaming_pad_deliver(&t->s_streaming_pad, sm); - streaming_msg_free(sm); + streaming_pad_deliver(&t->s_streaming_pad, + streaming_msg_create_code(SMT_GRACE, + s->ths_postpone + + t->s_grace_delay)); if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {