From fee21455dc594a9a1177a4a897c0954a5b06393c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Tue, 2 Sep 2008 19:54:16 +0000 Subject: [PATCH] Add more fine grained locking for streams --- dvb/dvb_adapter.c | 5 ++++- dvb/dvb_transport.c | 2 ++ parsers.c | 4 ++-- psi.c | 7 ++++++- transports.c | 18 ++++++++++++------ tvhead.h | 3 +-- 6 files changed, 27 insertions(+), 12 deletions(-) diff --git a/dvb/dvb_adapter.c b/dvb/dvb_adapter.c index b0a4f992..34a4e347 100644 --- a/dvb/dvb_adapter.c +++ b/dvb/dvb_adapter.c @@ -435,7 +435,7 @@ dvb_adapter_clone(th_dvb_adapter_t *dst, th_dvb_adapter_t *src) if(t_src->tht_ch != NULL) transport_map_channel(t_dst, t_src->tht_ch); - + pthread_mutex_lock(&t_src->tht_stream_mutex); LIST_FOREACH(st_src, &t_src->tht_streams, st_link) { @@ -449,6 +449,9 @@ dvb_adapter_clone(th_dvb_adapter_t *dst, th_dvb_adapter_t *src) st_dst->st_frame_duration = st_src->st_frame_duration; st_dst->st_caid = st_src->st_caid; } + + pthread_mutex_unlock(&t_src->tht_stream_mutex); + } dvb_mux_save(tdmi_dst); } diff --git a/dvb/dvb_transport.c b/dvb/dvb_transport.c index 6c0857f9..1914db88 100644 --- a/dvb/dvb_transport.c +++ b/dvb/dvb_transport.c @@ -207,7 +207,9 @@ dvb_transport_load(th_dvb_mux_instance_t *tdmi) t->tht_chname = strdup(t->tht_svcname); } + pthread_mutex_lock(&t->tht_stream_mutex); psi_load_transport_settings(c, t); + pthread_mutex_unlock(&t->tht_stream_mutex); if(!htsmsg_get_u32(c, "mapped", &u32) && u32) transport_map_channel(t, NULL); diff --git a/parsers.c b/parsers.c index 2caa2777..dfcf007c 100644 --- a/parsers.c +++ b/parsers.c @@ -844,10 +844,10 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) /* Alert all muxers tied to us that a new packet has arrived */ - pthread_mutex_lock(&t->tht_delivery_mutex); + lock_assert(&t->tht_stream_mutex); + LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link) tm->tm_new_pkt(tm, st, pkt); - pthread_mutex_unlock(&t->tht_delivery_mutex); /* Unref (and possibly free) the packet, muxers are supposed to increase refcount or copy packet if they need anything */ diff --git a/psi.c b/psi.c index df6f4ee1..332b7a4c 100644 --- a/psi.c +++ b/psi.c @@ -78,6 +78,8 @@ psi_parse_pat(th_transport_t *t, uint8_t *ptr, int len, uint16_t pid; th_stream_t *st; + lock_assert(&t->tht_stream_mutex); + if(len < 5) return -1; @@ -182,7 +184,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid) if(len < 9) return -1; - lock_assert(&global_lock); + lock_assert(&t->tht_stream_mutex); sid = ptr[0] << 8 | ptr[1]; pcr_pid = (ptr[5] & 0x1f) << 8 | ptr[6]; @@ -560,6 +562,8 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t) htsmsg_add_u32(m, "disabled", !!t->tht_disabled); + + pthread_mutex_lock(&t->tht_stream_mutex); LIST_FOREACH(st, &t->tht_streams, st_link) { sub = htsmsg_create(); @@ -577,6 +581,7 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t) htsmsg_add_msg(m, "stream", sub); } + pthread_mutex_unlock(&t->tht_stream_mutex); } diff --git a/transports.c b/transports.c index dbf5d545..cd71ca06 100644 --- a/transports.c +++ b/transports.c @@ -80,6 +80,8 @@ transport_stop(th_transport_t *t) assert(LIST_FIRST(&t->tht_muxers) == NULL); + pthread_mutex_lock(&t->tht_stream_mutex); + /** * Clean up each stream */ @@ -135,6 +137,8 @@ transport_stop(th_transport_t *t) pkt_unstore(st, pkt); } + pthread_mutex_unlock(&t->tht_stream_mutex); + } /** @@ -185,9 +189,9 @@ transport_link_muxer(th_transport_t *t, th_muxer_t *tm) return; } - pthread_mutex_lock(&t->tht_delivery_mutex); + pthread_mutex_lock(&t->tht_stream_mutex); LIST_INSERT_HEAD(&t->tht_muxers, tm, tm_transport_link); - pthread_mutex_unlock(&t->tht_delivery_mutex); + pthread_mutex_unlock(&t->tht_stream_mutex); tm->tm_transport = t; } @@ -205,9 +209,9 @@ transport_unlink_muxer(th_muxer_t *tm) lock_assert(&global_lock); - pthread_mutex_lock(&t->tht_delivery_mutex); + pthread_mutex_lock(&t->tht_stream_mutex); LIST_REMOVE(tm, tm_transport_link); - pthread_mutex_unlock(&t->tht_delivery_mutex); + pthread_mutex_unlock(&t->tht_stream_mutex); tm->tm_transport = NULL; } @@ -483,7 +487,7 @@ transport_create(const char *identifier, int type, int source_type) lock_assert(&global_lock); - pthread_mutex_init(&t->tht_delivery_mutex, NULL); + pthread_mutex_init(&t->tht_stream_mutex, NULL); t->tht_identifier = strdup(identifier); t->tht_type = type; t->tht_source_type = source_type; @@ -513,6 +517,8 @@ transport_find_by_identifier(const char *identifier) /** * Add a new stream to a transport + * + * */ th_stream_t * transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type) @@ -520,7 +526,7 @@ transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type) th_stream_t *st; int i = 0; - lock_assert(&global_lock); + lock_assert(&t->tht_stream_mutex); LIST_FOREACH(st, &t->tht_streams, st_link) { i++; diff --git a/tvhead.h b/tvhead.h index 6582a8a8..67a801df 100644 --- a/tvhead.h +++ b/tvhead.h @@ -384,6 +384,7 @@ typedef struct th_transport { int tht_tt_rundown_content_length; time_t tht_tt_clock; /* Network clock as determined by teletext decoder */ + pthread_mutex_t tht_stream_mutex; struct th_stream_list tht_streams; th_stream_t *tht_video; th_stream_t *tht_audio; @@ -427,8 +428,6 @@ typedef struct th_transport { int (*tht_quality_index)(struct th_transport *t); - pthread_mutex_t tht_delivery_mutex; - struct th_muxer_list tht_muxers; /* muxers */ /*