From 859f08aa5dcb5df3a2c5dbf8500786700cbfdfe9 Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Tue, 24 Sep 2013 13:40:36 +0100 Subject: [PATCH] mpegts: rework of the PID handling code This should overcome the current issues with EPG grabbers stealing data but also, IMO, is a better approach. However it did require some reworking of lock semantics. --- src/input/mpegts.h | 35 ++++- src/input/mpegts/dvb_psi.c | 17 +- src/input/mpegts/linuxdvb/linuxdvb_frontend.c | 113 +++++--------- src/input/mpegts/linuxdvb/linuxdvb_mux.c | 26 ---- src/input/mpegts/mpegts_input.c | 145 +++++++++++++++--- src/input/mpegts/mpegts_mux.c | 75 +++++++-- src/input/mpegts/mpegts_table.c | 1 - src/input/mpegts/tsdemux.c | 11 +- src/input/mpegts/tsdemux.h | 2 +- src/input/mpegts/tsfile/tsfile_input.c | 2 + src/service.c | 20 +-- src/service.h | 1 - 12 files changed, 276 insertions(+), 172 deletions(-) diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 583e5196..a6721988 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -87,6 +87,25 @@ typedef struct mpegts_table_state RB_ENTRY(mpegts_table_state) link; } mpegts_table_state_t; +typedef struct mpegts_pid_sub +{ + RB_ENTRY(mpegts_pid_sub) mps_link; + enum { + MPS_NONE, + MPS_STREAM, + MPS_TABLE + } mps_type; + void *mps_owner; +} mpegts_pid_sub_t; + +typedef struct mpegts_pid +{ + int mp_pid; + int mp_fd; // linuxdvb demux fd + RB_HEAD(,mpegts_pid_sub) mp_subs; // subscribers to pid + RB_ENTRY(mpegts_pid) mp_link; +} mpegts_pid_t; + struct mpegts_table { /** @@ -110,7 +129,6 @@ struct mpegts_table /** * File descriptor for filter */ - int mt_fd; LIST_ENTRY(mpegts_table) mt_link; mpegts_mux_t *mt_mux; @@ -260,13 +278,14 @@ struct mpegts_mux mpegts_mux_instance_t *mm_active; /* - * Table processing + * Data processing */ + RB_HEAD(, mpegts_pid) mm_pids; + int mm_num_tables; LIST_HEAD(, mpegts_table) mm_tables; TAILQ_HEAD(, mpegts_table) mm_table_queue; - uint8_t mm_table_filter[8192]; /* * Functions @@ -442,6 +461,8 @@ struct mpegts_input void (*mi_stop_mux) (mpegts_input_t*,mpegts_mux_instance_t*); void (*mi_open_service) (mpegts_input_t*,mpegts_service_t*,int first); void (*mi_close_service) (mpegts_input_t*,mpegts_service_t*); + mpegts_pid_t *(*mi_open_pid)(mpegts_input_t*,mpegts_mux_t*,int,int,void*); + void (*mi_close_pid) (mpegts_input_t*,mpegts_mux_t*,int,int,void*); void (*mi_create_mux_instance) (mpegts_input_t*,mpegts_mux_t*); void (*mi_started_mux) (mpegts_input_t*,mpegts_mux_instance_t*); void (*mi_stopped_mux) (mpegts_input_t*,mpegts_mux_instance_t*); @@ -574,6 +595,8 @@ void mpegts_mux_remove_subscriber(mpegts_mux_t *mm, th_subscription_t *s, int re int mpegts_mux_subscribe(mpegts_mux_t *mm, const char *name, int weight); void mpegts_mux_unsubscribe_by_name(mpegts_mux_t *mm, const char *name); +mpegts_pid_t *mpegts_mux_find_pid(mpegts_mux_t *mm, int pid, int create); + size_t mpegts_input_recv_packets (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, uint8_t *tsb, size_t len, int64_t *pcr, uint16_t *pcr_pid, const char *name); @@ -588,6 +611,12 @@ void mpegts_input_save ( mpegts_input_t *mi, htsmsg_t *c ); void mpegts_input_flush_mux ( mpegts_input_t *mi, mpegts_mux_t *mm ); +mpegts_pid_t * mpegts_input_open_pid + ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ); + +void mpegts_input_close_pid + ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ); + void mpegts_table_dispatch (const uint8_t *sec, size_t r, void *mt); void mpegts_table_release diff --git a/src/input/mpegts/dvb_psi.c b/src/input/mpegts/dvb_psi.c index 87129c43..cac370ac 100644 --- a/src/input/mpegts/dvb_psi.c +++ b/src/input/mpegts/dvb_psi.c @@ -675,7 +675,7 @@ int dvb_pmt_callback (mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid) { - int r, sect, last, ver; + int r, sect, last, ver, had_components; uint16_t sid; mpegts_mux_t *mm = mt->mt_mux; mpegts_service_t *s; @@ -694,8 +694,11 @@ dvb_pmt_callback /* Process */ tvhdebug("pmt", "sid %04X (%d)", sid, sid); pthread_mutex_lock(&s->s_stream_mutex); - psi_parse_pmt(s, ptr, len); + had_components = !!TAILQ_FIRST(&s->s_components); + r = psi_parse_pmt(s, ptr, len); pthread_mutex_unlock(&s->s_stream_mutex); + if (r) + service_restart((service_t*)s, had_components); /* Finish */ return dvb_table_end(mt, st, sect); @@ -1170,10 +1173,11 @@ psi_desc_teletext(mpegts_service_t *t, const uint8_t *ptr, int size, /** * PMT parser, from ISO 13818-1 and ETSI EN 300 468 */ -int +static int psi_parse_pmt (mpegts_service_t *t, const uint8_t *ptr, int len) { + int ret = 0; uint16_t pcr_pid, pid; uint8_t estype; int dllen; @@ -1181,7 +1185,6 @@ psi_parse_pmt streaming_component_type_t hts_stream_type; elementary_stream_t *st, *next; int update = 0; - int had_components; int composition_id; int ancillary_id; int version; @@ -1194,8 +1197,6 @@ psi_parse_pmt lock_assert(&t->s_stream_mutex); - had_components = !!TAILQ_FIRST(&t->s_components); - version = ptr[2] >> 1 & 0x1f; pcr_pid = (ptr[5] & 0x1f) << 8 | ptr[6]; dllen = (ptr[7] & 0xf) << 8 | ptr[8]; @@ -1441,8 +1442,8 @@ psi_parse_pmt PMT_UPDATE_CA_PROVIDER_CHANGE | PMT_UPDATE_CAID_DELETED)) { if(t->s_status == SERVICE_RUNNING) - service_restart((service_t*)t, had_components); + ret = 1; } } - return 0; + return ret; } diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index b793d7cd..79a871ee 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -289,32 +289,37 @@ linuxdvb_frontend_start_mux return linuxdvb_frontend_tune1((linuxdvb_frontend_t*)mi, mmi, -1); } -static int -linuxdvb_frontend_open_pid - ( linuxdvb_frontend_t *lfe, int pid, const char *name ) +static void +linuxdvb_frontend_open_pid0 + ( linuxdvb_frontend_t *lfe, mpegts_pid_t *mp ) { - char buf[256]; + char name[256]; struct dmx_pes_filter_params dmx_param; - int fd = -1; + int fd; + /* Already opened */ + if (mp->mp_fd != -1) + return; + + /* Not locked OR full mux mode */ if (!lfe->lfe_locked || lfe->lfe_fullmux) - return -1; + return; - if (!name) { - lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf)); - name = buf; - } + lfe->mi_display_name((mpegts_input_t*)lfe, name, sizeof(name)); + /* Open DMX */ fd = tvh_open(lfe->lfe_dmx_path, O_RDWR, 0); if(fd == -1) { tvherror("linuxdvb", "%s - failed to open dmx for pid %d [e=%s]", - name, pid, strerror(errno)); - return -1; + name, mp->mp_pid, strerror(errno)); + return; } - tvhtrace("linuxdvb", "%s - open PID %04X (%d) fd %d", name, pid, pid, fd); + /* Install filter */ + tvhtrace("linuxdvb", "%s - open PID %04X (%d) fd %d", + name, mp->mp_pid, mp->mp_pid, fd); memset(&dmx_param, 0, sizeof(dmx_param)); - dmx_param.pid = pid; + dmx_param.pid = mp->mp_pid; dmx_param.input = DMX_IN_FRONTEND; dmx_param.output = DMX_OUT_TS_TAP; dmx_param.pes_type = DMX_PES_OTHER; @@ -322,54 +327,30 @@ linuxdvb_frontend_open_pid if(ioctl(fd, DMX_SET_PES_FILTER, &dmx_param)) { tvherror("linuxdvb", "%s - failed to config dmx for pid %d [e=%s]", - name, pid, strerror(errno)); + name, mp->mp_pid, strerror(errno)); close(fd); - return -1; + return; } - return fd; + /* Store */ + mp->mp_fd = fd; + + return; } -static void -linuxdvb_frontend_open_service - ( mpegts_input_t *mi, mpegts_service_t *s, int init ) +static mpegts_pid_t * +linuxdvb_frontend_open_pid + ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ) { - char buf[256]; - elementary_stream_t *st; + mpegts_pid_t *mp; linuxdvb_frontend_t *lfe = (linuxdvb_frontend_t*)mi; - /* Ignore in full rx mode OR if not yet locked */ - if (!lfe->lfe_locked || lfe->lfe_fullmux) goto exit; - mi->mi_display_name(mi, buf, sizeof(buf)); - - /* Install PES filters */ - TAILQ_FOREACH(st, &s->s_components, es_link) { - if(st->es_pid >= 0x2000) - continue; + if (!(mp = mpegts_input_open_pid(mi, mm, pid, type, owner))) + return NULL; - if(st->es_demuxer_fd != -1) - continue; + linuxdvb_frontend_open_pid0(lfe, mp); - st->es_cc_valid = 0; - st->es_demuxer_fd - = linuxdvb_frontend_open_pid((linuxdvb_frontend_t*)mi, st->es_pid, buf); - } - -exit: - mpegts_input_open_service(mi, s, init); -} - -static void -linuxdvb_frontend_close_service - ( mpegts_input_t *mi, mpegts_service_t *s ) -{ - linuxdvb_frontend_t *lfe = (linuxdvb_frontend_t*)mi; - - /* Ignore in full rx mode OR if not yet locked */ - if (!lfe->lfe_locked || lfe->lfe_fullmux) goto exit; - -exit: - mpegts_input_close_service(mi, s); + return mp; } static idnode_set_t * @@ -443,22 +424,6 @@ linuxdvb_frontend_default_tables } } -static void -linuxdvb_frontend_open_all - ( linuxdvb_frontend_t *lfe, mpegts_mux_t *mm ) -{ - mpegts_table_t *mt; - service_t *s; - LIST_FOREACH(s, &lfe->mi_transports, s_active_link) { - lfe->mi_open_service((mpegts_input_t*)lfe, - (mpegts_service_t*)s, 0); - } - LIST_FOREACH(mt, &mm->mm_tables, mt_link) { - if (mt->mt_fd == -1) - mt->mt_fd = lfe->lfe_open_pid(lfe, mt->mt_pid, NULL); - } -} - static void linuxdvb_frontend_monitor ( void *aux ) { @@ -468,6 +433,7 @@ linuxdvb_frontend_monitor ( void *aux ) linuxdvb_frontend_t *lfe = aux; mpegts_mux_instance_t *mmi = LIST_FIRST(&lfe->mi_mux_active); mpegts_mux_t *mm; + mpegts_pid_t *mp; fe_status_t fe_status; signal_state_t status; @@ -533,8 +499,11 @@ linuxdvb_frontend_monitor ( void *aux ) /* Table handlers */ linuxdvb_frontend_default_tables(lfe, (linuxdvb_mux_t*)mm); - /* Services */ - linuxdvb_frontend_open_all(lfe, mm); + /* Locked - ensure everything is open */ + pthread_mutex_lock(&lfe->mi_delivery_mutex); + RB_FOREACH(mp, &mm->mm_pids, mp_link) + linuxdvb_frontend_open_pid0(lfe, mp); + pthread_mutex_unlock(&lfe->mi_delivery_mutex); /* Re-arm (quick) */ } else { @@ -853,10 +822,8 @@ linuxdvb_frontend_create0 lfe->mi_is_enabled = linuxdvb_frontend_is_enabled; lfe->mi_start_mux = linuxdvb_frontend_start_mux; lfe->mi_stop_mux = linuxdvb_frontend_stop_mux; - lfe->mi_open_service = linuxdvb_frontend_open_service; - lfe->mi_close_service = linuxdvb_frontend_close_service; lfe->mi_network_list = linuxdvb_frontend_network_list; - lfe->lfe_open_pid = linuxdvb_frontend_open_pid; + lfe->mi_open_pid = linuxdvb_frontend_open_pid; /* Adapter link */ lfe->lh_parent = (linuxdvb_hardware_t*)la; diff --git a/src/input/mpegts/linuxdvb/linuxdvb_mux.c b/src/input/mpegts/linuxdvb/linuxdvb_mux.c index 3de89e77..ab953e33 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_mux.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_mux.c @@ -499,30 +499,6 @@ linuxdvb_mux_create_instances ( mpegts_mux_t *mm ) mi->mi_create_mux_instance(mi, mm); } -static void -linuxdvb_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) -{ - linuxdvb_frontend_t *lfe; - mpegts_mux_open_table(mm, mt); - - /* Open DMX */ - if (mm->mm_active) { - lfe = (linuxdvb_frontend_t*)mm->mm_active->mmi_input; - mt->mt_fd = lfe->lfe_open_pid(lfe, mt->mt_pid, NULL); - } else { - mt->mt_fd = -1; - } -} - -static void -linuxdvb_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) -{ - mpegts_mux_close_table(mm, mt); - - if (mt->mt_fd != -1) - close(mt->mt_fd); -} - static void linuxdvb_mux_delete ( mpegts_mux_t *mm ) { @@ -580,8 +556,6 @@ linuxdvb_mux_create0 lm->mm_display_name = linuxdvb_mux_display_name; lm->mm_config_save = linuxdvb_mux_config_save; lm->mm_create_instances = linuxdvb_mux_create_instances; - lm->mm_open_table = linuxdvb_mux_open_table; - lm->mm_close_table = linuxdvb_mux_close_table; /* No config */ if (!conf) return lm; diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index df519113..40aa35a8 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -128,25 +128,110 @@ mpegts_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { } +static int +mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b ) +{ + if (a->mps_type != b->mps_type) + return a->mps_type - b->mps_type; + if (a->mps_owner < b->mps_owner) return -1; + if (a->mps_owner > b->mps_owner) return 1; + return 0; +} + +mpegts_pid_t * +mpegts_input_open_pid + ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ) +{ + char buf[512]; + mpegts_pid_t *mp = mpegts_mux_find_pid(mm, pid, 1); + assert(owner != NULL); + if (mp) { + static mpegts_pid_sub_t *skel = NULL; + if (!skel) + skel = calloc(1, sizeof(mpegts_pid_sub_t)); + skel->mps_type = type; + skel->mps_owner = owner; + if (!RB_INSERT_SORTED(&mp->mp_subs, skel, mps_link, mps_cmp)) { + mm->mm_display_name(mm, buf, sizeof(buf)); + tvhdebug("mpegts", "%s - open PID %04X (%d) [%d/%p]", + buf, mp->mp_pid, mp->mp_pid, type, owner); + skel = NULL; + } + } + return mp; +} + +void +mpegts_input_close_pid + ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ) +{ + char buf[512]; + mpegts_pid_sub_t *mps, skel; + mpegts_pid_t *mp = mpegts_mux_find_pid(mm, pid, 1); + assert(owner != NULL); + skel.mps_type = type; + skel.mps_owner = owner; + mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mps_cmp); + if (mps) { + RB_REMOVE(&mp->mp_subs, mps, mps_link); + free(mps); + + if (!RB_FIRST(&mp->mp_subs)) { + RB_REMOVE(&mm->mm_pids, mp, mp_link); + if (mp->mp_fd != -1) { + mm->mm_display_name(mm, buf, sizeof(buf)); + tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", + buf, mp->mp_pid, mp->mp_pid, type, owner); + close(mp->mp_fd); + } + free(mp); + } + } +} + void mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init ) { - if (!init) return; + elementary_stream_t *st; + + /* Add to list */ pthread_mutex_lock(&mi->mi_delivery_mutex); - LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link); - s->s_dvb_active_input = mi; + if (!s->s_dvb_active_input) { + LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link); + s->s_dvb_active_input = mi; + } + + + /* Register PIDs */ + pthread_mutex_lock(&s->s_stream_mutex); + TAILQ_FOREACH(st, &s->s_components, es_link) + mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s); + + pthread_mutex_unlock(&s->s_stream_mutex); pthread_mutex_unlock(&mi->mi_delivery_mutex); } void mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s ) { + elementary_stream_t *st; + + /* Remove from list */ pthread_mutex_lock(&mi->mi_delivery_mutex); if (s->s_dvb_active_input != NULL) { LIST_REMOVE(((service_t*)s), s_active_link); s->s_dvb_active_input = NULL; } + + /* Close PID */ + pthread_mutex_lock(&s->s_stream_mutex); + TAILQ_FOREACH(st, &s->s_components, es_link) + mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s); + + pthread_mutex_unlock(&s->s_stream_mutex); pthread_mutex_unlock(&mi->mi_delivery_mutex); + + /* Stop mux? */ s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0); } @@ -259,32 +344,44 @@ mpegts_input_recv_packets /* Sync */ if ( tsb[i] == 0x47 ) { - int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2]; + mpegts_pid_t *mp; + mpegts_pid_sub_t *mps; + service_t *s; + int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2]; + int64_t *ppcr = (pcr_pid && *pcr_pid == pid) ? pcr : NULL; tvhtrace("tsdemux", "%s - recv_packet for pid %04X (%d)", name, pid, pid); - /* SI data */ - if (mm->mm_table_filter[pid]) { - if (!(tsb[i+1] & 0x80)) { - mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); - memcpy(mtf->mtf_tsb, tsb+i, 188); - mtf->mtf_mux = mm; - TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link); - table_wakeup = 1; + /* Find PID */ + if ((mp = mpegts_mux_find_pid(mm, pid, 0))) { + + /* Stream takes pref. */ + RB_FOREACH(mps, &mp->mp_subs, mps_link) + if (mps->mps_type == MPS_STREAM) + break; + + /* Stream data */ + if (mps) { + LIST_FOREACH(s, &mi->mi_transports, s_active_link) + ts_recv_packet1((mpegts_service_t*)s, tsb+i, ppcr); + + /* Table data */ } else { - tvhdebug("tsdemux", "%s - SI packet had errors", name); + if (!(tsb[i+1] & 0x80)) { + mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); + memcpy(mtf->mtf_tsb, tsb+i, 188); + mtf->mtf_mux = mm; + TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link); + table_wakeup = 1; + } else { + tvhdebug("tsdemux", "%s - SI packet had errors", name); + } } - - /* Other */ - } else { - service_t *s; - int64_t *ppcr = (pcr_pid && *pcr_pid == pid) ? pcr : NULL; - LIST_FOREACH(s, &mi->mi_transports, s_active_link) { - ts_recv_packet1((mpegts_service_t*)s, tsb+i, ppcr); - } - if (ppcr && *ppcr == PTS_UNSET) - ts_recv_packet1(NULL, tsb+i, ppcr); } + /* Force PCR extraction for tsfile */ + if (ppcr && *ppcr == PTS_UNSET) + ts_recv_packet1(NULL, tsb+i, ppcr); + i += 188; len -= 188; @@ -489,6 +586,8 @@ mpegts_input_create0 mi->mi_stop_mux = mpegts_input_stop_mux; mi->mi_open_service = mpegts_input_open_service; mi->mi_close_service = mpegts_input_close_service; + mi->mi_open_pid = mpegts_input_open_pid; + mi->mi_close_pid = mpegts_input_close_pid; mi->mi_create_mux_instance = mpegts_input_create_mux_instance; mi->mi_started_mux = mpegts_input_started_mux; mi->mi_stopped_mux = mpegts_input_stopped_mux; diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 02da69a2..1853d433 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -400,6 +400,8 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) mpegts_mux_instance_t *mmi = mm->mm_active; mpegts_input_t *mi = NULL; th_subscription_t *sub; + mpegts_pid_t *mp; + mpegts_pid_sub_t *mps; if (!force && mpegts_mux_has_subscribers(mm)) return; @@ -423,8 +425,19 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) if (mi) mpegts_input_flush_mux(mi, mm); - /* Alert listeners */ - // TODO + /* Ensure PIDs are cleared */ + while ((mp = RB_FIRST(&mm->mm_pids))) { + while ((mps = RB_FIRST(&mp->mp_subs))) { + RB_REMOVE(&mp->mp_subs, mps, mps_link); + free(mps); + } + RB_REMOVE(&mm->mm_pids, mp, mp_link); + if (mp->mp_fd) { + tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid); + close(mp->mp_fd); + } + free(mp); + } /* Scanning */ if (mm->mm_initial_scan_status == MM_SCAN_CURRENT) { @@ -446,26 +459,23 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) void mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) { - char buf[256]; - if (mt->mt_pid >= 0x2000) - return; - mm->mm_display_name(mm, buf, sizeof(buf)); - if (!mm->mm_table_filter[mt->mt_pid]) - tvhtrace("mpegts", "%s - opened table %s pid %04X (%d)", - buf, mt->mt_name, mt->mt_pid, mt->mt_pid); - mm->mm_table_filter[mt->mt_pid] = 1; + mpegts_input_t *mi; + if (!mm->mm_active || !mm->mm_active->mmi_input) return; + mi = mm->mm_active->mmi_input; + pthread_mutex_lock(&mi->mi_delivery_mutex); + mi->mi_open_pid(mi, mm, mt->mt_pid, MPS_TABLE, mt); + pthread_mutex_unlock(&mi->mi_delivery_mutex); } void mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) { - char buf[256]; - if (mt->mt_pid >= 0x2000) - return; - mm->mm_display_name(mm, buf, sizeof(buf)); - tvhtrace("mpegts", "%s - closed table %s pid %04X (%d)", - buf, mt->mt_name, mt->mt_pid, mt->mt_pid); - mm->mm_table_filter[mt->mt_pid] = 0; + mpegts_input_t *mi; + if (!mm->mm_active || !mm->mm_active->mmi_input) return; + mi = mm->mm_active->mmi_input; + pthread_mutex_lock(&mi->mi_delivery_mutex); + mi->mi_close_pid(mi, mm, mt->mt_pid, MPS_TABLE, mt); + pthread_mutex_unlock(&mi->mi_delivery_mutex); } /* ************************************************************************** @@ -702,6 +712,37 @@ mpegts_mux_find_service ( mpegts_mux_t *mm, uint16_t sid) return ms; } +static int mp_cmp ( mpegts_pid_t *a, mpegts_pid_t *b ) +{ + return a->mp_pid - b->mp_pid; +}; + +mpegts_pid_t * +mpegts_mux_find_pid ( mpegts_mux_t *mm, int pid, int create ) +{ + mpegts_pid_t *mp; + + if (pid > 0x2000) return NULL; + + if (!create) { + mpegts_pid_t skel; + skel.mp_pid = pid; + mp = RB_FIND(&mm->mm_pids, &skel, mp_link, mp_cmp); + } else { + static mpegts_pid_t *skel = NULL; + if (!skel) + skel = calloc(1, sizeof(mpegts_pid_t)); + skel->mp_pid = pid; + mp = RB_INSERT_SORTED(&mm->mm_pids, skel, mp_link, mp_cmp); + if (!mp) { + mp = skel; + skel = NULL; + mp->mp_fd = -1; + } + } + return mp; +} + /****************************************************************************** * Editor Configuration * diff --git a/src/input/mpegts/mpegts_table.c b/src/input/mpegts/mpegts_table.c index 74b7966c..efa60f0b 100644 --- a/src/input/mpegts/mpegts_table.c +++ b/src/input/mpegts/mpegts_table.c @@ -154,7 +154,6 @@ mpegts_table_add mt->mt_table = tableid; mt->mt_mask = mask; mt->mt_mux = mm; - mt->mt_fd = -1; mt->mt_cc = -1; LIST_INSERT_HEAD(&mm->mm_tables, mt, mt_link); mm->mm_num_tables++; diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index e6723ed6..0b8e1248 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -165,7 +165,7 @@ ts_process_pcr(mpegts_service_t *t, elementary_stream_t *st, int64_t pcr) /** * Process service stream packets, extract PCR and optionally descramble */ -void +int ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp) { elementary_stream_t *st; @@ -194,11 +194,11 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp) } /* Nothing - special case for tsfile to get PCR */ - if (!t) return; + if (!t) return 0; /* Service inactive - ignore */ if(t->s_status != SERVICE_RUNNING) - return; + return 0; pthread_mutex_lock(&t->s_stream_mutex); @@ -220,7 +220,7 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp) if(st == NULL) { pthread_mutex_unlock(&t->s_stream_mutex); - return; + return 0; } if(!error) @@ -246,7 +246,7 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp) r = td->td_descramble(td, (service_t*)t, st, tsb); if(r == 0) { pthread_mutex_unlock(&t->s_stream_mutex); - return; + return 1; } if(r == 1) @@ -265,6 +265,7 @@ ts_recv_packet1(mpegts_service_t *t, const uint8_t *tsb, int64_t *pcrp) ts_recv_packet0(t, st, tsb); } pthread_mutex_unlock(&t->s_stream_mutex); + return 1; } diff --git a/src/input/mpegts/tsdemux.h b/src/input/mpegts/tsdemux.h index ce7e27da..19d430df 100644 --- a/src/input/mpegts/tsdemux.h +++ b/src/input/mpegts/tsdemux.h @@ -21,7 +21,7 @@ int ts_resync ( const uint8_t *tsb, int *len, int *idx ); -void ts_recv_packet1(struct mpegts_service *t, const uint8_t *tsb, int64_t *pcrp); +int ts_recv_packet1(struct mpegts_service *t, const uint8_t *tsb, int64_t *pcrp); void ts_recv_packet2(struct mpegts_service *t, const uint8_t *tsb); diff --git a/src/input/mpegts/tsfile/tsfile_input.c b/src/input/mpegts/tsfile/tsfile_input.c index ddd40ad4..a3914383 100644 --- a/src/input/mpegts/tsfile/tsfile_input.c +++ b/src/input/mpegts/tsfile/tsfile_input.c @@ -240,6 +240,8 @@ tsfile_input_create ( int idx ) mi->mi_start_mux = tsfile_input_start_mux; mi->mi_stop_mux = tsfile_input_stop_mux; LIST_INSERT_HEAD(&tsfile_inputs, mi, mi_global_link); + if (!mi->mi_displayname) + mi->mi_displayname = strdup("TSFile"); /* Start table thread */ tvhthread_create(&tid, NULL, mpegts_input_table_thread, mi); diff --git a/src/service.c b/src/service.c index 593f88aa..c8b3748a 100644 --- a/src/service.c +++ b/src/service.c @@ -205,12 +205,6 @@ stream_init(elementary_stream_t *st) static void stream_clean(elementary_stream_t *st) { - if(st->es_demuxer_fd != -1) { - // XXX: Should be in DVB-code perhaps - close(st->es_demuxer_fd); - st->es_demuxer_fd = -1; - } - free(st->es_priv); st->es_priv = NULL; @@ -629,7 +623,6 @@ service_stream_create(service_t *t, int pid, st->es_service = t; st->es_pid = pid; - st->es_demuxer_fd = -1; avgstat_init(&st->es_rate, 10); avgstat_init(&st->es_cc_errors, 10); @@ -792,7 +785,7 @@ void service_restart(service_t *t, int had_components) { streaming_message_t *sm; - lock_assert(&t->s_stream_mutex); + pthread_mutex_lock(&t->s_stream_mutex); if(had_components) { sm = streaming_msg_create_code(SMT_STOP, SM_CODE_SOURCE_RECONFIGURED); @@ -800,18 +793,19 @@ service_restart(service_t *t, int had_components) streaming_msg_free(sm); } - if(t->s_refresh_feed != NULL) - t->s_refresh_feed(t); - descrambler_service_start(t); if(TAILQ_FIRST(&t->s_components) != NULL) { - sm = streaming_msg_create_data(SMT_START, service_build_stream_start(t)); streaming_pad_deliver(&t->s_streaming_pad, sm); streaming_msg_free(sm); } + + pthread_mutex_unlock(&t->s_stream_mutex); + + if(t->s_refresh_feed != NULL) + t->s_refresh_feed(t); } @@ -948,9 +942,7 @@ service_saver(void *aux) if(t->s_status != SERVICE_ZOMBIE) t->s_config_save(t); if(t->s_status == SERVICE_RUNNING && restart) { - pthread_mutex_lock(&t->s_stream_mutex); service_restart(t, 1); - pthread_mutex_unlock(&t->s_stream_mutex); } service_unref(t); diff --git a/src/service.h b/src/service.h index 8728b53b..d23a5e30 100644 --- a/src/service.h +++ b/src/service.h @@ -61,7 +61,6 @@ typedef struct elementary_stream { avgstat_t es_cc_errors; avgstat_t es_rate; - int es_demuxer_fd; int es_peak_presentation_delay; /* Max seen diff. of DTS and PTS */ /* PCR recovery */