diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index bba65391..56e423a4 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -478,6 +478,9 @@ static void mpegts_input_stopping_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { + pthread_mutex_lock(&mi->mi_input_lock); + mi->mi_stop = 1; + pthread_mutex_unlock(&mi->mi_input_lock); pthread_mutex_lock(&mi->mi_output_lock); mi->mi_stop = 1; pthread_mutex_unlock(&mi->mi_output_lock); @@ -599,9 +602,10 @@ mpegts_input_recv_packets off += len2; pthread_mutex_lock(&mi->mi_input_lock); - if (TAILQ_FIRST(&mi->mi_input_queue) == NULL) + if (!mi->mi_stop) { + TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link); pthread_cond_signal(&mi->mi_input_cond); - TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link); + } pthread_mutex_unlock(&mi->mi_input_lock); } @@ -790,11 +794,13 @@ mpegts_input_process // TODO: might be able to optimise this a bit by having slightly // larger buffering and trying to aggregate data (if we get // same PID multiple times in the loop) - mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); - memcpy(mtf->mtf_tsb, tsb, 188); - mtf->mtf_mux = mm; - TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); - table_wakeup = 1; + if (!mi->mi_stop) { + mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); + memcpy(mtf->mtf_tsb, tsb, 188); + mtf->mtf_mux = mm; + TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); + table_wakeup = 1; + } } } else { //tvhdebug("tsdemux", "%s - SI packet had errors", name); @@ -884,12 +890,10 @@ mpegts_input_table_thread ( void *aux ) pthread_mutex_unlock(&mi->mi_output_lock); /* Process */ - if (mtf->mtf_mux) { - pthread_mutex_lock(&global_lock); - if (!mi->mi_stop) - mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb); - pthread_mutex_unlock(&global_lock); - } + pthread_mutex_lock(&global_lock); + if (!mi->mi_stop && mtf->mtf_mux) + mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb); + pthread_mutex_unlock(&global_lock); /* Cleanup */ free(mtf); @@ -927,11 +931,13 @@ mpegts_input_flush_mux } pthread_mutex_unlock(&mi->mi_input_lock); - /* Flush table Q - the global lock is already held */ + /* Flush table Q */ + pthread_mutex_lock(&mi->mi_output_lock); TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) { if (mtf->mtf_mux == mm) mtf->mtf_mux = NULL; } + pthread_mutex_unlock(&mi->mi_output_lock); /* stop flag must be set here */ /* otherwise the picked mtf might be processed after mux deactivation */ assert(mi->mi_stop); diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 9fed7f08..3c43f0bf 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -691,19 +691,20 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) /* Stop possible recursion */ if (!mmi) return; + + /* Clear */ mm->mm_active = NULL; mpegts_mux_nice_name(mm, buf, sizeof(buf)); tvhdebug("mpegts", "%s - stopping mux", buf); - if (mmi) { - mi = mmi->mmi_input; - mi->mi_stopping_mux(mi, mmi); - LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) - subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN); - mi->mi_stop_mux(mi, mmi); - mi->mi_stopped_mux(mi, mmi); - } + mi = mmi->mmi_input; + assert(mi); + mi->mi_stopping_mux(mi, mmi); + LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) + subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN); + mi->mi_stop_mux(mi, mmi); + mi->mi_stopped_mux(mi, mmi); /* Flush all tables */ tvhtrace("mpegts", "%s - flush tables", buf); @@ -711,31 +712,26 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) tvhtrace("mpegts", "%s - mi=%p", buf, (void *)mi); /* Flush table data queue */ - if (mi) - mpegts_input_flush_mux(mi, mm); + mpegts_input_flush_mux(mi, mm); /* Ensure PIDs are cleared */ - if (mi) { - pthread_mutex_lock(&mi->mi_output_lock); - mm->mm_last_pid = -1; - mm->mm_last_mp = NULL; - while ((mp = RB_FIRST(&mm->mm_pids))) { - assert(mi); - while ((mps = RB_FIRST(&mp->mp_subs))) { - RB_REMOVE(&mp->mp_subs, mps, mps_link); - free(mps); - } - RB_REMOVE(&mm->mm_pids, mp, mp_link); - if (mp->mp_fd != -1) { - tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid); - close(mp->mp_fd); - } - free(mp); + pthread_mutex_lock(&mi->mi_output_lock); + mm->mm_last_pid = -1; + mm->mm_last_mp = NULL; + while ((mp = RB_FIRST(&mm->mm_pids))) { + assert(mi); + while ((mps = RB_FIRST(&mp->mp_subs))) { + RB_REMOVE(&mp->mp_subs, mps, mps_link); + free(mps); } - pthread_mutex_unlock(&mi->mi_output_lock); - } else { - assert(RB_FIRST(&mm->mm_pids) == NULL); + RB_REMOVE(&mm->mm_pids, mp, mp_link); + if (mp->mp_fd != -1) { + tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid); + close(mp->mp_fd); + } + free(mp); } + pthread_mutex_unlock(&mi->mi_output_lock); /* Scanning */ mpegts_network_scan_mux_cancel(mm, 1); @@ -745,9 +741,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) /* Events */ mpegts_fire_event(mm, ml_mux_stop); - - /* Clear */ - mm->mm_active = NULL; } void