Add more fine grained locking for streams

This commit is contained in:
Andreas Öman 2008-09-02 19:54:16 +00:00
parent 654980dddd
commit fee21455dc
6 changed files with 27 additions and 12 deletions

View file

@ -435,7 +435,7 @@ dvb_adapter_clone(th_dvb_adapter_t *dst, th_dvb_adapter_t *src)
if(t_src->tht_ch != NULL)
transport_map_channel(t_dst, t_src->tht_ch);
pthread_mutex_lock(&t_src->tht_stream_mutex);
LIST_FOREACH(st_src, &t_src->tht_streams, st_link) {
@ -449,6 +449,9 @@ dvb_adapter_clone(th_dvb_adapter_t *dst, th_dvb_adapter_t *src)
st_dst->st_frame_duration = st_src->st_frame_duration;
st_dst->st_caid = st_src->st_caid;
}
pthread_mutex_unlock(&t_src->tht_stream_mutex);
}
dvb_mux_save(tdmi_dst);
}

View file

@ -207,7 +207,9 @@ dvb_transport_load(th_dvb_mux_instance_t *tdmi)
t->tht_chname = strdup(t->tht_svcname);
}
pthread_mutex_lock(&t->tht_stream_mutex);
psi_load_transport_settings(c, t);
pthread_mutex_unlock(&t->tht_stream_mutex);
if(!htsmsg_get_u32(c, "mapped", &u32) && u32)
transport_map_channel(t, NULL);

View file

@ -844,10 +844,10 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
/* Alert all muxers tied to us that a new packet has arrived */
pthread_mutex_lock(&t->tht_delivery_mutex);
lock_assert(&t->tht_stream_mutex);
LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link)
tm->tm_new_pkt(tm, st, pkt);
pthread_mutex_unlock(&t->tht_delivery_mutex);
/* Unref (and possibly free) the packet, muxers are supposed
to increase refcount or copy packet if they need anything */

7
psi.c
View file

@ -78,6 +78,8 @@ psi_parse_pat(th_transport_t *t, uint8_t *ptr, int len,
uint16_t pid;
th_stream_t *st;
lock_assert(&t->tht_stream_mutex);
if(len < 5)
return -1;
@ -182,7 +184,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid)
if(len < 9)
return -1;
lock_assert(&global_lock);
lock_assert(&t->tht_stream_mutex);
sid = ptr[0] << 8 | ptr[1];
pcr_pid = (ptr[5] & 0x1f) << 8 | ptr[6];
@ -560,6 +562,8 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t)
htsmsg_add_u32(m, "disabled", !!t->tht_disabled);
pthread_mutex_lock(&t->tht_stream_mutex);
LIST_FOREACH(st, &t->tht_streams, st_link) {
sub = htsmsg_create();
@ -577,6 +581,7 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t)
htsmsg_add_msg(m, "stream", sub);
}
pthread_mutex_unlock(&t->tht_stream_mutex);
}

View file

@ -80,6 +80,8 @@ transport_stop(th_transport_t *t)
assert(LIST_FIRST(&t->tht_muxers) == NULL);
pthread_mutex_lock(&t->tht_stream_mutex);
/**
* Clean up each stream
*/
@ -135,6 +137,8 @@ transport_stop(th_transport_t *t)
pkt_unstore(st, pkt);
}
pthread_mutex_unlock(&t->tht_stream_mutex);
}
/**
@ -185,9 +189,9 @@ transport_link_muxer(th_transport_t *t, th_muxer_t *tm)
return;
}
pthread_mutex_lock(&t->tht_delivery_mutex);
pthread_mutex_lock(&t->tht_stream_mutex);
LIST_INSERT_HEAD(&t->tht_muxers, tm, tm_transport_link);
pthread_mutex_unlock(&t->tht_delivery_mutex);
pthread_mutex_unlock(&t->tht_stream_mutex);
tm->tm_transport = t;
}
@ -205,9 +209,9 @@ transport_unlink_muxer(th_muxer_t *tm)
lock_assert(&global_lock);
pthread_mutex_lock(&t->tht_delivery_mutex);
pthread_mutex_lock(&t->tht_stream_mutex);
LIST_REMOVE(tm, tm_transport_link);
pthread_mutex_unlock(&t->tht_delivery_mutex);
pthread_mutex_unlock(&t->tht_stream_mutex);
tm->tm_transport = NULL;
}
@ -483,7 +487,7 @@ transport_create(const char *identifier, int type, int source_type)
lock_assert(&global_lock);
pthread_mutex_init(&t->tht_delivery_mutex, NULL);
pthread_mutex_init(&t->tht_stream_mutex, NULL);
t->tht_identifier = strdup(identifier);
t->tht_type = type;
t->tht_source_type = source_type;
@ -513,6 +517,8 @@ transport_find_by_identifier(const char *identifier)
/**
* Add a new stream to a transport
*
*
*/
th_stream_t *
transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type)
@ -520,7 +526,7 @@ transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type)
th_stream_t *st;
int i = 0;
lock_assert(&global_lock);
lock_assert(&t->tht_stream_mutex);
LIST_FOREACH(st, &t->tht_streams, st_link) {
i++;

View file

@ -384,6 +384,7 @@ typedef struct th_transport {
int tht_tt_rundown_content_length;
time_t tht_tt_clock; /* Network clock as determined by teletext decoder */
pthread_mutex_t tht_stream_mutex;
struct th_stream_list tht_streams;
th_stream_t *tht_video;
th_stream_t *tht_audio;
@ -427,8 +428,6 @@ typedef struct th_transport {
int (*tht_quality_index)(struct th_transport *t);
pthread_mutex_t tht_delivery_mutex;
struct th_muxer_list tht_muxers; /* muxers */
/*