PID lookup, streaming pad filter and sbuf alloc optimizations

There is high probability (because of the "nature" of TS streams) that
next PID will match previous one. This patch tries to do simple "caching"
of last PID to speedup PID lookups.

The streaming pad filter is handled faster way now.

Also, sbuf allocations routines are optimized (better sb_size prediction and
allocation routine is called only on demand - not all time).
This commit is contained in:
Jaroslav Kysela 2014-03-05 17:31:19 +01:00 committed by Adam Sutton
parent 342d0e7799
commit c6446b1eef
12 changed files with 97 additions and 40 deletions

View file

@ -286,6 +286,8 @@ struct mpegts_mux
*/
RB_HEAD(, mpegts_pid) mm_pids;
int mm_last_pid;
mpegts_pid_t *mm_last_mp;
int mm_num_tables;
LIST_HEAD(, mpegts_table) mm_tables;
@ -601,6 +603,15 @@ void mpegts_mux_remove_subscriber(mpegts_mux_t *mm, th_subscription_t *s, int re
int mpegts_mux_subscribe(mpegts_mux_t *mm, const char *name, int weight);
void mpegts_mux_unsubscribe_by_name(mpegts_mux_t *mm, const char *name);
#define mpegts_mux_find_pid_fast(mm, pid, create) ({ \
mpegts_pid_t *__mp; \
if ((mm)->mm_last_pid != (pid)) \
__mp = mpegts_mux_find_pid(mm, pid, create); \
else \
__mp = (mm)->mm_last_mp; \
__mp; \
})
mpegts_pid_t *mpegts_mux_find_pid(mpegts_mux_t *mm, int pid, int create);
size_t mpegts_input_recv_packets
@ -629,7 +640,9 @@ void mpegts_input_close_pid
void mpegts_table_dispatch
(const uint8_t *sec, size_t r, void *mt);
void mpegts_table_release
#define mpegts_table_release(t) \
do { if(--mt->mt_refcount == 0) mpegts_table_release_(mt); } while (0)
void mpegts_table_release_
(mpegts_table_t *mt);
mpegts_table_t *mpegts_table_add
(mpegts_mux_t *mm, int tableid, int mask,

View file

@ -1176,7 +1176,7 @@ psi_desc_add_ca
tvhdebug("pmt", " caid %04X (%s) provider %08X pid %04X",
caid, descrambler_caid2name(caid), provid, pid);
if((st = service_stream_find((service_t*)t, pid)) == NULL) {
if((st = service_stream_find_fast((service_t*)t, pid)) == NULL) {
st = service_stream_create((service_t*)t, pid, SCT_CA);
r |= PMT_UPDATE_NEW_CA_STREAM;
}
@ -1282,7 +1282,7 @@ psi_desc_teletext(mpegts_service_t *t, const uint8_t *ptr, int size,
// higher than normal MPEG TS (0x2000 ++)
int pid = DVB_TELETEXT_BASE + page;
if((st = service_stream_find((service_t*)t, pid)) == NULL) {
if((st = service_stream_find_fast((service_t*)t, pid)) == NULL) {
r |= PMT_UPDATE_NEW_STREAM;
st = service_stream_create((service_t*)t, pid, SCT_TEXTSUB);
st->es_delete_me = 1;
@ -1492,7 +1492,7 @@ psi_parse_pmt
if(hts_stream_type != SCT_UNKNOWN) {
if((st = service_stream_find((service_t*)t, pid)) == NULL) {
if((st = service_stream_find_fast((service_t*)t, pid)) == NULL) {
update |= PMT_UPDATE_NEW_STREAM;
st = service_stream_create((service_t*)t, pid, hts_stream_type);
}

View file

@ -195,6 +195,10 @@ mpegts_input_close_pid
skel.mps_type = type;
skel.mps_owner = owner;
mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mps_cmp);
if (pid == mm->mm_last_pid) {
mm->mm_last_pid = -1;
mm->mm_last_mp = NULL;
}
if (mps) {
RB_REMOVE(&mp->mp_subs, mps, mps_link);
free(mps);
@ -369,7 +373,7 @@ mpegts_input_recv_packets
name, pid, pid, mmi);
/* Find PID */
if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
if ((mp = mpegts_mux_find_pid_fast(mm, pid, 0))) {
int stream = 0;
int table = 0;

View file

@ -535,6 +535,8 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
mpegts_input_flush_mux(mi, mm);
/* Ensure PIDs are cleared */
mm->mm_last_pid = -1;
mm->mm_last_mp = NULL;
while ((mp = RB_FIRST(&mm->mm_pids))) {
while ((mps = RB_FIRST(&mp->mp_subs))) {
RB_REMOVE(&mp->mp_subs, mps, mps_link);
@ -773,6 +775,8 @@ mpegts_mux_create0
mm->mm_close_table = mpegts_mux_close_table;
TAILQ_INIT(&mm->mm_table_queue);
mm->mm_last_pid = -1;
/* Configuration */
if (conf)
idnode_load(&mm->mm_id, conf);
@ -917,6 +921,10 @@ mpegts_mux_find_pid ( mpegts_mux_t *mm, int pid, int create )
mp->mp_fd = -1;
}
}
if (mp) {
mm->mm_last_pid = pid;
mm->mm_last_mp = mp;
}
return mp;
}

View file

@ -92,12 +92,10 @@ mpegts_table_dispatch
}
void
mpegts_table_release ( mpegts_table_t *mt )
mpegts_table_release_ ( mpegts_table_t *mt )
{
if(--mt->mt_refcount == 0) {
free(mt->mt_name);
free(mt);
}
free(mt->mt_name);
free(mt);
}
void

View file

@ -68,7 +68,7 @@ ts_recv_packet0
{
int off, pusi, cc, error;
service_set_streaming_status_flags((service_t*)t, TSS_MUX_PACKETS);
service_set_streaming_status_flags_fast((service_t*)t, TSS_MUX_PACKETS);
if(streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS))
ts_remux(t, tsb);
@ -206,7 +206,7 @@ ts_recv_packet1
pthread_mutex_lock(&t->s_stream_mutex);
service_set_streaming_status_flags((service_t*)t, TSS_INPUT_HARDWARE);
service_set_streaming_status_flags_fast((service_t*)t, TSS_INPUT_HARDWARE);
if(error) {
/* Transport Error Indicator */
@ -216,7 +216,7 @@ ts_recv_packet1
pid = (tsb[1] & 0x1f) << 8 | tsb[2];
st = service_stream_find((service_t*)t, pid);
st = service_stream_find_fast((service_t*)t, pid);
/* Extract PCR */
if (pcr != PTS_UNSET)
@ -228,7 +228,7 @@ ts_recv_packet1
}
if(!error)
service_set_streaming_status_flags((service_t*)t, TSS_INPUT_SERVICE);
service_set_streaming_status_flags_fast((service_t*)t, TSS_INPUT_SERVICE);
avgstat_add(&t->s_rate, 188, dispatch_clock);
@ -259,9 +259,9 @@ ts_recv_packet1
if(!error && t->s_scrambled != 0) {
if(n == 0) {
service_set_streaming_status_flags((service_t*)t, TSS_NO_DESCRAMBLER);
service_set_streaming_status_flags_fast((service_t*)t, TSS_NO_DESCRAMBLER);
} else if(m == n) {
service_set_streaming_status_flags((service_t*)t, TSS_NO_ACCESS);
service_set_streaming_status_flags_fast((service_t*)t, TSS_NO_ACCESS);
}
}
@ -282,7 +282,7 @@ ts_recv_packet2(mpegts_service_t *t, const uint8_t *tsb)
elementary_stream_t *st;
int pid = (tsb[1] & 0x1f) << 8 | tsb[2];
if((st = service_stream_find((service_t*)t, pid)) != NULL)
if((st = service_stream_find_fast((service_t*)t, pid)) != NULL)
ts_recv_packet0(t, st, tsb);
}
@ -310,7 +310,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src)
pktbuf_ref_dec(pb);
service_set_streaming_status_flags((service_t*)t, TSS_PACKETS);
service_set_streaming_status_flags_fast((service_t*)t, TSS_PACKETS);
sbuf_reset(sb);
}

View file

@ -247,6 +247,11 @@ service_stream_destroy(service_t *t, elementary_stream_t *es)
avgstat_flush(&es->es_rate);
avgstat_flush(&es->es_cc_errors);
if (t->s_last_es == es) {
t->s_last_pid = -1;
t->s_last_es = NULL;
}
TAILQ_REMOVE(&t->s_components, es, es_link);
while ((c = LIST_FIRST(&es->es_caids)) != NULL) {
@ -552,6 +557,7 @@ service_create0
t->s_channel_name = service_channel_name;
t->s_provider_name = service_provider_name;
TAILQ_INIT(&t->s_components);
t->s_last_pid = -1;
streaming_pad_init(&t->s_streaming_pad);
@ -670,7 +676,7 @@ service_stream_create(service_t *t, int pid,
/**
* Add a new stream to a service
* Find an elementary stream in a service
*/
elementary_stream_t *
service_stream_find(service_t *t, int pid)
@ -680,8 +686,11 @@ service_stream_find(service_t *t, int pid)
lock_assert(&t->s_stream_mutex);
TAILQ_FOREACH(st, &t->s_components, es_link) {
if(st->es_pid == pid)
if(st->es_pid == pid) {
t->s_last_es = st;
t->s_last_pid = pid;
return st;
}
}
return NULL;
}

View file

@ -406,6 +406,8 @@ typedef struct service {
* List of all components.
*/
struct elementary_stream_queue s_components;
int s_last_pid;
elementary_stream_t *s_last_es;
/**
@ -448,6 +450,15 @@ service_instance_t *service_find_instance(struct service *s,
int *error,
int weight);
#define service_stream_find_fast(t, pid) ({ \
elementary_stream_t *__es; \
if ((t)->s_last_pid != (pid)) \
__es = service_stream_find(t, pid); \
else \
__es = (t)->s_last_es; \
__es; \
})
elementary_stream_t *service_stream_find(service_t *t, int pid);
elementary_stream_t *service_stream_create(service_t *t, int pid,
@ -474,6 +485,10 @@ void service_remove_subscriber(service_t *t, struct th_subscription *s,
void service_set_streaming_status_flags(service_t *t, int flag);
#define service_set_streaming_status_flags_fast(t, flag) \
do { if (((t)->s_streaming_status & flag) != flag) \
service_set_streaming_status_flags(t, flag); } while (0)
struct streaming_start;
struct streaming_start *service_build_stream_start(service_t *t);

View file

@ -30,6 +30,8 @@ void
streaming_pad_init(streaming_pad_t *sp)
{
LIST_INIT(&sp->sp_targets);
sp->sp_ntargets = 0;
sp->sp_reject_filter = 0;
}
/**
@ -115,6 +117,7 @@ streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st)
sp->sp_ntargets++;
st->st_pad = sp;
LIST_INSERT_HEAD(&sp->sp_targets, st, st_link);
sp->sp_reject_filter |= st->st_reject_filter;
}
@ -124,10 +127,17 @@ streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st)
void
streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st)
{
int filter;
sp->sp_ntargets--;
st->st_pad = NULL;
LIST_REMOVE(st, st_link);
filter = 0;
LIST_FOREACH(st, &sp->sp_targets, st_link)
filter |= st->st_reject_filter;
sp->sp_reject_filter = filter;
}
@ -338,22 +348,6 @@ streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
}
/**
*
*/
int
streaming_pad_probe_type(streaming_pad_t *sp, streaming_message_type_t smt)
{
streaming_target_t *st;
LIST_FOREACH(st, &sp->sp_targets, st_link) {
if(!(st->st_reject_filter & SMT_TO_MASK(smt)))
return 1;
}
return 0;
}
/**
*
*/

View file

@ -111,8 +111,8 @@ void streaming_start_unref(streaming_start_t *ss);
streaming_start_t *streaming_start_copy(const streaming_start_t *src);
int streaming_pad_probe_type(streaming_pad_t *sp,
streaming_message_type_t smt);
#define streaming_pad_probe_type(sp, smt) \
(((sp)->sp_reject_filter & SMT_TO_MASK(smt)) == 0)
const char *streaming_code2txt(int code);

View file

@ -270,6 +270,7 @@ typedef struct streaming_skip
typedef struct streaming_pad {
struct streaming_target_list sp_targets;
int sp_ntargets;
int sp_reject_filter;
} streaming_pad_t;

View file

@ -281,8 +281,9 @@ void
sbuf_alloc(sbuf_t *sb, int len)
{
if(sb->sb_data == NULL) {
sb->sb_size = 4000;
sb->sb_size = len * 4 > 4000 ? len * 4 : 4000;
sb->sb_data = malloc(sb->sb_size);
return;
}
if(sb->sb_ptr + len >= sb->sb_size) {
@ -291,10 +292,24 @@ sbuf_alloc(sbuf_t *sb, int len)
}
}
static void
sbuf_alloc1(sbuf_t *sb, int len)
{
if(sb->sb_data == NULL) {
sb->sb_size = len * 4 > 4000 ? len * 4 : 4000;
sb->sb_data = malloc(sb->sb_size);
return;
}
sb->sb_size += len * 4;
sb->sb_data = realloc(sb->sb_data, sb->sb_size);
}
void
sbuf_append(sbuf_t *sb, const void *data, int len)
{
sbuf_alloc(sb, len);
if(sb->sb_ptr + len >= sb->sb_size)
sbuf_alloc1(sb, len);
memcpy(sb->sb_data + sb->sb_ptr, data, len);
sb->sb_ptr += len;
}