From c6446b1eef249e7e36b1a9af0da5be1d42798629 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Wed, 5 Mar 2014 17:31:19 +0100 Subject: [PATCH] PID lookup, streaming pad filter and sbuf alloc optimizations There is high probability (because of the "nature" of TS streams) that next PID will match previous one. This patch tries to do simple "caching" of last PID to speedup PID lookups. The streaming pad filter is handled faster way now. Also, sbuf allocations routines are optimized (better sb_size prediction and allocation routine is called only on demand - not all time). --- src/input/mpegts.h | 15 ++++++++++++++- src/input/mpegts/dvb_psi.c | 6 +++--- src/input/mpegts/mpegts_input.c | 6 +++++- src/input/mpegts/mpegts_mux.c | 8 ++++++++ src/input/mpegts/mpegts_table.c | 8 +++----- src/input/mpegts/tsdemux.c | 16 ++++++++-------- src/service.c | 13 +++++++++++-- src/service.h | 15 +++++++++++++++ src/streaming.c | 26 ++++++++++---------------- src/streaming.h | 4 ++-- src/tvheadend.h | 1 + src/utils.c | 19 +++++++++++++++++-- 12 files changed, 97 insertions(+), 40 deletions(-) diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 751d5433..4a43f141 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -286,6 +286,8 @@ struct mpegts_mux */ RB_HEAD(, mpegts_pid) mm_pids; + int mm_last_pid; + mpegts_pid_t *mm_last_mp; int mm_num_tables; LIST_HEAD(, mpegts_table) mm_tables; @@ -601,6 +603,15 @@ void mpegts_mux_remove_subscriber(mpegts_mux_t *mm, th_subscription_t *s, int re int mpegts_mux_subscribe(mpegts_mux_t *mm, const char *name, int weight); void mpegts_mux_unsubscribe_by_name(mpegts_mux_t *mm, const char *name); +#define mpegts_mux_find_pid_fast(mm, pid, create) ({ \ + mpegts_pid_t *__mp; \ + if ((mm)->mm_last_pid != (pid)) \ + __mp = mpegts_mux_find_pid(mm, pid, create); \ + else \ + __mp = (mm)->mm_last_mp; \ + __mp; \ +}) + mpegts_pid_t *mpegts_mux_find_pid(mpegts_mux_t *mm, int pid, int create); size_t mpegts_input_recv_packets @@ -629,7 +640,9 @@ void mpegts_input_close_pid void mpegts_table_dispatch (const uint8_t *sec, size_t r, void *mt); -void mpegts_table_release +#define mpegts_table_release(t) \ + do { if(--mt->mt_refcount == 0) mpegts_table_release_(mt); } while (0) +void mpegts_table_release_ (mpegts_table_t *mt); mpegts_table_t *mpegts_table_add (mpegts_mux_t *mm, int tableid, int mask, diff --git a/src/input/mpegts/dvb_psi.c b/src/input/mpegts/dvb_psi.c index 866c264f..4b700edb 100644 --- a/src/input/mpegts/dvb_psi.c +++ b/src/input/mpegts/dvb_psi.c @@ -1176,7 +1176,7 @@ psi_desc_add_ca tvhdebug("pmt", " caid %04X (%s) provider %08X pid %04X", caid, descrambler_caid2name(caid), provid, pid); - if((st = service_stream_find((service_t*)t, pid)) == NULL) { + if((st = service_stream_find_fast((service_t*)t, pid)) == NULL) { st = service_stream_create((service_t*)t, pid, SCT_CA); r |= PMT_UPDATE_NEW_CA_STREAM; } @@ -1282,7 +1282,7 @@ psi_desc_teletext(mpegts_service_t *t, const uint8_t *ptr, int size, // higher than normal MPEG TS (0x2000 ++) int pid = DVB_TELETEXT_BASE + page; - if((st = service_stream_find((service_t*)t, pid)) == NULL) { + if((st = service_stream_find_fast((service_t*)t, pid)) == NULL) { r |= PMT_UPDATE_NEW_STREAM; st = service_stream_create((service_t*)t, pid, SCT_TEXTSUB); st->es_delete_me = 1; @@ -1492,7 +1492,7 @@ psi_parse_pmt if(hts_stream_type != SCT_UNKNOWN) { - if((st = service_stream_find((service_t*)t, pid)) == NULL) { + if((st = service_stream_find_fast((service_t*)t, pid)) == NULL) { update |= PMT_UPDATE_NEW_STREAM; st = service_stream_create((service_t*)t, pid, hts_stream_type); } diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 4e85ede6..0758406c 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -195,6 +195,10 @@ mpegts_input_close_pid skel.mps_type = type; skel.mps_owner = owner; mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mps_cmp); + if (pid == mm->mm_last_pid) { + mm->mm_last_pid = -1; + mm->mm_last_mp = NULL; + } if (mps) { RB_REMOVE(&mp->mp_subs, mps, mps_link); free(mps); @@ -369,7 +373,7 @@ mpegts_input_recv_packets name, pid, pid, mmi); /* Find PID */ - if ((mp = mpegts_mux_find_pid(mm, pid, 0))) { + if ((mp = mpegts_mux_find_pid_fast(mm, pid, 0))) { int stream = 0; int table = 0; diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 6c44daf9..a5f5be6a 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -535,6 +535,8 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force ) mpegts_input_flush_mux(mi, mm); /* Ensure PIDs are cleared */ + mm->mm_last_pid = -1; + mm->mm_last_mp = NULL; while ((mp = RB_FIRST(&mm->mm_pids))) { while ((mps = RB_FIRST(&mp->mp_subs))) { RB_REMOVE(&mp->mp_subs, mps, mps_link); @@ -773,6 +775,8 @@ mpegts_mux_create0 mm->mm_close_table = mpegts_mux_close_table; TAILQ_INIT(&mm->mm_table_queue); + mm->mm_last_pid = -1; + /* Configuration */ if (conf) idnode_load(&mm->mm_id, conf); @@ -917,6 +921,10 @@ mpegts_mux_find_pid ( mpegts_mux_t *mm, int pid, int create ) mp->mp_fd = -1; } } + if (mp) { + mm->mm_last_pid = pid; + mm->mm_last_mp = mp; + } return mp; } diff --git a/src/input/mpegts/mpegts_table.c b/src/input/mpegts/mpegts_table.c index d01e0139..55e5d21b 100644 --- a/src/input/mpegts/mpegts_table.c +++ b/src/input/mpegts/mpegts_table.c @@ -92,12 +92,10 @@ mpegts_table_dispatch } void -mpegts_table_release ( mpegts_table_t *mt ) +mpegts_table_release_ ( mpegts_table_t *mt ) { - if(--mt->mt_refcount == 0) { - free(mt->mt_name); - free(mt); - } + free(mt->mt_name); + free(mt); } void diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index 7cb078d1..7c13a5a1 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -68,7 +68,7 @@ ts_recv_packet0 { int off, pusi, cc, error; - service_set_streaming_status_flags((service_t*)t, TSS_MUX_PACKETS); + service_set_streaming_status_flags_fast((service_t*)t, TSS_MUX_PACKETS); if(streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS)) ts_remux(t, tsb); @@ -206,7 +206,7 @@ ts_recv_packet1 pthread_mutex_lock(&t->s_stream_mutex); - service_set_streaming_status_flags((service_t*)t, TSS_INPUT_HARDWARE); + service_set_streaming_status_flags_fast((service_t*)t, TSS_INPUT_HARDWARE); if(error) { /* Transport Error Indicator */ @@ -216,7 +216,7 @@ ts_recv_packet1 pid = (tsb[1] & 0x1f) << 8 | tsb[2]; - st = service_stream_find((service_t*)t, pid); + st = service_stream_find_fast((service_t*)t, pid); /* Extract PCR */ if (pcr != PTS_UNSET) @@ -228,7 +228,7 @@ ts_recv_packet1 } if(!error) - service_set_streaming_status_flags((service_t*)t, TSS_INPUT_SERVICE); + service_set_streaming_status_flags_fast((service_t*)t, TSS_INPUT_SERVICE); avgstat_add(&t->s_rate, 188, dispatch_clock); @@ -259,9 +259,9 @@ ts_recv_packet1 if(!error && t->s_scrambled != 0) { if(n == 0) { - service_set_streaming_status_flags((service_t*)t, TSS_NO_DESCRAMBLER); + service_set_streaming_status_flags_fast((service_t*)t, TSS_NO_DESCRAMBLER); } else if(m == n) { - service_set_streaming_status_flags((service_t*)t, TSS_NO_ACCESS); + service_set_streaming_status_flags_fast((service_t*)t, TSS_NO_ACCESS); } } @@ -282,7 +282,7 @@ ts_recv_packet2(mpegts_service_t *t, const uint8_t *tsb) elementary_stream_t *st; int pid = (tsb[1] & 0x1f) << 8 | tsb[2]; - if((st = service_stream_find((service_t*)t, pid)) != NULL) + if((st = service_stream_find_fast((service_t*)t, pid)) != NULL) ts_recv_packet0(t, st, tsb); } @@ -310,7 +310,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src) pktbuf_ref_dec(pb); - service_set_streaming_status_flags((service_t*)t, TSS_PACKETS); + service_set_streaming_status_flags_fast((service_t*)t, TSS_PACKETS); sbuf_reset(sb); } diff --git a/src/service.c b/src/service.c index 592e7b61..21ce9c00 100644 --- a/src/service.c +++ b/src/service.c @@ -247,6 +247,11 @@ service_stream_destroy(service_t *t, elementary_stream_t *es) avgstat_flush(&es->es_rate); avgstat_flush(&es->es_cc_errors); + if (t->s_last_es == es) { + t->s_last_pid = -1; + t->s_last_es = NULL; + } + TAILQ_REMOVE(&t->s_components, es, es_link); while ((c = LIST_FIRST(&es->es_caids)) != NULL) { @@ -552,6 +557,7 @@ service_create0 t->s_channel_name = service_channel_name; t->s_provider_name = service_provider_name; TAILQ_INIT(&t->s_components); + t->s_last_pid = -1; streaming_pad_init(&t->s_streaming_pad); @@ -670,7 +676,7 @@ service_stream_create(service_t *t, int pid, /** - * Add a new stream to a service + * Find an elementary stream in a service */ elementary_stream_t * service_stream_find(service_t *t, int pid) @@ -680,8 +686,11 @@ service_stream_find(service_t *t, int pid) lock_assert(&t->s_stream_mutex); TAILQ_FOREACH(st, &t->s_components, es_link) { - if(st->es_pid == pid) + if(st->es_pid == pid) { + t->s_last_es = st; + t->s_last_pid = pid; return st; + } } return NULL; } diff --git a/src/service.h b/src/service.h index 6fa15037..2de8a7a8 100644 --- a/src/service.h +++ b/src/service.h @@ -406,6 +406,8 @@ typedef struct service { * List of all components. */ struct elementary_stream_queue s_components; + int s_last_pid; + elementary_stream_t *s_last_es; /** @@ -448,6 +450,15 @@ service_instance_t *service_find_instance(struct service *s, int *error, int weight); +#define service_stream_find_fast(t, pid) ({ \ + elementary_stream_t *__es; \ + if ((t)->s_last_pid != (pid)) \ + __es = service_stream_find(t, pid); \ + else \ + __es = (t)->s_last_es; \ + __es; \ +}) + elementary_stream_t *service_stream_find(service_t *t, int pid); elementary_stream_t *service_stream_create(service_t *t, int pid, @@ -474,6 +485,10 @@ void service_remove_subscriber(service_t *t, struct th_subscription *s, void service_set_streaming_status_flags(service_t *t, int flag); +#define service_set_streaming_status_flags_fast(t, flag) \ + do { if (((t)->s_streaming_status & flag) != flag) \ + service_set_streaming_status_flags(t, flag); } while (0) + struct streaming_start; struct streaming_start *service_build_stream_start(service_t *t); diff --git a/src/streaming.c b/src/streaming.c index a8542dae..4bfd5073 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -30,6 +30,8 @@ void streaming_pad_init(streaming_pad_t *sp) { LIST_INIT(&sp->sp_targets); + sp->sp_ntargets = 0; + sp->sp_reject_filter = 0; } /** @@ -115,6 +117,7 @@ streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st) sp->sp_ntargets++; st->st_pad = sp; LIST_INSERT_HEAD(&sp->sp_targets, st, st_link); + sp->sp_reject_filter |= st->st_reject_filter; } @@ -124,10 +127,17 @@ streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st) void streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st) { + int filter; + sp->sp_ntargets--; st->st_pad = NULL; LIST_REMOVE(st, st_link); + + filter = 0; + LIST_FOREACH(st, &sp->sp_targets, st_link) + filter |= st->st_reject_filter; + sp->sp_reject_filter = filter; } @@ -338,22 +348,6 @@ streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm) } -/** - * - */ -int -streaming_pad_probe_type(streaming_pad_t *sp, streaming_message_type_t smt) -{ - streaming_target_t *st; - - LIST_FOREACH(st, &sp->sp_targets, st_link) { - if(!(st->st_reject_filter & SMT_TO_MASK(smt))) - return 1; - } - return 0; -} - - /** * */ diff --git a/src/streaming.h b/src/streaming.h index 6c047161..45924513 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -111,8 +111,8 @@ void streaming_start_unref(streaming_start_t *ss); streaming_start_t *streaming_start_copy(const streaming_start_t *src); -int streaming_pad_probe_type(streaming_pad_t *sp, - streaming_message_type_t smt); +#define streaming_pad_probe_type(sp, smt) \ + (((sp)->sp_reject_filter & SMT_TO_MASK(smt)) == 0) const char *streaming_code2txt(int code); diff --git a/src/tvheadend.h b/src/tvheadend.h index d4af5fc9..34b5cf13 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -270,6 +270,7 @@ typedef struct streaming_skip typedef struct streaming_pad { struct streaming_target_list sp_targets; int sp_ntargets; + int sp_reject_filter; } streaming_pad_t; diff --git a/src/utils.c b/src/utils.c index eaab345d..57bcbe2b 100644 --- a/src/utils.c +++ b/src/utils.c @@ -281,8 +281,9 @@ void sbuf_alloc(sbuf_t *sb, int len) { if(sb->sb_data == NULL) { - sb->sb_size = 4000; + sb->sb_size = len * 4 > 4000 ? len * 4 : 4000; sb->sb_data = malloc(sb->sb_size); + return; } if(sb->sb_ptr + len >= sb->sb_size) { @@ -291,10 +292,24 @@ sbuf_alloc(sbuf_t *sb, int len) } } +static void +sbuf_alloc1(sbuf_t *sb, int len) +{ + if(sb->sb_data == NULL) { + sb->sb_size = len * 4 > 4000 ? len * 4 : 4000; + sb->sb_data = malloc(sb->sb_size); + return; + } + + sb->sb_size += len * 4; + sb->sb_data = realloc(sb->sb_data, sb->sb_size); +} + void sbuf_append(sbuf_t *sb, const void *data, int len) { - sbuf_alloc(sb, len); + if(sb->sb_ptr + len >= sb->sb_size) + sbuf_alloc1(sb, len); memcpy(sb->sb_data + sb->sb_ptr, data, len); sb->sb_ptr += len; }