mpegts: reworking of the input threading

I have split the input threading in two. There is now a smaller/faster thread
responisble for reading data from the source device (file/socket/DVB/etc...)
and a potentially slower (though not too slow!) thread for processing.

This ensures that any minor delay in processing (potentially due to unexpected
effects during start/stop, or anything else!) do not unduly impact reading from
the source which could otherwise lead to loss of data.
This commit is contained in:
Adam Sutton 2014-04-11 23:43:12 +01:00
parent 0a3f08dacd
commit 4e3fc9d40b
3 changed files with 307 additions and 178 deletions

View file

@ -45,6 +45,8 @@ typedef struct mpegts_mux_sub mpegts_mux_sub_t;
typedef struct mpegts_input mpegts_input_t;
typedef struct mpegts_table_feed mpegts_table_feed_t;
typedef struct mpegts_network_link mpegts_network_link_t;
typedef struct mpegts_packet mpegts_packet_t;
typedef struct mpegts_buffer mpegts_buffer_t;
/* Lists */
typedef LIST_HEAD (,mpegts_network) mpegts_network_list_t;
@ -52,7 +54,8 @@ typedef LIST_HEAD (,mpegts_input) mpegts_input_list_t;
typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t;
typedef LIST_HEAD (,mpegts_mux) mpegts_mux_list_t;
typedef LIST_HEAD (,mpegts_network_link) mpegts_network_link_list_t;
TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed);
typedef TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed)
mpegts_table_feed_queue_t;
/* Classes */
extern const idclass_t mpegts_network_class;
@ -61,9 +64,17 @@ extern const idclass_t mpegts_service_class;
extern const idclass_t mpegts_input_class;
/* **************************************************************************
* SI processing
* Data / SI processing
* *************************************************************************/
struct mpegts_packet
{
TAILQ_ENTRY(mpegts_packet) mp_link;
size_t mp_len;
mpegts_mux_t *mp_mux;
uint8_t mp_data[0];
};
typedef int (*mpegts_table_callback_t)
( mpegts_table_t*, const uint8_t *buf, int len, int tableid );
@ -437,10 +448,9 @@ struct mpegts_input
mpegts_network_link_list_t mi_networks;
LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
LIST_HEAD(,mpegts_mux_instance) mi_mux_instances;
/*
* Status
*/
@ -450,20 +460,28 @@ struct mpegts_input
* Input processing
*/
pthread_mutex_t mi_delivery_mutex;
int mi_running;
LIST_HEAD(,service) mi_transports;
/* Data input */
// Note: this section is protected by mi_input_lock
pthread_t mi_input_tid;
pthread_mutex_t mi_input_lock;
pthread_cond_t mi_input_cond;
TAILQ_HEAD(,mpegts_packet) mi_input_queue;
/* Data processing/output */
// Note: this lock (mi_output_lock) protects all the remaining
// data fields (excluding the callback functions)
pthread_mutex_t mi_output_lock;
struct mpegts_table_feed_queue mi_table_feed;
pthread_cond_t mi_table_feed_cond; // Bound to mi_delivery_mutex
pthread_t mi_thread_id;
th_pipe_t mi_thread_pipe;
int mi_delivery_running;
pthread_t mi_thread_table_id;
/* Active sources */
LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
LIST_HEAD(,service) mi_transports;
/* Table processing */
pthread_t mi_table_tid;
pthread_cond_t mi_table_cond;
mpegts_table_feed_queue_t mi_table_queue;
/*
* Functions
@ -520,6 +538,8 @@ mpegts_input_t *mpegts_input_create0
mpegts_input_create0(calloc(1, sizeof(mpegts_input_t)),\
&mpegts_input_class, u, c)
void mpegts_input_stop_all ( mpegts_input_t *mi );
void mpegts_input_delete ( mpegts_input_t *mi, int delconf );
#define mpegts_input_find(u) idnode_find(u, &mpegts_input_class);
@ -637,13 +657,9 @@ mpegts_mux_find_pid(mpegts_mux_t *mm, int pid, int create)
return mm->mm_last_mp;
}
size_t mpegts_input_recv_packets
(mpegts_input_t *mi, mpegts_mux_instance_t *mmi, uint8_t *tsb, size_t len,
int64_t *pcr, uint16_t *pcr_pid, const char *name);
void mpegts_input_table_thread_start( mpegts_input_t *mi );
void mpegts_input_table_thread_stop( mpegts_input_t *mi );
void mpegts_input_recv_packets
(mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off,
int64_t *pcr, uint16_t *pcr_pid);
int mpegts_input_is_free ( mpegts_input_t *mi );

View file

@ -186,13 +186,13 @@ mpegts_input_get_weight ( mpegts_input_t *mi )
}
/* Service subs */
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MAX(w, ths->ths_weight);
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
return w;
}
@ -288,13 +288,12 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
elementary_stream_t *st;
/* Add to list */
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
if (!s->s_dvb_active_input) {
LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link);
s->s_dvb_active_input = mi;
}
/* Register PIDs */
pthread_mutex_lock(&s->s_stream_mutex);
mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_STREAM, s);
@ -303,7 +302,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s);
pthread_mutex_unlock(&s->s_stream_mutex);
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
}
void
@ -312,7 +311,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
elementary_stream_t *st;
/* Remove from list */
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
if (s->s_dvb_active_input != NULL) {
LIST_REMOVE(((service_t*)s), s_active_link);
s->s_dvb_active_input = NULL;
@ -326,7 +325,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s);
pthread_mutex_unlock(&s->s_stream_mutex);
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
/* Stop mux? */
s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0);
@ -388,14 +387,14 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
{
int ret = 0;
service_t *t;
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(t, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
ret = 1;
break;
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
return ret;
}
@ -403,112 +402,160 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
* Data processing
* *************************************************************************/
size_t
mpegts_input_recv_packets
( mpegts_input_t *mi, mpegts_mux_instance_t *mmi,
uint8_t *tsb, size_t l, int64_t *pcr, uint16_t *pcr_pid,
const char *name )
static int inline
ts_sync_count ( const uint8_t *tsb, int len )
{
int len = l;
int i = 0, table_wakeup = 0;
int stream = 0;
int table = 0;
mpegts_mux_t *mm = mmi->mmi_mux;
mpegts_pid_t *last_mp = NULL;
assert(mm != NULL);
assert(name != NULL);
int i = 0;
while (len >= 188 && *tsb == 0x47) {
++i;
len -= 188;
tsb += 188;
}
return i;
}
// TODO: get the input name
tvhtrace("tsdemux", "%s - recv pkts tsb=%p len=%d pcr=%p pcr_pid=%p mmi=%p",
name, tsb, (int)len, pcr, pcr_pid, mmi);
/* Not enough data */
if (len < 188) return len;
/* Streaming - lock mutex */
pthread_mutex_lock(&mi->mi_delivery_mutex);
void
mpegts_input_recv_packets
( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off,
int64_t *pcr, uint16_t *pcr_pid )
{
int i, p = 0;
mpegts_packet_t *mp;
uint8_t *tsb = sb->sb_data + off;
int len = sb->sb_size - off;
#define MIN_TS_PKT 10
/* Check for sync */
while ( (len >= (MIN_TS_PKT * 188)) &&
((p = ts_sync_count(tsb, len)) < MIN_TS_PKT) ) {
--len;
++tsb;
++off;
}
// Note: we check for sync here so that the buffer can always be
// processed in its entirety inside the processing thread
// without the potential need to buffer data (since that would
// require per mmi buffers, where this is generally not required)
/* Extract PCR */
// Note: this is only used by tsfile for timing the play out of packets
// maybe we should move it?
if (pcr && pcr_pid) {
uint8_t *tmp = tsb;
for (i = 0; i < p; i++) {
if (*pcr_pid == (((tmp[1] & 0x1f) << 8) | tmp[2]))
ts_recv_packet1(NULL, tmp, pcr, 0);
tmp += 188;
}
}
/* Pass */
if (p >= 10) {
size_t sz = sizeof(mpegts_packet_t) + (p * 188);
mp = calloc(1, sz);
mp->mp_mux = mmi->mmi_mux;
mp->mp_len = p * 188;
memcpy(mp->mp_data, tsb, mp->mp_len);
pthread_mutex_lock(&mi->mi_input_lock);
if (TAILQ_FIRST(&mi->mi_input_queue) == NULL)
pthread_cond_signal(&mi->mi_input_cond);
TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
pthread_mutex_unlock(&mi->mi_input_lock);
len -= mp->mp_len;
off += mp->mp_len;
}
/* Adjust buffer */
if (len)
sbuf_cut(sb, off); // cut off the bottom
else
sb->sb_ptr = 0; // clear
}
static void
mpegts_input_process
( mpegts_input_t *mi, mpegts_packet_t *mp )
{
int len = mp->mp_len;
int i = 0, table_wakeup = 0;
int table, stream;
uint8_t *tsb = mp->mp_data;
mpegts_mux_t *mm = mp->mp_mux;
mpegts_mux_instance_t *mmi = mm->mm_active;
mpegts_pid_t *last_mp = NULL;
/* Process */
while ( len >= 188 ) {
mpegts_pid_t *mp;
mpegts_pid_sub_t *mps;
service_t *s;
int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
/* Sync */
if ( tsb[i] == 0x47 ) {
mpegts_pid_t *mp;
mpegts_pid_sub_t *mps;
service_t *s;
int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
int64_t *ppcr = (pcr_pid && *pcr_pid == pid) ? pcr : NULL;
tvhtrace("tsdemux", "%s - recv pkt for pid %04X (%d) on mmi %p",
name, pid, pid, mmi);
/* Find PID */
if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
// Note: there is a minor danger this caching will get things
// wrong for a brief period of time if the registrations on
// the PID change
if (mp != last_mp) {
if (pid == 0)
stream = table = 1;
else {
stream = table = 0;
/* Find PID */
if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
if (mp != last_mp) {
last_mp = mp;
stream = 0;
table = 0;
/* Stream takes pref. */
/* Determine PID type */
RB_FOREACH(mps, &mp->mp_subs, mps_link) {
if (mps->mps_type & MPS_STREAM)
stream = 1;
if (mps->mps_type & MPS_TABLE)
table = 1;
if (table && stream) break;
}
/* Special case streams */
if (pid == 0) table = stream = 1;
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
if (pid == s->s_pmt_pid) stream = 1;
else if (pid == s->s_pcr_pid) stream = 1;
}
}
}
/* Stream data */
if (stream) {
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
int f;
if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
ts_recv_packet1((mpegts_service_t*)s, tsb+i, ppcr, f);
}
/* Stream data */
if (stream) {
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
int f;
if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
ts_recv_packet1((mpegts_service_t*)s, tsb+i, NULL, f);
}
/* Table data */
if (table) {
if (!(tsb[i+1] & 0x80)) {
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb+i, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link);
table_wakeup = 1;
} else {
tvhdebug("tsdemux", "%s - SI packet had errors", name);
}
}
/* Force PCR extraction for tsfile */
} else {
if (ppcr && *ppcr == PTS_UNSET)
ts_recv_packet1(NULL, tsb+i, ppcr, 0);
}
i += 188;
len -= 188;
/* Re-sync */
} else {
tvhdebug("tsdemux", "%s - ts sync lost", name);
if (ts_resync(tsb, &len, &i)) break;
tvhdebug("tsdemux", "%s - ts sync found", name);
/* Table data */
if (table) {
if (!(tsb[i+1] & 0x80)) {
// TODO: might be able to optimise this a bit by having slightly
// larger buffering and trying to aggregate data (if we get
// same PID multiple times in the loop)
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb+i, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
}
}
}
i += 188;
len -= 188;
}
/* Raw stream */
// Note: this will include unsynced data if that's what is received
if (i > 0 && LIST_FIRST(&mmi->mmi_streaming_pad.sp_targets) != NULL) {
streaming_message_t sm;
pktbuf_t *pb = pktbuf_alloc(tsb, i);
@ -521,17 +568,48 @@ mpegts_input_recv_packets
/* Wake table */
if (table_wakeup)
pthread_cond_signal(&mi->mi_table_feed_cond);
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_cond_signal(&mi->mi_table_cond);
/* Bandwidth monitoring */
atomic_add(&mmi->mmi_stats.bps, i);
/* Reset buffer */
if (len) memmove(tsb, tsb+i, len);
}
return len;
static void *
mpegts_input_thread ( void * p )
{
mpegts_packet_t *mp;
mpegts_input_t *mi = p;
pthread_mutex_lock(&mi->mi_input_lock);
while (mi->mi_running) {
/* Wait for a packet */
if (!(mp = TAILQ_FIRST(&mi->mi_input_queue))) {
pthread_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
continue;
}
TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
pthread_mutex_unlock(&mi->mi_input_lock);
/* Process */
pthread_mutex_lock(&mi->mi_output_lock);
if (mp->mp_mux && mp->mp_mux->mm_active)
mpegts_input_process(mi, mp);
pthread_mutex_unlock(&mi->mi_output_lock);
/* Cleanup */
free(mp);
pthread_mutex_lock(&mi->mi_input_lock);
}
/* Flush */
while ((mp = TAILQ_FIRST(&mi->mi_input_queue))) {
TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
free(mp);
}
pthread_mutex_unlock(&mi->mi_input_lock);
return NULL;
}
static void
@ -570,71 +648,65 @@ mpegts_input_table_thread ( void *aux )
mpegts_table_feed_t *mtf;
mpegts_input_t *mi = aux;
pthread_mutex_lock(&mi->mi_delivery_mutex);
while (mi->mi_delivery_running) {
pthread_mutex_lock(&mi->mi_output_lock);
while (mi->mi_running) {
/* Wait for data */
while(!(mtf = TAILQ_FIRST(&mi->mi_table_feed))) {
if (!mi->mi_delivery_running)
break;
pthread_cond_wait(&mi->mi_table_feed_cond, &mi->mi_delivery_mutex);
if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) {
pthread_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
continue;
}
if (mtf)
TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
pthread_mutex_unlock(&mi->mi_output_lock);
/* Process */
if (mtf) {
pthread_mutex_unlock(&mi->mi_delivery_mutex);
if (mtf->mtf_mux) {
pthread_mutex_lock(&global_lock);
mpegts_input_table_dispatch(mtf->mtf_mux, mtf);
pthread_mutex_unlock(&global_lock);
free(mtf);
pthread_mutex_lock(&mi->mi_delivery_mutex);
}
/* Cleanup */
free(mtf);
pthread_mutex_lock(&mi->mi_output_lock);
}
while ((mtf = TAILQ_FIRST(&mi->mi_table_feed)) != NULL) {
TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
/* Flush */
while ((mtf = TAILQ_FIRST(&mi->mi_table_queue)) != NULL) {
TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
free(mtf);
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
return NULL;
}
void
mpegts_input_table_thread_start( mpegts_input_t *mi )
{
mi->mi_delivery_running = 1;
tvhthread_create(&mi->mi_thread_table_id, NULL,
mpegts_input_table_thread, mi, 0);
}
void
mpegts_input_table_thread_stop( mpegts_input_t *mi )
{
pthread_mutex_lock(&mi->mi_delivery_mutex);
mi->mi_delivery_running = 0;
pthread_cond_signal(&mi->mi_table_feed_cond);
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_join(mi->mi_thread_table_id, NULL);
}
void
mpegts_input_flush_mux
( mpegts_input_t *mi, mpegts_mux_t *mm )
{
mpegts_table_feed_t *mtf, *next;
mpegts_table_feed_t *mtf;
mpegts_packet_t *mp;
pthread_mutex_lock(&mi->mi_delivery_mutex);
mtf = TAILQ_FIRST(&mi->mi_table_feed);
while (mtf) {
next = TAILQ_NEXT(mtf, mtf_link);
if (mtf->mtf_mux == mm) {
TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
free(mtf);
}
mtf = next;
// Note: to avoid long delays in here, rather than actually
// remove things from the Q, we simply invalidate by clearing
// the mux pointer and allow the threads to deal with the deletion
/* Flush input Q */
pthread_mutex_lock(&mi->mi_input_lock);
TAILQ_FOREACH(mp, &mi->mi_input_queue, mp_link) {
if (mp->mp_mux == mm)
mp->mp_mux = NULL;
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_input_lock);
/* Flush table Q */
pthread_mutex_lock(&mi->mi_output_lock);
TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
if (mtf->mtf_mux == mm)
mtf->mtf_mux = NULL;
}
pthread_mutex_unlock(&mi->mi_output_lock);
}
static void
@ -685,13 +757,44 @@ mpegts_input_get_streams
mpegts_input_t *mi = (mpegts_input_t*)i;
mpegts_mux_instance_t *mmi;
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
st = calloc(1, sizeof(tvh_input_stream_t));
mpegts_input_stream_status(mmi, st);
LIST_INSERT_HEAD(isl, st, link);
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
}
static void
mpegts_input_thread_start ( mpegts_input_t *mi )
{
mi->mi_running = 1;
tvhthread_create(&mi->mi_table_tid, NULL,
mpegts_input_table_thread, mi, 0);
tvhthread_create(&mi->mi_input_tid, NULL,
mpegts_input_thread, mi, 0);
}
static void
mpegts_input_thread_stop ( mpegts_input_t *mi )
{
mi->mi_running = 0;
/* Stop input thread */
pthread_mutex_lock(&mi->mi_input_lock);
pthread_cond_signal(&mi->mi_input_cond);
pthread_mutex_unlock(&mi->mi_input_lock);
/* Stop table thread */
pthread_mutex_lock(&mi->mi_output_lock);
pthread_cond_signal(&mi->mi_table_cond);
pthread_mutex_unlock(&mi->mi_output_lock);
/* Join threads */
pthread_join(mi->mi_input_tid, NULL);
pthread_join(mi->mi_table_tid, NULL);
}
/* **************************************************************************
@ -705,7 +808,7 @@ mpegts_input_status_timer ( void *p )
mpegts_input_t *mi = p;
mpegts_mux_instance_t *mmi;
htsmsg_t *e;
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
memset(&st, 0, sizeof(st));
mpegts_input_stream_status(mmi, &st);
@ -714,7 +817,7 @@ mpegts_input_status_timer ( void *p )
notify_by_msg("input_status", e);
tvh_input_stream_destroy(&st);
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
gtimer_arm(&mi->mi_status_timer, mpegts_input_status_timer, mi, 1);
}
@ -754,15 +857,14 @@ mpegts_input_create0
/* Index */
mi->mi_instance = ++mpegts_input_idx;
/* Init mutex */
pthread_mutex_init(&mi->mi_delivery_mutex, NULL);
/* Table input */
TAILQ_INIT(&mi->mi_table_feed);
pthread_cond_init(&mi->mi_table_feed_cond, NULL);
/* Init input/output structures */
pthread_mutex_init(&mi->mi_input_lock, NULL);
pthread_cond_init(&mi->mi_input_cond, NULL);
TAILQ_INIT(&mi->mi_input_queue);
/* Init input thread control */
mi->mi_thread_pipe.rd = mi->mi_thread_pipe.wr = -1;
pthread_mutex_init(&mi->mi_output_lock, NULL);
pthread_cond_init(&mi->mi_table_cond, NULL);
TAILQ_INIT(&mi->mi_table_queue);
/* Add to global list */
LIST_INSERT_HEAD(&mpegts_input_all, mi, mi_global_link);
@ -771,19 +873,30 @@ mpegts_input_create0
if (c)
idnode_load(&mi->ti_id, c);
/* Start threads */
mpegts_input_thread_start(mi);
return mi;
}
void
mpegts_input_stop_all ( mpegts_input_t *mi )
{
mpegts_mux_instance_t *mmi;
while ((mmi = LIST_FIRST(&mi->mi_mux_active)))
mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1);
}
void
mpegts_input_delete ( mpegts_input_t *mi, int delconf )
{
mpegts_network_link_t *mnl;
mpegts_input_thread_stop(mi);
while ((mnl = LIST_FIRST(&mi->mi_networks)))
mpegts_input_del_network(mnl);
idnode_unlink(&mi->ti_id);
pthread_mutex_destroy(&mi->mi_delivery_mutex);
pthread_cond_destroy(&mi->mi_table_feed_cond);
tvh_pipe_close(&mi->mi_thread_pipe);
pthread_mutex_destroy(&mi->mi_output_lock);
pthread_cond_destroy(&mi->mi_table_cond);
LIST_REMOVE(mi, ti_link);
LIST_REMOVE(mi, mi_global_link);
free(mi->mi_name);

View file

@ -147,7 +147,7 @@ mpegts_mux_instance_weight ( mpegts_mux_instance_t *mmi )
const service_t *s;
const th_subscription_t *ths;
mpegts_input_t *mi = mmi->mmi_input;
lock_assert(&mi->mi_delivery_mutex);
lock_assert(&mi->mi_output_lock);
/* Direct subs */
LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
@ -578,9 +578,9 @@ mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
mpegts_input_t *mi;
if (!mm->mm_active || !mm->mm_active->mmi_input) return;
mi = mm->mm_active->mmi_input;
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
mi->mi_open_pid(mi, mm, mt->mt_pid, type, mt);
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
}
void
@ -591,9 +591,9 @@ mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM;
if (!mm->mm_active || !mm->mm_active->mmi_input) return;
mi = mm->mm_active->mmi_input;
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
mi->mi_close_pid(mi, mm, mt->mt_pid, type, mt);
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
}
/* **************************************************************************