mpegts/descrambler: Introduce FAST TABLE for CA ECM processing

- also change the capmt locking (get most of job outside global lock)
This commit is contained in:
Jaroslav Kysela 2014-06-08 14:47:57 +02:00
parent e9617b9ede
commit fb12cf9239
7 changed files with 157 additions and 86 deletions

View file

@ -118,6 +118,8 @@ typedef enum {
LIST_HEAD(caid_list, caid);
#define DESCRAMBLER_ECM_PID(pid) ((pid) | (MT_FAST << 16))
void descrambler_init ( void );
void descrambler_done ( void );
void descrambler_service_start ( struct service *t );

View file

@ -111,6 +111,10 @@ typedef struct dmx_sct_filter_params {
#define CAPMT_DESC_DEMUX 0x82
#define CAPMT_DESC_PID 0x84
// message type
#define CAPMT_MSG_FAST 0x01
#define CAPMT_MSG_CLEAR 0x02
// limits
#define MAX_CA 16
#define MAX_INDEX 64
@ -586,16 +590,26 @@ capmt_write_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
*
*/
static void
capmt_queue_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
capmt_queue_msg
(capmt_t *capmt, int sid, const uint8_t *buf, size_t len, int flags)
{
capmt_message_t *msg = malloc(sizeof(*msg));
capmt_message_t *msg;
if (flags & CAPMT_MSG_CLEAR) {
while ((msg = TAILQ_FIRST(&capmt->capmt_writeq)) != NULL) {
TAILQ_REMOVE(&capmt->capmt_writeq, msg, cm_link);
sbuf_free(&msg->cm_sb);
free(msg);
}
}
msg = malloc(sizeof(*msg));
sbuf_init_fixed(&msg->cm_sb, len);
sbuf_append(&msg->cm_sb, buf, len);
msg->cm_sid = sid;
pthread_mutex_lock(&capmt->capmt_mutex);
TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link);
pthread_mutex_unlock(&capmt->capmt_mutex);
if (flags & CAPMT_MSG_FAST)
TAILQ_INSERT_HEAD(&capmt->capmt_writeq, msg, cm_link);
else
TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link);
tvh_write(capmt->capmt_pipe.wr, "c", 1);
}
@ -678,7 +692,8 @@ capmt_send_stop(capmt_service_t *t)
buf[10] = ((pos - 5 - 12) & 0xF00) >> 8;
buf[11] = ((pos - 5 - 12) & 0xFF);
capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id, buf, pos);
capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id,
buf, pos, CAPMT_MSG_CLEAR);
}
}
@ -693,11 +708,14 @@ capmt_service_destroy(th_descrambler_t *td)
mpegts_service_t *s = (mpegts_service_t *)ct->td_service;
int oscam_new = capmt_oscam_new(ct->ct_capmt);
capmt_caid_ecm_t *cce;
capmt_t *capmt = ct->ct_capmt;
tvhlog(LOG_INFO, "capmt",
"Removing CAPMT Server from service \"%s\" on adapter %d",
s->s_dvb_svcname, ct->ct_adapter);
pthread_mutex_lock(&capmt->capmt_mutex);
/* send stop to client */
if (!oscam_new)
capmt_send_stop(ct);
@ -714,20 +732,23 @@ capmt_service_destroy(th_descrambler_t *td)
LIST_REMOVE(ct, ct_link);
if (oscam_new)
capmt_enumerate_services(ct->ct_capmt, 1);
capmt_enumerate_services(capmt, 1);
if (LIST_EMPTY(&ct->ct_capmt->capmt_services)) {
ct->ct_capmt->capmt_adapters[ct->ct_adapter].ca_tuner = NULL;
memset(&ct->ct_capmt->capmt_demuxes, 0, sizeof(ct->ct_capmt->capmt_demuxes));
if (LIST_EMPTY(&capmt->capmt_services)) {
capmt->capmt_adapters[ct->ct_adapter].ca_tuner = NULL;
memset(&capmt->capmt_demuxes, 0, sizeof(capmt->capmt_demuxes));
}
pthread_mutex_unlock(&capmt->capmt_mutex);
tvhcsa_destroy(&ct->ct_csa);
free(ct);
}
static void
capmt_filter_data(capmt_t *capmt, uint8_t adapter, uint8_t demux_index,
uint8_t filter_index, const uint8_t *data, int len)
uint8_t filter_index, const uint8_t *data, int len,
int flags)
{
uint8_t *buf = alloca(len + 6);
@ -736,7 +757,7 @@ capmt_filter_data(capmt_t *capmt, uint8_t adapter, uint8_t demux_index,
buf[5] = filter_index;
memcpy(buf + 6, data, len);
if (len - 3 == ((((uint16_t)buf[7] << 8) | buf[8]) & 0xfff))
capmt_queue_msg(capmt, 0, buf, len + 6);
capmt_queue_msg(capmt, 0, buf, len + 6, flags);
}
static void
@ -748,6 +769,10 @@ capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
dmx_filter_params_t *filter;
dmx_filter_params_t *params = (dmx_filter_params_t *)sbuf_peek(sb, offset + 6);
capmt_filters_t *cf;
capmt_service_t *ct;
mpegts_service_t *t;
elementary_stream_t *st;
caid_t *c;
tvhtrace("capmt", "setting filter: adapter=%d, demux=%d, filter=%d, pid=%d",
adapter, demux_index, filter_index, pid);
@ -759,12 +784,31 @@ capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
cf = &capmt->capmt_demuxes.filters[demux_index];
if (cf->max && cf->adapter != adapter)
return;
pthread_mutex_lock(&capmt->capmt_mutex);
cf->adapter = adapter;
filter = &cf->dmx[filter_index];
filter->pid = pid;
capmt_pid_add(capmt, adapter, pid);
memcpy(&filter->filter, &params->filter, sizeof(params->filter));
filter->timeout = filter->flags = 0;
filter->timeout = 0;
filter->flags = 0;
/* ECM messages have the higher priority */
LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
t = (mpegts_service_t *)ct->td_service;
pthread_mutex_lock(&t->s_stream_mutex);
TAILQ_FOREACH(st, &t->s_components, es_link) {
LIST_FOREACH(c, &st->es_caids, link) {
if (c->pid == pid) {
filter->flags = CAPMT_MSG_FAST;
break;
}
}
if (c) break;
}
pthread_mutex_unlock(&t->s_stream_mutex);
if (st) break;
}
/* Update the max values */
if (capmt->capmt_demuxes.max <= demux_index)
capmt->capmt_demuxes.max = demux_index + 1;
if (cf->max <= filter_index)
@ -774,6 +818,7 @@ capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
memmove(filter->filter.filter + 3, filter->filter.filter + 1, DMX_FILTER_SIZE - 3);
memmove(filter->filter.mode + 3, filter->filter.mode + 1, DMX_FILTER_SIZE - 3);
filter->filter.mask[1] = filter->filter.mask[2] = 0;
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
@ -795,6 +840,7 @@ capmt_stop_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
filter = &cf->dmx[filter_index];
if (filter->pid != pid)
return;
pthread_mutex_lock(&capmt->capmt_mutex);
memset(filter, 0, sizeof(*filter));
capmt_pid_remove(capmt, adapter, pid);
/* short the max values */
@ -806,11 +852,13 @@ capmt_stop_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
while (demux_index != 255 && capmt->capmt_demuxes.filters[demux_index].max == 0)
demux_index--;
capmt->capmt_demuxes.max = demux_index == 255 ? 0 : demux_index + 1;
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
capmt_notify_server(capmt_t *capmt, capmt_service_t *ct)
{
pthread_mutex_lock(&capmt->capmt_mutex);
if (capmt_oscam_new(capmt)) {
if (!LIST_EMPTY(&capmt->capmt_services))
capmt_enumerate_services(capmt, 0);
@ -821,6 +869,7 @@ capmt_notify_server(capmt_t *capmt, capmt_service_t *ct)
LIST_FOREACH(ct, &capmt->capmt_services, ct_link)
capmt_send_request(ct, CAPMT_LIST_ONLY);
}
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
@ -829,7 +878,7 @@ capmt_abort(capmt_t *capmt, int keystate)
mpegts_service_t *t;
capmt_service_t *ct;
pthread_mutex_lock(&global_lock);
pthread_mutex_lock(&capmt->capmt_mutex);
LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
t = (mpegts_service_t *)ct->td_service;
@ -842,7 +891,7 @@ capmt_abort(capmt_t *capmt, int keystate)
ct->td_keystate = keystate;
}
}
pthread_mutex_unlock(&global_lock);
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
@ -854,7 +903,7 @@ capmt_process_key(capmt_t *capmt, uint8_t adapter, uint16_t seq,
capmt_service_t *ct;
unsigned int i;
pthread_mutex_lock(&global_lock);
pthread_mutex_lock(&capmt->capmt_mutex);
LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
t = (mpegts_service_t *)ct->td_service;
@ -890,7 +939,7 @@ capmt_process_key(capmt_t *capmt, uint8_t adapter, uint16_t seq,
ct->td_keystate = DS_RESOLVED;
}
pthread_mutex_unlock(&global_lock);
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static int
@ -1016,9 +1065,7 @@ handle_ca0(capmt_t *capmt) {
for (i = 0; i < MAX_CA; i++)
sbuf_init(&buffer[i]);
pthread_mutex_lock(&global_lock);
capmt_notify_server(capmt, NULL);
pthread_mutex_unlock(&global_lock);
capmt->capmt_poll = tvhpoll_create(MAX_CA + 1);
capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0);
@ -1117,9 +1164,7 @@ handle_single(capmt_t *capmt)
reconnect = capmt->capmt_sock_reconnect[0];
sbuf_init(&buffer);
pthread_mutex_lock(&global_lock);
capmt_notify_server(capmt, NULL);
pthread_mutex_unlock(&global_lock);
capmt->capmt_poll = tvhpoll_create(2);
capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0);
@ -1202,9 +1247,7 @@ handle_ca0_wrapper(capmt_t *capmt)
show_connection(capmt, ".so wrapper");
pthread_mutex_lock(&global_lock);
capmt_notify_server(capmt, NULL);
pthread_mutex_unlock(&global_lock);
while (capmt->capmt_running) {
@ -1394,6 +1437,8 @@ capmt_table_input(void *opaque, int pid, const uint8_t *data, int len)
/* Validate */
if (data == NULL || len > 4096) return;
pthread_mutex_lock(&capmt->capmt_mutex);
for (demux_index = 0; demux_index < capmt->capmt_demuxes.max; demux_index++) {
cf = &capmt->capmt_demuxes.filters[demux_index];
if (cf->adapter != o->adapter)
@ -1410,11 +1455,13 @@ capmt_table_input(void *opaque, int pid, const uint8_t *data, int len)
if (i >= DMX_FILTER_SIZE && i <= len) {
capmt_filter_data(capmt,
o->adapter, demux_index,
filter_index, data, len);
filter_index, data, len,
cf->dmx[filter_index].flags);
}
}
}
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
@ -1422,12 +1469,15 @@ capmt_caid_change(th_descrambler_t *td)
{
capmt_service_t *ct = (capmt_service_t *)td;
capmt_t *capmt = ct->ct_capmt;
mpegts_service_t *t = (mpegts_service_t*)td->td_service;
mpegts_service_t *t;
elementary_stream_t *st;
capmt_caid_ecm_t *cce;
caid_t *c;
int change = 0;
pthread_mutex_lock(&capmt->capmt_mutex);
t = (mpegts_service_t*)td->td_service;
TAILQ_FOREACH(st, &t->s_components, es_link) {
LIST_FOREACH(c, &st->es_caids, link) {
/* search ecmpid in list */
@ -1452,6 +1502,8 @@ capmt_caid_change(th_descrambler_t *td)
if (change)
capmt_notify_server(capmt, ct);
pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
@ -1577,7 +1629,7 @@ capmt_send_request(capmt_service_t *ct, int lm)
buf[9] = pmtversion;
pmtversion = (pmtversion + 1) & 0x1F;
capmt_queue_msg(capmt, sid, buf, pos);
capmt_queue_msg(capmt, sid, buf, pos, 0);
}
static void

View file

@ -1991,7 +1991,8 @@ cwc_service_start(service_t *t)
LIST_INSERT_HEAD(&cwc->cwc_services, ct, cs_link);
descrambler_open_pid(ct->cs_mux, ct, ct->cs_estream->es_pid,
descrambler_open_pid(ct->cs_mux, ct,
DESCRAMBLER_ECM_PID(ct->cs_estream->es_pid),
cwc_table_input);
tvhlog(LOG_DEBUG, "cwc", "%s using CWC %s:%d",

View file

@ -204,9 +204,12 @@ descrambler_open_pid_( mpegts_mux_t *mux, void *opaque, int pid,
{
descrambler_table_t *dt;
descrambler_section_t *ds;
int flags;
if (mux == NULL)
return 0;
flags = pid >> 16;
pid &= 0x3fff;
TAILQ_FOREACH(dt, &mux->mm_descrambler_tables, link) {
if (dt->table->mt_pid == pid) {
TAILQ_FOREACH(ds, &dt->sections, link) {
@ -219,7 +222,7 @@ descrambler_open_pid_( mpegts_mux_t *mux, void *opaque, int pid,
dt = calloc(1, sizeof(*dt));
TAILQ_INIT(&dt->sections);
dt->table = mpegts_table_add(mux, 0, 0, descrambler_table_callback,
dt, "descrambler", MT_FULL, pid);
dt, "descrambler", MT_FULL | flags, pid);
TAILQ_INSERT_TAIL(&mux->mm_descrambler_tables, dt, link);
}
ds = calloc(1, sizeof(*ds));
@ -250,6 +253,7 @@ descrambler_close_pid_( mpegts_mux_t *mux, void *opaque, int pid )
if (mux == NULL)
return 0;
pid &= 0x3fff;
TAILQ_FOREACH(dt, &mux->mm_descrambler_tables, link) {
if (dt->table->mt_pid == pid) {
TAILQ_FOREACH(ds, &dt->sections, link) {

View file

@ -120,6 +120,7 @@ typedef struct mpegts_pid_sub
#define MPS_NONE 0x0
#define MPS_STREAM 0x1
#define MPS_TABLE 0x2
#define MPS_FTABLE 0x4
int mps_type;
void *mps_owner;
} mpegts_pid_sub_t;
@ -147,6 +148,8 @@ struct mpegts_table
#define MT_RECORD 0x08
#define MT_SKIPSUBS 0x10
#define MT_SCANSUBS 0x20
#define MT_FAST 0x40
#define MT_SLOW 0x80
/**
* Cycle queue

View file

@ -521,6 +521,43 @@ mpegts_input_recv_packets
sb->sb_ptr = 0; // clear
}
static void
mpegts_input_table_dispatch ( mpegts_mux_t *mm, const uint8_t *tsb )
{
int i = 0;
int len = mm->mm_num_tables;
uint16_t pid = ((tsb[1] & 0x1f) << 8) | tsb[2];
uint8_t cc = (tsb[3] & 0x0f);
mpegts_table_t *mt, *vec[len];
/* Collate - tables may be removed during callbacks */
LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
mpegts_table_grab(mt);
vec[i++] = mt;
}
assert(i == len);
/* Process */
for (i = 0; i < len; i++) {
mt = vec[i];
if (!mt->mt_destroyed && mt->mt_pid == pid) {
if (tsb[3] & 0x10) {
int ccerr = 0;
if (mt->mt_cc != -1 && mt->mt_cc != cc) {
ccerr = 1;
/* Ignore dupes (shouldn't have payload set, but some seem to) */
//if (((mt->mt_cc + 15) & 0xf) != cc)
tvhdebug("psi", "PID %04X CC error %d != %d", pid, cc, mt->mt_cc);
}
mt->mt_cc = (cc + 1) & 0xF;
mpegts_psi_section_reassemble(&mt->mt_sect, tsb, 0, ccerr,
mpegts_table_dispatch, mt);
}
}
mpegts_table_release(mt);
}
}
static void
mpegts_input_process
( mpegts_input_t *mi, mpegts_packet_t *mp )
@ -566,25 +603,24 @@ mpegts_input_process
// wrong for a brief period of time if the registrations on
// the PID change
if (mp != last_mp) {
if (pid == 0)
stream = table = 1;
else {
if (pid == 0) {
stream = MPS_STREAM;
table = MPS_TABLE;
} else {
stream = table = 0;
/* Determine PID type */
RB_FOREACH(mps, &mp->mp_subs, mps_link) {
if (mps->mps_type & MPS_STREAM)
stream = 1;
if (mps->mps_type & MPS_TABLE)
table = 1;
if (table && stream) break;
stream |= mps->mps_type & MPS_STREAM;
table |= mps->mps_type & (MPS_TABLE | MPS_FTABLE);
if (table == (MPS_TABLE|MPS_FTABLE) && stream) break;
}
/* Special case streams */
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
if (pid == s->s_pmt_pid) stream = 1;
else if (pid == s->s_pcr_pid) stream = 1;
if (pid == s->s_pmt_pid) stream = MPS_STREAM;
else if (pid == s->s_pcr_pid) stream = MPS_STREAM;
}
}
}
@ -602,14 +638,18 @@ mpegts_input_process
/* Table data */
if (table) {
if (!(tsb[i+1] & 0x80)) {
// TODO: might be able to optimise this a bit by having slightly
// larger buffering and trying to aggregate data (if we get
// same PID multiple times in the loop)
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb+i, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
if (table & MPS_FTABLE)
mpegts_input_table_dispatch(mm, tsb+i);
if (table & MPS_TABLE) {
// TODO: might be able to optimise this a bit by having slightly
// larger buffering and trying to aggregate data (if we get
// same PID multiple times in the loop)
mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
memcpy(mtf->mtf_tsb, tsb+i, 188);
mtf->mtf_mux = mm;
TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
table_wakeup = 1;
}
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
}
@ -678,43 +718,6 @@ mpegts_input_thread ( void * p )
return NULL;
}
static void
mpegts_input_table_dispatch ( mpegts_mux_t *mm, mpegts_table_feed_t *mtf )
{
int i = 0;
int len = mm->mm_num_tables;
uint16_t pid = ((mtf->mtf_tsb[1] & 0x1f) << 8) | mtf->mtf_tsb[2];
uint8_t cc = (mtf->mtf_tsb[3] & 0x0f);
mpegts_table_t *mt, *vec[len];
/* Collate - tables may be removed during callbacks */
LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
mpegts_table_grab(mt);
vec[i++] = mt;
}
assert(i == len);
/* Process */
for (i = 0; i < len; i++) {
mt = vec[i];
if (!mt->mt_destroyed && mt->mt_pid == pid) {
if (mtf->mtf_tsb[3] & 0x10) {
int ccerr = 0;
if (mt->mt_cc != -1 && mt->mt_cc != cc) {
ccerr = 1;
/* Ignore dupes (shouldn't have payload set, but some seem to) */
//if (((mt->mt_cc + 15) & 0xf) != cc)
tvhdebug("psi", "PID %04X CC error %d != %d", pid, cc, mt->mt_cc);
}
mt->mt_cc = (cc + 1) & 0xF;
mpegts_psi_section_reassemble(&mt->mt_sect, mtf->mtf_tsb, 0, ccerr,
mpegts_table_dispatch, mt);
}
}
mpegts_table_release(mt);
}
}
static void *
mpegts_input_table_thread ( void *aux )
{
@ -735,7 +738,7 @@ mpegts_input_table_thread ( void *aux )
/* Process */
if (mtf->mtf_mux) {
pthread_mutex_lock(&global_lock);
mpegts_input_table_dispatch(mtf->mtf_mux, mtf);
mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
pthread_mutex_unlock(&global_lock);
}

View file

@ -629,8 +629,11 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
void
mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
{
int type = MPS_TABLE;
int type = 0;
if (mt->mt_flags & MT_FAST) type |= MPS_FTABLE;
if (mt->mt_flags & MT_SLOW) type |= MPS_TABLE;
if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM;
if ((type & (MPS_FTABLE | MPS_TABLE)) == 0) type |= MPS_TABLE;
mpegts_input_t *mi;
if (!mm->mm_active || !mm->mm_active->mmi_input) return;
mi = mm->mm_active->mmi_input;
@ -643,8 +646,11 @@ void
mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
{
mpegts_input_t *mi;
int type = MPS_TABLE;
int type = 0;
if (mt->mt_flags & MT_FAST) type |= MPS_FTABLE;
if (mt->mt_flags & MT_SLOW) type |= MPS_TABLE;
if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM;
if ((type & (MPS_FTABLE | MPS_TABLE)) == 0) type |= MPS_TABLE;
if (!mm->mm_active || !mm->mm_active->mmi_input) return;
mi = mm->mm_active->mmi_input;
pthread_mutex_lock(&mi->mi_output_lock);