Make sure we correctly clean up when transports / muxes are destroyed

This commit is contained in:
Andreas Öman 2008-05-17 08:53:44 +00:00
parent a807755894
commit 267459c222
5 changed files with 99 additions and 29 deletions

View file

@ -132,6 +132,8 @@ dvb_fe_manager(void *aux)
time(&tdmi->tdmi_got_adapter);
printf("%s: Tuing changed to %d\n", tda->tda_rootpath, p.frequency);
/* Now that we have tuned, start demuxing of tables */

View file

@ -40,12 +40,23 @@
static void serviceprobe_engage(void);
struct th_transport_queue probequeue;
TAILQ_HEAD(sp_queue, sp);
static struct sp_queue probequeue;
static struct sp *sp_current;
static dtimer_t sp_engage_timer;
typedef struct sp {
TAILQ_ENTRY(sp) sp_link;
th_muxer_t *sp_muxer;
th_subscription_t *sp_s;
th_transport_t *sp_t;
dtimer_t sp_timer;
th_subscription_t *sp_s;
} sp_t;
@ -54,21 +65,15 @@ sp_done_callback(void *aux, int64_t now)
{
sp_t *sp = aux;
th_subscription_t *s = sp->sp_s;
th_transport_t *t = sp->sp_s->ths_transport;
muxer_deinit(sp->sp_muxer, s);
LIST_REMOVE(s, ths_transport_link);
free(s);
TAILQ_REMOVE(&probequeue, t, tht_probe_link);
t->tht_on_probe_queue = 0;
transport_stop(t, 0);
subscription_reschedule();
if(s != NULL)
subscription_unsubscribe(s);
serviceprobe_engage();
sp->sp_t->tht_sp = NULL;
TAILQ_REMOVE(&probequeue, sp, sp_link);
free(sp);
}
@ -79,7 +84,7 @@ static void
sp_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt)
{
sp_t *sp = opaque;
th_transport_t *t = sp->sp_s->ths_transport;
th_transport_t *t = sp->sp_t;
channel_t *ch;
syslog(LOG_INFO, "Probed \"%s\" -- Ok\n", t->tht_svcname);
@ -100,7 +105,7 @@ static void
sp_status_callback(struct th_subscription *s, int status, void *opaque)
{
sp_t *sp = opaque;
th_transport_t *t = sp->sp_s->ths_transport;
th_transport_t *t = sp->sp_t;
char *errtxt;
s->ths_status_callback = NULL;
@ -127,31 +132,61 @@ sp_status_callback(struct th_subscription *s, int status, void *opaque)
}
/**
* Called when a subscription gets/loses access to a transport
*/
static void
sp_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
{
sp_t *sp = opaque;
switch(event) {
case TRANSPORT_AVAILABLE:
break;
case TRANSPORT_UNAVAILABLE:
muxer_deinit(sp->sp_muxer, s);
break;
}
}
/**
* Setup IPTV (TS over UDP) output
*/
static void
serviceprobe_engage(void)
serviceprobe_start(void *aux, int64_t now)
{
th_transport_t *t;
th_subscription_t *s;
th_muxer_t *tm;
th_transport_t *t;
sp_t *sp;
if((t = TAILQ_FIRST(&probequeue)) == NULL)
assert(sp_current == NULL);
printf("Engage ...: ");
if((sp = TAILQ_FIRST(&probequeue)) == NULL) {
printf("Q empty\n");
return;
}
sp = calloc(1, sizeof(sp_t));
s = sp->sp_s = calloc(1, sizeof(th_subscription_t));
t = sp->sp_t;
sp->sp_s = s = calloc(1, sizeof(th_subscription_t));
s->ths_title = "probe";
s->ths_title = strdup("probe");
s->ths_weight = INT32_MAX;
s->ths_opaque = sp;
s->ths_callback = sp_subscription_callback;
LIST_INSERT_HEAD(&subscriptions, s, ths_global_link);
if(t->tht_runstatus != TRANSPORT_RUNNING)
transport_start(t, INT32_MAX);
printf("Starting %s\n", t->tht_svcname);
s->ths_transport = t;
LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link);
@ -161,27 +196,51 @@ serviceprobe_engage(void)
s->ths_status_callback = sp_status_callback;
}
/**
*
*/
static void
serviceprobe_engage(void)
{
dtimer_arm(&sp_engage_timer, serviceprobe_start, NULL, 0);
}
/**
*
*/
void
serviceprobe_add(th_transport_t *t)
{
int was_first = TAILQ_FIRST(&probequeue) == NULL;
sp_t *sp;
if(!transport_is_tv(t))
return;
if(t->tht_on_probe_queue)
if(t->tht_sp != NULL)
return;
TAILQ_INSERT_TAIL(&probequeue, t, tht_probe_link);
t->tht_on_probe_queue = 1;
sp = calloc(1, sizeof(sp_t));
if(was_first)
TAILQ_INSERT_TAIL(&probequeue, sp, sp_link);
t->tht_sp = sp;
sp->sp_t = t;
if(sp_current == NULL)
serviceprobe_engage();
}
/**
*
*/
void
serviceprobe_delete(th_transport_t *t)
{
if(t->tht_sp == NULL)
return;
sp_done_callback(t->tht_sp, 0);
}
void

View file

@ -23,4 +23,6 @@ void serviceprobe_setup(void);
void serviceprobe_add(th_transport_t *t);
void serviceprobe_delete(th_transport_t *t);
#endif /* SERVICE_PROBE_H_ */

View file

@ -53,6 +53,7 @@
#include "channels.h"
#include "cwc.h"
#include "notify.h"
#include "serviceprobe.h"
#define TRANSPORT_HASH_WIDTH 101
@ -465,6 +466,10 @@ transport_destroy(th_transport_t *t)
transport_flush_subscribers(t);
if(t->tht_runstatus != TRANSPORT_IDLE)
transport_stop(t, 0);
free(t->tht_identifier);
free(t->tht_svcname);
free(t->tht_chname);
@ -474,6 +479,9 @@ transport_destroy(th_transport_t *t)
LIST_REMOVE(st, st_link);
free(st);
}
serviceprobe_delete(t);
free(t);
}

View file

@ -516,8 +516,7 @@ typedef struct th_transport {
* Autoprobing support
*/
TAILQ_ENTRY(th_transport) tht_probe_link;
int tht_on_probe_queue;
struct sp *tht_sp;
/**
* Channel mapping