From 267459c22287023b31388a826041c760ca17d388 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= <andreas@lonelycoder.com> Date: Sat, 17 May 2008 08:53:44 +0000 Subject: [PATCH] Make sure we correctly clean up when transports / muxes are destroyed --- dvb_fe.c | 2 + serviceprobe.c | 113 +++++++++++++++++++++++++++++++++++++------------ serviceprobe.h | 2 + transports.c | 8 ++++ tvhead.h | 3 +- 5 files changed, 99 insertions(+), 29 deletions(-) diff --git a/dvb_fe.c b/dvb_fe.c index 3b6be75e..6db6231f 100644 --- a/dvb_fe.c +++ b/dvb_fe.c @@ -132,6 +132,8 @@ dvb_fe_manager(void *aux) time(&tdmi->tdmi_got_adapter); + printf("%s: Tuing changed to %d\n", tda->tda_rootpath, p.frequency); + /* Now that we have tuned, start demuxing of tables */ diff --git a/serviceprobe.c b/serviceprobe.c index f1e3ba5f..7ab06535 100644 --- a/serviceprobe.c +++ b/serviceprobe.c @@ -40,12 +40,23 @@ static void serviceprobe_engage(void); -struct th_transport_queue probequeue; +TAILQ_HEAD(sp_queue, sp); + +static struct sp_queue probequeue; + +static struct sp *sp_current; + +static dtimer_t sp_engage_timer; typedef struct sp { + TAILQ_ENTRY(sp) sp_link; + th_muxer_t *sp_muxer; - th_subscription_t *sp_s; + th_transport_t *sp_t; dtimer_t sp_timer; + + th_subscription_t *sp_s; + } sp_t; @@ -54,21 +65,15 @@ sp_done_callback(void *aux, int64_t now) { sp_t *sp = aux; th_subscription_t *s = sp->sp_s; - th_transport_t *t = sp->sp_s->ths_transport; - muxer_deinit(sp->sp_muxer, s); - - LIST_REMOVE(s, ths_transport_link); - free(s); - - TAILQ_REMOVE(&probequeue, t, tht_probe_link); - t->tht_on_probe_queue = 0; - - transport_stop(t, 0); - - subscription_reschedule(); + if(s != NULL) + subscription_unsubscribe(s); serviceprobe_engage(); + + sp->sp_t->tht_sp = NULL; + + TAILQ_REMOVE(&probequeue, sp, sp_link); free(sp); } @@ -79,7 +84,7 @@ static void sp_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt) { sp_t *sp = opaque; - th_transport_t *t = sp->sp_s->ths_transport; + th_transport_t *t = sp->sp_t; channel_t *ch; syslog(LOG_INFO, "Probed \"%s\" -- Ok\n", t->tht_svcname); @@ -100,7 +105,7 @@ static void sp_status_callback(struct th_subscription *s, int status, void *opaque) { sp_t *sp = opaque; - th_transport_t *t = sp->sp_s->ths_transport; + th_transport_t *t = sp->sp_t; char *errtxt; s->ths_status_callback = NULL; @@ -127,31 +132,61 @@ sp_status_callback(struct th_subscription *s, int status, void *opaque) } + +/** + * Called when a subscription gets/loses access to a transport + */ +static void +sp_subscription_callback(struct th_subscription *s, + subscription_event_t event, void *opaque) +{ + sp_t *sp = opaque; + + switch(event) { + case TRANSPORT_AVAILABLE: + break; + + case TRANSPORT_UNAVAILABLE: + muxer_deinit(sp->sp_muxer, s); + break; + } +} + /** * Setup IPTV (TS over UDP) output */ static void -serviceprobe_engage(void) +serviceprobe_start(void *aux, int64_t now) { - th_transport_t *t; th_subscription_t *s; th_muxer_t *tm; + th_transport_t *t; sp_t *sp; - if((t = TAILQ_FIRST(&probequeue)) == NULL) + assert(sp_current == NULL); + + printf("Engage ...: "); + + if((sp = TAILQ_FIRST(&probequeue)) == NULL) { + printf("Q empty\n"); return; + } - sp = calloc(1, sizeof(sp_t)); + s = sp->sp_s = calloc(1, sizeof(th_subscription_t)); + t = sp->sp_t; - sp->sp_s = s = calloc(1, sizeof(th_subscription_t)); - s->ths_title = "probe"; + s->ths_title = strdup("probe"); s->ths_weight = INT32_MAX; s->ths_opaque = sp; + s->ths_callback = sp_subscription_callback; + LIST_INSERT_HEAD(&subscriptions, s, ths_global_link); if(t->tht_runstatus != TRANSPORT_RUNNING) transport_start(t, INT32_MAX); + printf("Starting %s\n", t->tht_svcname); + s->ths_transport = t; LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); @@ -161,27 +196,51 @@ serviceprobe_engage(void) s->ths_status_callback = sp_status_callback; } +/** + * + */ +static void +serviceprobe_engage(void) +{ + dtimer_arm(&sp_engage_timer, serviceprobe_start, NULL, 0); +} + + /** * */ void serviceprobe_add(th_transport_t *t) { - int was_first = TAILQ_FIRST(&probequeue) == NULL; + sp_t *sp; if(!transport_is_tv(t)) return; - if(t->tht_on_probe_queue) + if(t->tht_sp != NULL) return; - TAILQ_INSERT_TAIL(&probequeue, t, tht_probe_link); - t->tht_on_probe_queue = 1; + sp = calloc(1, sizeof(sp_t)); - if(was_first) + TAILQ_INSERT_TAIL(&probequeue, sp, sp_link); + t->tht_sp = sp; + sp->sp_t = t; + + if(sp_current == NULL) serviceprobe_engage(); } +/** + * + */ +void +serviceprobe_delete(th_transport_t *t) +{ + if(t->tht_sp == NULL) + return; + + sp_done_callback(t->tht_sp, 0); +} void diff --git a/serviceprobe.h b/serviceprobe.h index e28a31df..54774ead 100644 --- a/serviceprobe.h +++ b/serviceprobe.h @@ -23,4 +23,6 @@ void serviceprobe_setup(void); void serviceprobe_add(th_transport_t *t); +void serviceprobe_delete(th_transport_t *t); + #endif /* SERVICE_PROBE_H_ */ diff --git a/transports.c b/transports.c index 19da2cd3..f2dc3e62 100644 --- a/transports.c +++ b/transports.c @@ -53,6 +53,7 @@ #include "channels.h" #include "cwc.h" #include "notify.h" +#include "serviceprobe.h" #define TRANSPORT_HASH_WIDTH 101 @@ -465,6 +466,10 @@ transport_destroy(th_transport_t *t) transport_flush_subscribers(t); + if(t->tht_runstatus != TRANSPORT_IDLE) + transport_stop(t, 0); + + free(t->tht_identifier); free(t->tht_svcname); free(t->tht_chname); @@ -474,6 +479,9 @@ transport_destroy(th_transport_t *t) LIST_REMOVE(st, st_link); free(st); } + + serviceprobe_delete(t); + free(t); } diff --git a/tvhead.h b/tvhead.h index e84d4898..f41639c9 100644 --- a/tvhead.h +++ b/tvhead.h @@ -516,8 +516,7 @@ typedef struct th_transport { * Autoprobing support */ - TAILQ_ENTRY(th_transport) tht_probe_link; - int tht_on_probe_queue; + struct sp *tht_sp; /** * Channel mapping