From fb12cf9239031e542fd988b1aae6e8518365cb9a Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Sun, 8 Jun 2014 14:47:57 +0200 Subject: [PATCH] mpegts/descrambler: Introduce FAST TABLE for CA ECM processing - also change the capmt locking (get most of job outside global lock) --- src/descrambler.h | 2 + src/descrambler/capmt.c | 104 +++++++++++++++++++++-------- src/descrambler/cwc.c | 3 +- src/descrambler/descrambler.c | 6 +- src/input/mpegts.h | 3 + src/input/mpegts/mpegts_input.c | 115 ++++++++++++++++---------------- src/input/mpegts/mpegts_mux.c | 10 ++- 7 files changed, 157 insertions(+), 86 deletions(-) diff --git a/src/descrambler.h b/src/descrambler.h index 41504ad8..9c19f47e 100755 --- a/src/descrambler.h +++ b/src/descrambler.h @@ -118,6 +118,8 @@ typedef enum { LIST_HEAD(caid_list, caid); +#define DESCRAMBLER_ECM_PID(pid) ((pid) | (MT_FAST << 16)) + void descrambler_init ( void ); void descrambler_done ( void ); void descrambler_service_start ( struct service *t ); diff --git a/src/descrambler/capmt.c b/src/descrambler/capmt.c index eb6c539f..cfdfc5e2 100644 --- a/src/descrambler/capmt.c +++ b/src/descrambler/capmt.c @@ -111,6 +111,10 @@ typedef struct dmx_sct_filter_params { #define CAPMT_DESC_DEMUX 0x82 #define CAPMT_DESC_PID 0x84 +// message type +#define CAPMT_MSG_FAST 0x01 +#define CAPMT_MSG_CLEAR 0x02 + // limits #define MAX_CA 16 #define MAX_INDEX 64 @@ -586,16 +590,26 @@ capmt_write_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len) * */ static void -capmt_queue_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len) +capmt_queue_msg + (capmt_t *capmt, int sid, const uint8_t *buf, size_t len, int flags) { - capmt_message_t *msg = malloc(sizeof(*msg)); + capmt_message_t *msg; + if (flags & CAPMT_MSG_CLEAR) { + while ((msg = TAILQ_FIRST(&capmt->capmt_writeq)) != NULL) { + TAILQ_REMOVE(&capmt->capmt_writeq, msg, cm_link); + sbuf_free(&msg->cm_sb); + free(msg); + } + } + msg = malloc(sizeof(*msg)); sbuf_init_fixed(&msg->cm_sb, len); sbuf_append(&msg->cm_sb, buf, len); msg->cm_sid = sid; - pthread_mutex_lock(&capmt->capmt_mutex); - TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link); - pthread_mutex_unlock(&capmt->capmt_mutex); + if (flags & CAPMT_MSG_FAST) + TAILQ_INSERT_HEAD(&capmt->capmt_writeq, msg, cm_link); + else + TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link); tvh_write(capmt->capmt_pipe.wr, "c", 1); } @@ -678,7 +692,8 @@ capmt_send_stop(capmt_service_t *t) buf[10] = ((pos - 5 - 12) & 0xF00) >> 8; buf[11] = ((pos - 5 - 12) & 0xFF); - capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id, buf, pos); + capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id, + buf, pos, CAPMT_MSG_CLEAR); } } @@ -693,11 +708,14 @@ capmt_service_destroy(th_descrambler_t *td) mpegts_service_t *s = (mpegts_service_t *)ct->td_service; int oscam_new = capmt_oscam_new(ct->ct_capmt); capmt_caid_ecm_t *cce; + capmt_t *capmt = ct->ct_capmt; tvhlog(LOG_INFO, "capmt", "Removing CAPMT Server from service \"%s\" on adapter %d", s->s_dvb_svcname, ct->ct_adapter); + pthread_mutex_lock(&capmt->capmt_mutex); + /* send stop to client */ if (!oscam_new) capmt_send_stop(ct); @@ -714,20 +732,23 @@ capmt_service_destroy(th_descrambler_t *td) LIST_REMOVE(ct, ct_link); if (oscam_new) - capmt_enumerate_services(ct->ct_capmt, 1); + capmt_enumerate_services(capmt, 1); - if (LIST_EMPTY(&ct->ct_capmt->capmt_services)) { - ct->ct_capmt->capmt_adapters[ct->ct_adapter].ca_tuner = NULL; - memset(&ct->ct_capmt->capmt_demuxes, 0, sizeof(ct->ct_capmt->capmt_demuxes)); + if (LIST_EMPTY(&capmt->capmt_services)) { + capmt->capmt_adapters[ct->ct_adapter].ca_tuner = NULL; + memset(&capmt->capmt_demuxes, 0, sizeof(capmt->capmt_demuxes)); } + pthread_mutex_unlock(&capmt->capmt_mutex); + tvhcsa_destroy(&ct->ct_csa); free(ct); } static void capmt_filter_data(capmt_t *capmt, uint8_t adapter, uint8_t demux_index, - uint8_t filter_index, const uint8_t *data, int len) + uint8_t filter_index, const uint8_t *data, int len, + int flags) { uint8_t *buf = alloca(len + 6); @@ -736,7 +757,7 @@ capmt_filter_data(capmt_t *capmt, uint8_t adapter, uint8_t demux_index, buf[5] = filter_index; memcpy(buf + 6, data, len); if (len - 3 == ((((uint16_t)buf[7] << 8) | buf[8]) & 0xfff)) - capmt_queue_msg(capmt, 0, buf, len + 6); + capmt_queue_msg(capmt, 0, buf, len + 6, flags); } static void @@ -748,6 +769,10 @@ capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset) dmx_filter_params_t *filter; dmx_filter_params_t *params = (dmx_filter_params_t *)sbuf_peek(sb, offset + 6); capmt_filters_t *cf; + capmt_service_t *ct; + mpegts_service_t *t; + elementary_stream_t *st; + caid_t *c; tvhtrace("capmt", "setting filter: adapter=%d, demux=%d, filter=%d, pid=%d", adapter, demux_index, filter_index, pid); @@ -759,12 +784,31 @@ capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset) cf = &capmt->capmt_demuxes.filters[demux_index]; if (cf->max && cf->adapter != adapter) return; + pthread_mutex_lock(&capmt->capmt_mutex); cf->adapter = adapter; filter = &cf->dmx[filter_index]; filter->pid = pid; capmt_pid_add(capmt, adapter, pid); memcpy(&filter->filter, ¶ms->filter, sizeof(params->filter)); - filter->timeout = filter->flags = 0; + filter->timeout = 0; + filter->flags = 0; + /* ECM messages have the higher priority */ + LIST_FOREACH(ct, &capmt->capmt_services, ct_link) { + t = (mpegts_service_t *)ct->td_service; + pthread_mutex_lock(&t->s_stream_mutex); + TAILQ_FOREACH(st, &t->s_components, es_link) { + LIST_FOREACH(c, &st->es_caids, link) { + if (c->pid == pid) { + filter->flags = CAPMT_MSG_FAST; + break; + } + } + if (c) break; + } + pthread_mutex_unlock(&t->s_stream_mutex); + if (st) break; + } + /* Update the max values */ if (capmt->capmt_demuxes.max <= demux_index) capmt->capmt_demuxes.max = demux_index + 1; if (cf->max <= filter_index) @@ -774,6 +818,7 @@ capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset) memmove(filter->filter.filter + 3, filter->filter.filter + 1, DMX_FILTER_SIZE - 3); memmove(filter->filter.mode + 3, filter->filter.mode + 1, DMX_FILTER_SIZE - 3); filter->filter.mask[1] = filter->filter.mask[2] = 0; + pthread_mutex_unlock(&capmt->capmt_mutex); } static void @@ -795,6 +840,7 @@ capmt_stop_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset) filter = &cf->dmx[filter_index]; if (filter->pid != pid) return; + pthread_mutex_lock(&capmt->capmt_mutex); memset(filter, 0, sizeof(*filter)); capmt_pid_remove(capmt, adapter, pid); /* short the max values */ @@ -806,11 +852,13 @@ capmt_stop_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset) while (demux_index != 255 && capmt->capmt_demuxes.filters[demux_index].max == 0) demux_index--; capmt->capmt_demuxes.max = demux_index == 255 ? 0 : demux_index + 1; + pthread_mutex_unlock(&capmt->capmt_mutex); } static void capmt_notify_server(capmt_t *capmt, capmt_service_t *ct) { + pthread_mutex_lock(&capmt->capmt_mutex); if (capmt_oscam_new(capmt)) { if (!LIST_EMPTY(&capmt->capmt_services)) capmt_enumerate_services(capmt, 0); @@ -821,6 +869,7 @@ capmt_notify_server(capmt_t *capmt, capmt_service_t *ct) LIST_FOREACH(ct, &capmt->capmt_services, ct_link) capmt_send_request(ct, CAPMT_LIST_ONLY); } + pthread_mutex_unlock(&capmt->capmt_mutex); } static void @@ -829,7 +878,7 @@ capmt_abort(capmt_t *capmt, int keystate) mpegts_service_t *t; capmt_service_t *ct; - pthread_mutex_lock(&global_lock); + pthread_mutex_lock(&capmt->capmt_mutex); LIST_FOREACH(ct, &capmt->capmt_services, ct_link) { t = (mpegts_service_t *)ct->td_service; @@ -842,7 +891,7 @@ capmt_abort(capmt_t *capmt, int keystate) ct->td_keystate = keystate; } } - pthread_mutex_unlock(&global_lock); + pthread_mutex_unlock(&capmt->capmt_mutex); } static void @@ -854,7 +903,7 @@ capmt_process_key(capmt_t *capmt, uint8_t adapter, uint16_t seq, capmt_service_t *ct; unsigned int i; - pthread_mutex_lock(&global_lock); + pthread_mutex_lock(&capmt->capmt_mutex); LIST_FOREACH(ct, &capmt->capmt_services, ct_link) { t = (mpegts_service_t *)ct->td_service; @@ -890,7 +939,7 @@ capmt_process_key(capmt_t *capmt, uint8_t adapter, uint16_t seq, ct->td_keystate = DS_RESOLVED; } - pthread_mutex_unlock(&global_lock); + pthread_mutex_unlock(&capmt->capmt_mutex); } static int @@ -1016,9 +1065,7 @@ handle_ca0(capmt_t *capmt) { for (i = 0; i < MAX_CA; i++) sbuf_init(&buffer[i]); - pthread_mutex_lock(&global_lock); capmt_notify_server(capmt, NULL); - pthread_mutex_unlock(&global_lock); capmt->capmt_poll = tvhpoll_create(MAX_CA + 1); capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0); @@ -1117,9 +1164,7 @@ handle_single(capmt_t *capmt) reconnect = capmt->capmt_sock_reconnect[0]; sbuf_init(&buffer); - pthread_mutex_lock(&global_lock); capmt_notify_server(capmt, NULL); - pthread_mutex_unlock(&global_lock); capmt->capmt_poll = tvhpoll_create(2); capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0); @@ -1202,9 +1247,7 @@ handle_ca0_wrapper(capmt_t *capmt) show_connection(capmt, ".so wrapper"); - pthread_mutex_lock(&global_lock); capmt_notify_server(capmt, NULL); - pthread_mutex_unlock(&global_lock); while (capmt->capmt_running) { @@ -1394,6 +1437,8 @@ capmt_table_input(void *opaque, int pid, const uint8_t *data, int len) /* Validate */ if (data == NULL || len > 4096) return; + pthread_mutex_lock(&capmt->capmt_mutex); + for (demux_index = 0; demux_index < capmt->capmt_demuxes.max; demux_index++) { cf = &capmt->capmt_demuxes.filters[demux_index]; if (cf->adapter != o->adapter) @@ -1410,11 +1455,13 @@ capmt_table_input(void *opaque, int pid, const uint8_t *data, int len) if (i >= DMX_FILTER_SIZE && i <= len) { capmt_filter_data(capmt, o->adapter, demux_index, - filter_index, data, len); + filter_index, data, len, + cf->dmx[filter_index].flags); } } } + pthread_mutex_unlock(&capmt->capmt_mutex); } static void @@ -1422,12 +1469,15 @@ capmt_caid_change(th_descrambler_t *td) { capmt_service_t *ct = (capmt_service_t *)td; capmt_t *capmt = ct->ct_capmt; - mpegts_service_t *t = (mpegts_service_t*)td->td_service; + mpegts_service_t *t; elementary_stream_t *st; capmt_caid_ecm_t *cce; caid_t *c; int change = 0; + pthread_mutex_lock(&capmt->capmt_mutex); + + t = (mpegts_service_t*)td->td_service; TAILQ_FOREACH(st, &t->s_components, es_link) { LIST_FOREACH(c, &st->es_caids, link) { /* search ecmpid in list */ @@ -1452,6 +1502,8 @@ capmt_caid_change(th_descrambler_t *td) if (change) capmt_notify_server(capmt, ct); + + pthread_mutex_unlock(&capmt->capmt_mutex); } static void @@ -1577,7 +1629,7 @@ capmt_send_request(capmt_service_t *ct, int lm) buf[9] = pmtversion; pmtversion = (pmtversion + 1) & 0x1F; - capmt_queue_msg(capmt, sid, buf, pos); + capmt_queue_msg(capmt, sid, buf, pos, 0); } static void diff --git a/src/descrambler/cwc.c b/src/descrambler/cwc.c index fea0ef73..1c09b33e 100755 --- a/src/descrambler/cwc.c +++ b/src/descrambler/cwc.c @@ -1991,7 +1991,8 @@ cwc_service_start(service_t *t) LIST_INSERT_HEAD(&cwc->cwc_services, ct, cs_link); - descrambler_open_pid(ct->cs_mux, ct, ct->cs_estream->es_pid, + descrambler_open_pid(ct->cs_mux, ct, + DESCRAMBLER_ECM_PID(ct->cs_estream->es_pid), cwc_table_input); tvhlog(LOG_DEBUG, "cwc", "%s using CWC %s:%d", diff --git a/src/descrambler/descrambler.c b/src/descrambler/descrambler.c index 7d863b9b..07f669b4 100755 --- a/src/descrambler/descrambler.c +++ b/src/descrambler/descrambler.c @@ -204,9 +204,12 @@ descrambler_open_pid_( mpegts_mux_t *mux, void *opaque, int pid, { descrambler_table_t *dt; descrambler_section_t *ds; + int flags; if (mux == NULL) return 0; + flags = pid >> 16; + pid &= 0x3fff; TAILQ_FOREACH(dt, &mux->mm_descrambler_tables, link) { if (dt->table->mt_pid == pid) { TAILQ_FOREACH(ds, &dt->sections, link) { @@ -219,7 +222,7 @@ descrambler_open_pid_( mpegts_mux_t *mux, void *opaque, int pid, dt = calloc(1, sizeof(*dt)); TAILQ_INIT(&dt->sections); dt->table = mpegts_table_add(mux, 0, 0, descrambler_table_callback, - dt, "descrambler", MT_FULL, pid); + dt, "descrambler", MT_FULL | flags, pid); TAILQ_INSERT_TAIL(&mux->mm_descrambler_tables, dt, link); } ds = calloc(1, sizeof(*ds)); @@ -250,6 +253,7 @@ descrambler_close_pid_( mpegts_mux_t *mux, void *opaque, int pid ) if (mux == NULL) return 0; + pid &= 0x3fff; TAILQ_FOREACH(dt, &mux->mm_descrambler_tables, link) { if (dt->table->mt_pid == pid) { TAILQ_FOREACH(ds, &dt->sections, link) { diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 83480f0c..ac0d107e 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -120,6 +120,7 @@ typedef struct mpegts_pid_sub #define MPS_NONE 0x0 #define MPS_STREAM 0x1 #define MPS_TABLE 0x2 +#define MPS_FTABLE 0x4 int mps_type; void *mps_owner; } mpegts_pid_sub_t; @@ -147,6 +148,8 @@ struct mpegts_table #define MT_RECORD 0x08 #define MT_SKIPSUBS 0x10 #define MT_SCANSUBS 0x20 +#define MT_FAST 0x40 +#define MT_SLOW 0x80 /** * Cycle queue diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index b52a324a..f325680b 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -521,6 +521,43 @@ mpegts_input_recv_packets sb->sb_ptr = 0; // clear } +static void +mpegts_input_table_dispatch ( mpegts_mux_t *mm, const uint8_t *tsb ) +{ + int i = 0; + int len = mm->mm_num_tables; + uint16_t pid = ((tsb[1] & 0x1f) << 8) | tsb[2]; + uint8_t cc = (tsb[3] & 0x0f); + mpegts_table_t *mt, *vec[len]; + + /* Collate - tables may be removed during callbacks */ + LIST_FOREACH(mt, &mm->mm_tables, mt_link) { + mpegts_table_grab(mt); + vec[i++] = mt; + } + assert(i == len); + + /* Process */ + for (i = 0; i < len; i++) { + mt = vec[i]; + if (!mt->mt_destroyed && mt->mt_pid == pid) { + if (tsb[3] & 0x10) { + int ccerr = 0; + if (mt->mt_cc != -1 && mt->mt_cc != cc) { + ccerr = 1; + /* Ignore dupes (shouldn't have payload set, but some seem to) */ + //if (((mt->mt_cc + 15) & 0xf) != cc) + tvhdebug("psi", "PID %04X CC error %d != %d", pid, cc, mt->mt_cc); + } + mt->mt_cc = (cc + 1) & 0xF; + mpegts_psi_section_reassemble(&mt->mt_sect, tsb, 0, ccerr, + mpegts_table_dispatch, mt); + } + } + mpegts_table_release(mt); + } +} + static void mpegts_input_process ( mpegts_input_t *mi, mpegts_packet_t *mp ) @@ -566,25 +603,24 @@ mpegts_input_process // wrong for a brief period of time if the registrations on // the PID change if (mp != last_mp) { - if (pid == 0) - stream = table = 1; - else { + if (pid == 0) { + stream = MPS_STREAM; + table = MPS_TABLE; + } else { stream = table = 0; /* Determine PID type */ RB_FOREACH(mps, &mp->mp_subs, mps_link) { - if (mps->mps_type & MPS_STREAM) - stream = 1; - if (mps->mps_type & MPS_TABLE) - table = 1; - if (table && stream) break; + stream |= mps->mps_type & MPS_STREAM; + table |= mps->mps_type & (MPS_TABLE | MPS_FTABLE); + if (table == (MPS_TABLE|MPS_FTABLE) && stream) break; } /* Special case streams */ LIST_FOREACH(s, &mi->mi_transports, s_active_link) { if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue; - if (pid == s->s_pmt_pid) stream = 1; - else if (pid == s->s_pcr_pid) stream = 1; + if (pid == s->s_pmt_pid) stream = MPS_STREAM; + else if (pid == s->s_pcr_pid) stream = MPS_STREAM; } } } @@ -602,14 +638,18 @@ mpegts_input_process /* Table data */ if (table) { if (!(tsb[i+1] & 0x80)) { - // TODO: might be able to optimise this a bit by having slightly - // larger buffering and trying to aggregate data (if we get - // same PID multiple times in the loop) - mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); - memcpy(mtf->mtf_tsb, tsb+i, 188); - mtf->mtf_mux = mm; - TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); - table_wakeup = 1; + if (table & MPS_FTABLE) + mpegts_input_table_dispatch(mm, tsb+i); + if (table & MPS_TABLE) { + // TODO: might be able to optimise this a bit by having slightly + // larger buffering and trying to aggregate data (if we get + // same PID multiple times in the loop) + mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)); + memcpy(mtf->mtf_tsb, tsb+i, 188); + mtf->mtf_mux = mm; + TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link); + table_wakeup = 1; + } } else { //tvhdebug("tsdemux", "%s - SI packet had errors", name); } @@ -678,43 +718,6 @@ mpegts_input_thread ( void * p ) return NULL; } -static void -mpegts_input_table_dispatch ( mpegts_mux_t *mm, mpegts_table_feed_t *mtf ) -{ - int i = 0; - int len = mm->mm_num_tables; - uint16_t pid = ((mtf->mtf_tsb[1] & 0x1f) << 8) | mtf->mtf_tsb[2]; - uint8_t cc = (mtf->mtf_tsb[3] & 0x0f); - mpegts_table_t *mt, *vec[len]; - - /* Collate - tables may be removed during callbacks */ - LIST_FOREACH(mt, &mm->mm_tables, mt_link) { - mpegts_table_grab(mt); - vec[i++] = mt; - } - assert(i == len); - - /* Process */ - for (i = 0; i < len; i++) { - mt = vec[i]; - if (!mt->mt_destroyed && mt->mt_pid == pid) { - if (mtf->mtf_tsb[3] & 0x10) { - int ccerr = 0; - if (mt->mt_cc != -1 && mt->mt_cc != cc) { - ccerr = 1; - /* Ignore dupes (shouldn't have payload set, but some seem to) */ - //if (((mt->mt_cc + 15) & 0xf) != cc) - tvhdebug("psi", "PID %04X CC error %d != %d", pid, cc, mt->mt_cc); - } - mt->mt_cc = (cc + 1) & 0xF; - mpegts_psi_section_reassemble(&mt->mt_sect, mtf->mtf_tsb, 0, ccerr, - mpegts_table_dispatch, mt); - } - } - mpegts_table_release(mt); - } -} - static void * mpegts_input_table_thread ( void *aux ) { @@ -735,7 +738,7 @@ mpegts_input_table_thread ( void *aux ) /* Process */ if (mtf->mtf_mux) { pthread_mutex_lock(&global_lock); - mpegts_input_table_dispatch(mtf->mtf_mux, mtf); + mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb); pthread_mutex_unlock(&global_lock); } diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 28b5dd2f..4ac45073 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -629,8 +629,11 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) void mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) { - int type = MPS_TABLE; + int type = 0; + if (mt->mt_flags & MT_FAST) type |= MPS_FTABLE; + if (mt->mt_flags & MT_SLOW) type |= MPS_TABLE; if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM; + if ((type & (MPS_FTABLE | MPS_TABLE)) == 0) type |= MPS_TABLE; mpegts_input_t *mi; if (!mm->mm_active || !mm->mm_active->mmi_input) return; mi = mm->mm_active->mmi_input; @@ -643,8 +646,11 @@ void mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt ) { mpegts_input_t *mi; - int type = MPS_TABLE; + int type = 0; + if (mt->mt_flags & MT_FAST) type |= MPS_FTABLE; + if (mt->mt_flags & MT_SLOW) type |= MPS_TABLE; if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM; + if ((type & (MPS_FTABLE | MPS_TABLE)) == 0) type |= MPS_TABLE; if (!mm->mm_active || !mm->mm_active->mmi_input) return; mi = mm->mm_active->mmi_input; pthread_mutex_lock(&mi->mi_output_lock);