streaming: change streaming_pad_deliver() semantics

This commit is contained in:
Jaroslav Kysela 2014-10-24 10:27:04 +02:00
parent ad5f59ce14
commit 8a0078d7ed
11 changed files with 35 additions and 36 deletions

View file

@ -793,7 +793,7 @@ linuxdvb_frontend_monitor ( void *aux )
sm.sm_data = &sigstat;
LIST_FOREACH(s, &lfe->mi_transports, s_active_link) {
pthread_mutex_lock(&s->s_stream_mutex);
streaming_pad_deliver(&s->s_streaming_pad, &sm);
streaming_pad_deliver(&s->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&s->s_stream_mutex);
}
}

View file

@ -814,7 +814,7 @@ done:
memset(&sm, 0, sizeof(sm));
sm.sm_type = SMT_MPEGTS;
sm.sm_data = pb;
streaming_pad_deliver(&mmi->mmi_streaming_pad, &sm);
streaming_pad_deliver(&mmi->mmi_streaming_pad, streaming_msg_clone(&sm));
pktbuf_ref_dec(pb);
}

View file

@ -1345,7 +1345,7 @@ satip_frontend_signal_cb( void *aux )
sm.sm_data = &sigstat;
LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
streaming_pad_deliver(&svc->s_streaming_pad, &sm);
streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
}
gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 250);

View file

@ -283,7 +283,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src)
sm.sm_type = SMT_MPEGTS;
sm.sm_data = pb;
streaming_pad_deliver(&t->s_streaming_pad, &sm);
streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_clone(&sm));
pktbuf_ref_dec(pb);

View file

@ -333,7 +333,7 @@ tvhdhomerun_frontend_monitor_cb( void *aux )
LIST_FOREACH(svc, &hfe->mi_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
streaming_pad_deliver(&svc->s_streaming_pad, &sm);
streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
}
}

View file

@ -801,9 +801,7 @@ extract_subtitle(mpegts_service_t *t, elementary_stream_t *st,
th_pkt_t *pkt = pkt_alloc(sub, off, pts, pts);
pkt->pkt_componentindex = st->es_index;
streaming_message_t *sm = streaming_msg_create_pkt(pkt);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt));
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);

View file

@ -1408,10 +1408,7 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt, int error)
/* Forward packet */
pkt->pkt_componentindex = st->es_index;
streaming_message_t *sm = streaming_msg_create_pkt(pkt);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt));
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);

View file

@ -1117,7 +1117,6 @@ service_servicetype_txt ( service_t *s )
void
service_set_streaming_status_flags_(service_t *t, int set)
{
streaming_message_t *sm;
lock_assert(&t->s_stream_mutex);
if(set == t->s_streaming_status)
@ -1136,10 +1135,9 @@ service_set_streaming_status_flags_(service_t *t, int set)
set & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "",
set & TSS_TIMEOUT ? "[Data timeout] " : "");
sm = streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status));
pthread_cond_broadcast(&t->s_tss_cond);
}
@ -1153,22 +1151,20 @@ service_set_streaming_status_flags_(service_t *t, int set)
void
service_restart(service_t *t, int had_components)
{
streaming_message_t *sm;
pthread_mutex_lock(&t->s_stream_mutex);
if(had_components) {
sm = streaming_msg_create_code(SMT_STOP, SM_CODE_SOURCE_RECONFIGURED);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_STOP,
SM_CODE_SOURCE_RECONFIGURED));
}
service_build_filter(t);
if(TAILQ_FIRST(&t->s_filt_components) != NULL) {
sm = streaming_msg_create_data(SMT_START,
service_build_stream_start(t));
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_data(SMT_START,
service_build_stream_start(t)));
}
pthread_mutex_unlock(&t->s_stream_mutex);

View file

@ -316,10 +316,10 @@ streaming_msg_free(streaming_message_t *sm)
void
streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
{
if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
streaming_msg_free(sm);
else
st->st_cb(st->st_opaque, sm);
streaming_target_deliver(st, sm);
}
/**
@ -328,18 +328,23 @@ streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
void
streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
{
streaming_target_t *st, *next;
streaming_target_t *st, *next, *run = NULL;
for(st = LIST_FIRST(&sp->sp_targets);st; st = next) {
for (st = LIST_FIRST(&sp->sp_targets); st; st = next) {
next = LIST_NEXT(st, st_link);
assert(next != st);
if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
continue;
st->st_cb(st->st_opaque, streaming_msg_clone(sm));
if (run)
streaming_target_deliver(run, streaming_msg_clone(sm));
run = st;
}
if (run)
streaming_target_deliver(run, sm);
else
streaming_msg_free(sm);
}
/**
*
*/

View file

@ -101,7 +101,9 @@ streaming_message_t *streaming_msg_create_code(streaming_message_type_t type,
streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);
#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm)))
static inline void
streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm)
{ st->st_cb(st->st_opaque, sm); }
void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm);

View file

@ -89,9 +89,10 @@ subscription_link_service(th_subscription_t *s, service_t *t)
// Link to service output
streaming_target_connect(&t->s_streaming_pad, &s->ths_input);
sm = streaming_msg_create_code(SMT_GRACE, s->ths_postpone + t->s_grace_delay);
streaming_pad_deliver(&t->s_streaming_pad, sm);
streaming_msg_free(sm);
streaming_pad_deliver(&t->s_streaming_pad,
streaming_msg_create_code(SMT_GRACE,
s->ths_postpone +
t->s_grace_delay));
if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {