From 4e3fc9d40b79ba394a1696be0302cda23c341e8a Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Fri, 11 Apr 2014 23:43:12 +0100 Subject: [PATCH] mpegts: reworking of the input threading I have split the input threading in two. There is now a smaller/faster thread responisble for reading data from the source device (file/socket/DVB/etc...) and a potentially slower (though not too slow!) thread for processing. This ensures that any minor delay in processing (potentially due to unexpected effects during start/stop, or anything else!) do not unduly impact reading from the source which could otherwise lead to loss of data. --- src/input/mpegts.h | 60 +++-- src/input/mpegts/mpegts_input.c | 415 ++++++++++++++++++++------------ src/input/mpegts/mpegts_mux.c | 10 +- 3 files changed, 307 insertions(+), 178 deletions(-) diff --git a/src/input/mpegts.h b/src/input/mpegts.h index a1a70d1d..242861d5 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -45,6 +45,8 @@ typedef struct mpegts_mux_sub mpegts_mux_sub_t; typedef struct mpegts_input mpegts_input_t; typedef struct mpegts_table_feed mpegts_table_feed_t; typedef struct mpegts_network_link mpegts_network_link_t; +typedef struct mpegts_packet mpegts_packet_t; +typedef struct mpegts_buffer mpegts_buffer_t; /* Lists */ typedef LIST_HEAD (,mpegts_network) mpegts_network_list_t; @@ -52,7 +54,8 @@ typedef LIST_HEAD (,mpegts_input) mpegts_input_list_t; typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t; typedef LIST_HEAD (,mpegts_mux) mpegts_mux_list_t; typedef LIST_HEAD (,mpegts_network_link) mpegts_network_link_list_t; -TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed); +typedef TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed) + mpegts_table_feed_queue_t; /* Classes */ extern const idclass_t mpegts_network_class; @@ -61,9 +64,17 @@ extern const idclass_t mpegts_service_class; extern const idclass_t mpegts_input_class; /* ************************************************************************** - * SI processing + * Data / SI processing * *************************************************************************/ +struct mpegts_packet +{ + TAILQ_ENTRY(mpegts_packet) mp_link; + size_t mp_len; + mpegts_mux_t *mp_mux; + uint8_t mp_data[0]; +}; + typedef int (*mpegts_table_callback_t) ( mpegts_table_t*, const uint8_t *buf, int len, int tableid ); @@ -437,10 +448,9 @@ struct mpegts_input mpegts_network_link_list_t mi_networks; - LIST_HEAD(,mpegts_mux_instance) mi_mux_active; - LIST_HEAD(,mpegts_mux_instance) mi_mux_instances; + /* * Status */ @@ -450,20 +460,28 @@ struct mpegts_input * Input processing */ - pthread_mutex_t mi_delivery_mutex; + int mi_running; - LIST_HEAD(,service) mi_transports; + /* Data input */ + // Note: this section is protected by mi_input_lock + pthread_t mi_input_tid; + pthread_mutex_t mi_input_lock; + pthread_cond_t mi_input_cond; + TAILQ_HEAD(,mpegts_packet) mi_input_queue; + /* Data processing/output */ + // Note: this lock (mi_output_lock) protects all the remaining + // data fields (excluding the callback functions) + pthread_mutex_t mi_output_lock; - struct mpegts_table_feed_queue mi_table_feed; - pthread_cond_t mi_table_feed_cond; // Bound to mi_delivery_mutex - - - pthread_t mi_thread_id; - th_pipe_t mi_thread_pipe; - - int mi_delivery_running; - pthread_t mi_thread_table_id; + /* Active sources */ + LIST_HEAD(,mpegts_mux_instance) mi_mux_active; + LIST_HEAD(,service) mi_transports; + + /* Table processing */ + pthread_t mi_table_tid; + pthread_cond_t mi_table_cond; + mpegts_table_feed_queue_t mi_table_queue; /* * Functions @@ -520,6 +538,8 @@ mpegts_input_t *mpegts_input_create0 mpegts_input_create0(calloc(1, sizeof(mpegts_input_t)),\ &mpegts_input_class, u, c) +void mpegts_input_stop_all ( mpegts_input_t *mi ); + void mpegts_input_delete ( mpegts_input_t *mi, int delconf ); #define mpegts_input_find(u) idnode_find(u, &mpegts_input_class); @@ -637,13 +657,9 @@ mpegts_mux_find_pid(mpegts_mux_t *mm, int pid, int create) return mm->mm_last_mp; } -size_t mpegts_input_recv_packets - (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, uint8_t *tsb, size_t len, - int64_t *pcr, uint16_t *pcr_pid, const char *name); - -void mpegts_input_table_thread_start( mpegts_input_t *mi ); - -void mpegts_input_table_thread_stop( mpegts_input_t *mi ); +void mpegts_input_recv_packets + (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off, + int64_t *pcr, uint16_t *pcr_pid); int mpegts_input_is_free ( mpegts_input_t *mi ); diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index dabc9779..12a9be09 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -186,13 +186,13 @@ mpegts_input_get_weight ( mpegts_input_t *mi ) } /* Service subs */ - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(s, &mi->mi_transports, s_active_link) { LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) { w = MAX(w, ths->ths_weight); } } - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); return w; } @@ -288,13 +288,12 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init ) elementary_stream_t *st; /* Add to list */ - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); if (!s->s_dvb_active_input) { LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link); s->s_dvb_active_input = mi; } - /* Register PIDs */ pthread_mutex_lock(&s->s_stream_mutex); mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_STREAM, s); @@ -303,7 +302,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init ) mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s); pthread_mutex_unlock(&s->s_stream_mutex); - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); } void @@ -312,7 +311,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s ) elementary_stream_t *st; /* Remove from list */ - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); if (s->s_dvb_active_input != NULL) { LIST_REMOVE(((service_t*)s), s_active_link); s->s_dvb_active_input = NULL; @@ -326,7 +325,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s ) mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s); pthread_mutex_unlock(&s->s_stream_mutex); - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); /* Stop mux? */ s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0); @@ -388,14 +387,14 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm ) { int ret = 0; service_t *t; - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(t, &mi->mi_transports, s_active_link) { if (((mpegts_service_t*)t)->s_dvb_mux == mm) { ret = 1; break; } } - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); return ret; } @@ -403,112 +402,160 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm ) * Data processing * *************************************************************************/ -size_t -mpegts_input_recv_packets - ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, - uint8_t *tsb, size_t l, int64_t *pcr, uint16_t *pcr_pid, - const char *name ) +static int inline +ts_sync_count ( const uint8_t *tsb, int len ) { - int len = l; - int i = 0, table_wakeup = 0; - int stream = 0; - int table = 0; - mpegts_mux_t *mm = mmi->mmi_mux; - mpegts_pid_t *last_mp = NULL; - assert(mm != NULL); - assert(name != NULL); + int i = 0; + while (len >= 188 && *tsb == 0x47) { + ++i; + len -= 188; + tsb += 188; + } + return i; +} - // TODO: get the input name - tvhtrace("tsdemux", "%s - recv pkts tsb=%p len=%d pcr=%p pcr_pid=%p mmi=%p", - name, tsb, (int)len, pcr, pcr_pid, mmi); - - /* Not enough data */ - if (len < 188) return len; - - /* Streaming - lock mutex */ - pthread_mutex_lock(&mi->mi_delivery_mutex); +void +mpegts_input_recv_packets + ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off, + int64_t *pcr, uint16_t *pcr_pid ) +{ + int i, p = 0; + mpegts_packet_t *mp; + uint8_t *tsb = sb->sb_data + off; + int len = sb->sb_size - off; +#define MIN_TS_PKT 10 + + /* Check for sync */ + while ( (len >= (MIN_TS_PKT * 188)) && + ((p = ts_sync_count(tsb, len)) < MIN_TS_PKT) ) { + --len; + ++tsb; + ++off; + } + + // Note: we check for sync here so that the buffer can always be + // processed in its entirety inside the processing thread + // without the potential need to buffer data (since that would + // require per mmi buffers, where this is generally not required) + + /* Extract PCR */ + // Note: this is only used by tsfile for timing the play out of packets + // maybe we should move it? + if (pcr && pcr_pid) { + uint8_t *tmp = tsb; + for (i = 0; i < p; i++) { + if (*pcr_pid == (((tmp[1] & 0x1f) << 8) | tmp[2])) + ts_recv_packet1(NULL, tmp, pcr, 0); + tmp += 188; + } + } + + /* Pass */ + if (p >= 10) { + size_t sz = sizeof(mpegts_packet_t) + (p * 188); + + mp = calloc(1, sz); + mp->mp_mux = mmi->mmi_mux; + mp->mp_len = p * 188; + memcpy(mp->mp_data, tsb, mp->mp_len); + + pthread_mutex_lock(&mi->mi_input_lock); + if (TAILQ_FIRST(&mi->mi_input_queue) == NULL) + pthread_cond_signal(&mi->mi_input_cond); + TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link); + pthread_mutex_unlock(&mi->mi_input_lock); + + len -= mp->mp_len; + off += mp->mp_len; + } + + /* Adjust buffer */ + if (len) + sbuf_cut(sb, off); // cut off the bottom + else + sb->sb_ptr = 0; // clear +} + +static void +mpegts_input_process + ( mpegts_input_t *mi, mpegts_packet_t *mp ) +{ + int len = mp->mp_len; + int i = 0, table_wakeup = 0; + int table, stream; + uint8_t *tsb = mp->mp_data; + mpegts_mux_t *mm = mp->mp_mux; + mpegts_mux_instance_t *mmi = mm->mm_active; + mpegts_pid_t *last_mp = NULL; /* Process */ while ( len >= 188 ) { + mpegts_pid_t *mp; + mpegts_pid_sub_t *mps; + service_t *s; + int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2]; - /* Sync */ - if ( tsb[i] == 0x47 ) { - mpegts_pid_t *mp; - mpegts_pid_sub_t *mps; - service_t *s; - int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2]; - int64_t *ppcr = (pcr_pid && *pcr_pid == pid) ? pcr : NULL; - tvhtrace("tsdemux", "%s - recv pkt for pid %04X (%d) on mmi %p", - name, pid, pid, mmi); + /* Find PID */ + if ((mp = mpegts_mux_find_pid(mm, pid, 0))) { + // Note: there is a minor danger this caching will get things + // wrong for a brief period of time if the registrations on + // the PID change + if (mp != last_mp) { + if (pid == 0) + stream = table = 1; + else { + stream = table = 0; - /* Find PID */ - if ((mp = mpegts_mux_find_pid(mm, pid, 0))) { - - if (mp != last_mp) { - last_mp = mp; - stream = 0; - table = 0; - - /* Stream takes pref. */ + /* Determine PID type */ RB_FOREACH(mps, &mp->mp_subs, mps_link) { if (mps->mps_type & MPS_STREAM) stream = 1; if (mps->mps_type & MPS_TABLE) table = 1; + if (table && stream) break; } /* Special case streams */ - if (pid == 0) table = stream = 1; LIST_FOREACH(s, &mi->mi_transports, s_active_link) { if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue; if (pid == s->s_pmt_pid) stream = 1; else if (pid == s->s_pcr_pid) stream = 1; } } + } - /* Stream data */ - if (stream) { - LIST_FOREACH(s, &mi->mi_transports, s_active_link) { - int f; - if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue; - f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid); - ts_recv_packet1((mpegts_service_t*)s, tsb+i, ppcr, f); - } + /* Stream data */ + if (stream) { + LIST_FOREACH(s, &mi->mi_transports, s_active_link) { + int f; + if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue; + f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid); + ts_recv_packet1((mpegts_service_t*)s, tsb+i, NULL, f); } - - /* Table data */ - if (table) { - if (!(tsb[i+1] & 0x80)) { - mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); - memcpy(mtf->mtf_tsb, tsb+i, 188); - mtf->mtf_mux = mm; - TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link); - table_wakeup = 1; - } else { - tvhdebug("tsdemux", "%s - SI packet had errors", name); - } - } - - /* Force PCR extraction for tsfile */ - } else { - if (ppcr && *ppcr == PTS_UNSET) - ts_recv_packet1(NULL, tsb+i, ppcr, 0); } - i += 188; - len -= 188; - - /* Re-sync */ - } else { - tvhdebug("tsdemux", "%s - ts sync lost", name); - if (ts_resync(tsb, &len, &i)) break; - tvhdebug("tsdemux", "%s - ts sync found", name); + /* Table data */ + if (table) { + if (!(tsb[i+1] & 0x80)) { + // TODO: might be able to optimise this a bit by having slightly + // larger buffering and trying to aggregate data (if we get + // same PID multiple times in the loop) + mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); + memcpy(mtf->mtf_tsb, tsb+i, 188); + mtf->mtf_mux = mm; + TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); + table_wakeup = 1; + } else { + //tvhdebug("tsdemux", "%s - SI packet had errors", name); + } + } } + i += 188; + len -= 188; } /* Raw stream */ - // Note: this will include unsynced data if that's what is received if (i > 0 && LIST_FIRST(&mmi->mmi_streaming_pad.sp_targets) != NULL) { streaming_message_t sm; pktbuf_t *pb = pktbuf_alloc(tsb, i); @@ -521,17 +568,48 @@ mpegts_input_recv_packets /* Wake table */ if (table_wakeup) - pthread_cond_signal(&mi->mi_table_feed_cond); - - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_cond_signal(&mi->mi_table_cond); /* Bandwidth monitoring */ atomic_add(&mmi->mmi_stats.bps, i); - - /* Reset buffer */ - if (len) memmove(tsb, tsb+i, len); +} - return len; +static void * +mpegts_input_thread ( void * p ) +{ + mpegts_packet_t *mp; + mpegts_input_t *mi = p; + + pthread_mutex_lock(&mi->mi_input_lock); + while (mi->mi_running) { + + /* Wait for a packet */ + if (!(mp = TAILQ_FIRST(&mi->mi_input_queue))) { + pthread_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock); + continue; + } + TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link); + pthread_mutex_unlock(&mi->mi_input_lock); + + /* Process */ + pthread_mutex_lock(&mi->mi_output_lock); + if (mp->mp_mux && mp->mp_mux->mm_active) + mpegts_input_process(mi, mp); + pthread_mutex_unlock(&mi->mi_output_lock); + + /* Cleanup */ + free(mp); + pthread_mutex_lock(&mi->mi_input_lock); + } + + /* Flush */ + while ((mp = TAILQ_FIRST(&mi->mi_input_queue))) { + TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link); + free(mp); + } + pthread_mutex_unlock(&mi->mi_input_lock); + + return NULL; } static void @@ -570,71 +648,65 @@ mpegts_input_table_thread ( void *aux ) mpegts_table_feed_t *mtf; mpegts_input_t *mi = aux; - pthread_mutex_lock(&mi->mi_delivery_mutex); - while (mi->mi_delivery_running) { + pthread_mutex_lock(&mi->mi_output_lock); + while (mi->mi_running) { /* Wait for data */ - while(!(mtf = TAILQ_FIRST(&mi->mi_table_feed))) { - if (!mi->mi_delivery_running) - break; - pthread_cond_wait(&mi->mi_table_feed_cond, &mi->mi_delivery_mutex); + if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) { + pthread_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock); + continue; } - if (mtf) - TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link); + TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link); + pthread_mutex_unlock(&mi->mi_output_lock); /* Process */ - if (mtf) { - pthread_mutex_unlock(&mi->mi_delivery_mutex); + if (mtf->mtf_mux) { pthread_mutex_lock(&global_lock); mpegts_input_table_dispatch(mtf->mtf_mux, mtf); pthread_mutex_unlock(&global_lock); - free(mtf); - pthread_mutex_lock(&mi->mi_delivery_mutex); } + + /* Cleanup */ + free(mtf); + pthread_mutex_lock(&mi->mi_output_lock); } - while ((mtf = TAILQ_FIRST(&mi->mi_table_feed)) != NULL) { - TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link); + + /* Flush */ + while ((mtf = TAILQ_FIRST(&mi->mi_table_queue)) != NULL) { + TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link); free(mtf); } - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); + return NULL; } -void -mpegts_input_table_thread_start( mpegts_input_t *mi ) -{ - mi->mi_delivery_running = 1; - tvhthread_create(&mi->mi_thread_table_id, NULL, - mpegts_input_table_thread, mi, 0); -} - -void -mpegts_input_table_thread_stop( mpegts_input_t *mi ) -{ - pthread_mutex_lock(&mi->mi_delivery_mutex); - mi->mi_delivery_running = 0; - pthread_cond_signal(&mi->mi_table_feed_cond); - pthread_mutex_unlock(&mi->mi_delivery_mutex); - pthread_join(mi->mi_thread_table_id, NULL); -} - void mpegts_input_flush_mux ( mpegts_input_t *mi, mpegts_mux_t *mm ) { - mpegts_table_feed_t *mtf, *next; + mpegts_table_feed_t *mtf; + mpegts_packet_t *mp; - pthread_mutex_lock(&mi->mi_delivery_mutex); - mtf = TAILQ_FIRST(&mi->mi_table_feed); - while (mtf) { - next = TAILQ_NEXT(mtf, mtf_link); - if (mtf->mtf_mux == mm) { - TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link); - free(mtf); - } - mtf = next; + // Note: to avoid long delays in here, rather than actually + // remove things from the Q, we simply invalidate by clearing + // the mux pointer and allow the threads to deal with the deletion + + /* Flush input Q */ + pthread_mutex_lock(&mi->mi_input_lock); + TAILQ_FOREACH(mp, &mi->mi_input_queue, mp_link) { + if (mp->mp_mux == mm) + mp->mp_mux = NULL; } - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_input_lock); + + /* Flush table Q */ + pthread_mutex_lock(&mi->mi_output_lock); + TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) { + if (mtf->mtf_mux == mm) + mtf->mtf_mux = NULL; + } + pthread_mutex_unlock(&mi->mi_output_lock); } static void @@ -685,13 +757,44 @@ mpegts_input_get_streams mpegts_input_t *mi = (mpegts_input_t*)i; mpegts_mux_instance_t *mmi; - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { st = calloc(1, sizeof(tvh_input_stream_t)); mpegts_input_stream_status(mmi, st); LIST_INSERT_HEAD(isl, st, link); } - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); +} + +static void +mpegts_input_thread_start ( mpegts_input_t *mi ) +{ + mi->mi_running = 1; + + tvhthread_create(&mi->mi_table_tid, NULL, + mpegts_input_table_thread, mi, 0); + tvhthread_create(&mi->mi_input_tid, NULL, + mpegts_input_thread, mi, 0); +} + +static void +mpegts_input_thread_stop ( mpegts_input_t *mi ) +{ + mi->mi_running = 0; + + /* Stop input thread */ + pthread_mutex_lock(&mi->mi_input_lock); + pthread_cond_signal(&mi->mi_input_cond); + pthread_mutex_unlock(&mi->mi_input_lock); + + /* Stop table thread */ + pthread_mutex_lock(&mi->mi_output_lock); + pthread_cond_signal(&mi->mi_table_cond); + pthread_mutex_unlock(&mi->mi_output_lock); + + /* Join threads */ + pthread_join(mi->mi_input_tid, NULL); + pthread_join(mi->mi_table_tid, NULL); } /* ************************************************************************** @@ -705,7 +808,7 @@ mpegts_input_status_timer ( void *p ) mpegts_input_t *mi = p; mpegts_mux_instance_t *mmi; htsmsg_t *e; - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { memset(&st, 0, sizeof(st)); mpegts_input_stream_status(mmi, &st); @@ -714,7 +817,7 @@ mpegts_input_status_timer ( void *p ) notify_by_msg("input_status", e); tvh_input_stream_destroy(&st); } - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); gtimer_arm(&mi->mi_status_timer, mpegts_input_status_timer, mi, 1); } @@ -754,15 +857,14 @@ mpegts_input_create0 /* Index */ mi->mi_instance = ++mpegts_input_idx; - /* Init mutex */ - pthread_mutex_init(&mi->mi_delivery_mutex, NULL); - - /* Table input */ - TAILQ_INIT(&mi->mi_table_feed); - pthread_cond_init(&mi->mi_table_feed_cond, NULL); + /* Init input/output structures */ + pthread_mutex_init(&mi->mi_input_lock, NULL); + pthread_cond_init(&mi->mi_input_cond, NULL); + TAILQ_INIT(&mi->mi_input_queue); - /* Init input thread control */ - mi->mi_thread_pipe.rd = mi->mi_thread_pipe.wr = -1; + pthread_mutex_init(&mi->mi_output_lock, NULL); + pthread_cond_init(&mi->mi_table_cond, NULL); + TAILQ_INIT(&mi->mi_table_queue); /* Add to global list */ LIST_INSERT_HEAD(&mpegts_input_all, mi, mi_global_link); @@ -771,19 +873,30 @@ mpegts_input_create0 if (c) idnode_load(&mi->ti_id, c); + /* Start threads */ + mpegts_input_thread_start(mi); + return mi; } +void +mpegts_input_stop_all ( mpegts_input_t *mi ) +{ + mpegts_mux_instance_t *mmi; + while ((mmi = LIST_FIRST(&mi->mi_mux_active))) + mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1); +} + void mpegts_input_delete ( mpegts_input_t *mi, int delconf ) { mpegts_network_link_t *mnl; + mpegts_input_thread_stop(mi); while ((mnl = LIST_FIRST(&mi->mi_networks))) mpegts_input_del_network(mnl); idnode_unlink(&mi->ti_id); - pthread_mutex_destroy(&mi->mi_delivery_mutex); - pthread_cond_destroy(&mi->mi_table_feed_cond); - tvh_pipe_close(&mi->mi_thread_pipe); + pthread_mutex_destroy(&mi->mi_output_lock); + pthread_cond_destroy(&mi->mi_table_cond); LIST_REMOVE(mi, ti_link); LIST_REMOVE(mi, mi_global_link); free(mi->mi_name); diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 0ccdcaf4..72b86226 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -147,7 +147,7 @@ mpegts_mux_instance_weight ( mpegts_mux_instance_t *mmi ) const service_t *s; const th_subscription_t *ths; mpegts_input_t *mi = mmi->mmi_input; - lock_assert(&mi->mi_delivery_mutex); + lock_assert(&mi->mi_output_lock); /* Direct subs */ LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { @@ -578,9 +578,9 @@ mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) mpegts_input_t *mi; if (!mm->mm_active || !mm->mm_active->mmi_input) return; mi = mm->mm_active->mmi_input; - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); mi->mi_open_pid(mi, mm, mt->mt_pid, type, mt); - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); } void @@ -591,9 +591,9 @@ mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM; if (!mm->mm_active || !mm->mm_active->mmi_input) return; mi = mm->mm_active->mmi_input; - pthread_mutex_lock(&mi->mi_delivery_mutex); + pthread_mutex_lock(&mi->mi_output_lock); mi->mi_close_pid(mi, mm, mt->mt_pid, type, mt); - pthread_mutex_unlock(&mi->mi_delivery_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); } /* **************************************************************************