mpegts input: cleanups in locking and queue shutdown

This commit is contained in:
Jaroslav Kysela 2014-10-26 20:56:11 +01:00
parent 16363a4ae3
commit 46ce1de009
2 changed files with 45 additions and 46 deletions

View file

@ -478,6 +478,9 @@ static void
mpegts_input_stopping_mux
( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
{
pthread_mutex_lock(&mi->mi_input_lock);
mi->mi_stop = 1;
pthread_mutex_unlock(&mi->mi_input_lock);
pthread_mutex_lock(&mi->mi_output_lock);
mi->mi_stop = 1;
pthread_mutex_unlock(&mi->mi_output_lock);
@ -599,9 +602,10 @@ mpegts_input_recv_packets
off += len2;
pthread_mutex_lock(&mi->mi_input_lock);
if (TAILQ_FIRST(&mi->mi_input_queue) == NULL)
if (!mi->mi_stop) {
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
pthread_cond_signal(&mi->mi_input_cond);
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
}
pthread_mutex_unlock(&mi->mi_input_lock);
}
@ -790,11 +794,13 @@ mpegts_input_process
// TODO: might be able to optimise this a bit by having slightly
// larger buffering and trying to aggregate data (if we get
// same PID multiple times in the loop)
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
if (!mi->mi_stop) {
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
}
}
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
@ -884,12 +890,10 @@ mpegts_input_table_thread ( void *aux )
pthread_mutex_unlock(&mi->mi_output_lock);
/* Process */
if (mtf->mtf_mux) {
pthread_mutex_lock(&global_lock);
if (!mi->mi_stop)
mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
pthread_mutex_unlock(&global_lock);
}
pthread_mutex_lock(&global_lock);
if (!mi->mi_stop && mtf->mtf_mux)
mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
pthread_mutex_unlock(&global_lock);
/* Cleanup */
free(mtf);
@ -927,11 +931,13 @@ mpegts_input_flush_mux
}
pthread_mutex_unlock(&mi->mi_input_lock);
/* Flush table Q - the global lock is already held */
/* Flush table Q */
pthread_mutex_lock(&mi->mi_output_lock);
TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
if (mtf->mtf_mux == mm)
mtf->mtf_mux = NULL;
}
pthread_mutex_unlock(&mi->mi_output_lock);
/* stop flag must be set here */
/* otherwise the picked mtf might be processed after mux deactivation */
assert(mi->mi_stop);

View file

@ -691,19 +691,20 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
/* Stop possible recursion */
if (!mmi) return;
/* Clear */
mm->mm_active = NULL;
mpegts_mux_nice_name(mm, buf, sizeof(buf));
tvhdebug("mpegts", "%s - stopping mux", buf);
if (mmi) {
mi = mmi->mmi_input;
mi->mi_stopping_mux(mi, mmi);
LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
mi->mi_stop_mux(mi, mmi);
mi->mi_stopped_mux(mi, mmi);
}
mi = mmi->mmi_input;
assert(mi);
mi->mi_stopping_mux(mi, mmi);
LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
mi->mi_stop_mux(mi, mmi);
mi->mi_stopped_mux(mi, mmi);
/* Flush all tables */
tvhtrace("mpegts", "%s - flush tables", buf);
@ -711,31 +712,26 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
tvhtrace("mpegts", "%s - mi=%p", buf, (void *)mi);
/* Flush table data queue */
if (mi)
mpegts_input_flush_mux(mi, mm);
mpegts_input_flush_mux(mi, mm);
/* Ensure PIDs are cleared */
if (mi) {
pthread_mutex_lock(&mi->mi_output_lock);
mm->mm_last_pid = -1;
mm->mm_last_mp = NULL;
while ((mp = RB_FIRST(&mm->mm_pids))) {
assert(mi);
while ((mps = RB_FIRST(&mp->mp_subs))) {
RB_REMOVE(&mp->mp_subs, mps, mps_link);
free(mps);
}
RB_REMOVE(&mm->mm_pids, mp, mp_link);
if (mp->mp_fd != -1) {
tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid);
close(mp->mp_fd);
}
free(mp);
pthread_mutex_lock(&mi->mi_output_lock);
mm->mm_last_pid = -1;
mm->mm_last_mp = NULL;
while ((mp = RB_FIRST(&mm->mm_pids))) {
assert(mi);
while ((mps = RB_FIRST(&mp->mp_subs))) {
RB_REMOVE(&mp->mp_subs, mps, mps_link);
free(mps);
}
pthread_mutex_unlock(&mi->mi_output_lock);
} else {
assert(RB_FIRST(&mm->mm_pids) == NULL);
RB_REMOVE(&mm->mm_pids, mp, mp_link);
if (mp->mp_fd != -1) {
tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid);
close(mp->mp_fd);
}
free(mp);
}
pthread_mutex_unlock(&mi->mi_output_lock);
/* Scanning */
mpegts_network_scan_mux_cancel(mm, 1);
@ -745,9 +741,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
/* Events */
mpegts_fire_event(mm, ml_mux_stop);
/* Clear */
mm->mm_active = NULL;
}
void