epggrab: ota - change the subscription scheme

- now all muxes are handled continuously in one queue
- the queue can be kicked using the start time
This commit is contained in:
Jaroslav Kysela 2014-06-30 15:48:41 +02:00
parent 80209a4aff
commit 24adf59330
2 changed files with 126 additions and 119 deletions

View file

@ -198,10 +198,9 @@ struct epggrab_ota_mux
LIST_HEAD(,epggrab_ota_map) om_modules; ///< List of linked mods
int om_complete; ///< Has completed a scan
int om_active;
time_t om_when; ///< Next event time
gtimer_t om_timer; ///< Per mux active timer
LIST_ENTRY(epggrab_ota_mux) om_q_link;
TAILQ_ENTRY(epggrab_ota_mux) om_q_link;
RB_ENTRY(epggrab_ota_mux) om_global_link;
};

View file

@ -30,31 +30,36 @@
#include <sys/stat.h>
#include <unistd.h>
RB_HEAD(,epggrab_ota_mux) epggrab_ota_all;
LIST_HEAD(,epggrab_ota_mux) epggrab_ota_pending;
LIST_HEAD(,epggrab_ota_mux) epggrab_ota_active;
#define EPGGRAB_OTA_MIN_INTERVAL 300
#define EPGGRAB_OTA_MIN_TIMEOUT 30
gtimer_t epggrab_ota_pending_timer;
gtimer_t epggrab_ota_active_timer;
#define EPGGRAB_OTA_DONE_COMPLETE 0
#define EPGGRAB_OTA_DONE_TIMEOUT 1
#define EPGGRAB_OTA_DONE_STOLEN 2
typedef TAILQ_HEAD(epggrab_ota_head,epggrab_ota_mux) epggrab_ota_head_t;
RB_HEAD(,epggrab_ota_mux) epggrab_ota_all;
epggrab_ota_head_t epggrab_ota_pending;
epggrab_ota_head_t epggrab_ota_active;
gtimer_t epggrab_ota_kick_timer;
gtimer_t epggrab_ota_pending_timer;
SKEL_DECLARE(epggrab_ota_mux_skel, epggrab_ota_mux_t);
SKEL_DECLARE(epggrab_svc_link_skel, epggrab_ota_svc_link_t);
static void epggrab_ota_active_timer_cb ( void *p );
static void epggrab_ota_pending_timer_cb ( void *p );
static void epggrab_ota_timeout_cb ( void *p );
static void epggrab_ota_kick_cb ( void *p );
static void epggrab_ota_save ( epggrab_ota_mux_t *ota );
static void epggrab_ota_free ( epggrab_ota_head_t *head, epggrab_ota_mux_t *ota );
/* **************************************************************************
* Utilities
* *************************************************************************/
static int
om_time_cmp ( epggrab_ota_mux_t *a, epggrab_ota_mux_t *b )
{
return (a->om_when - b->om_when);
}
static int
om_id_cmp ( epggrab_ota_mux_t *a, epggrab_ota_mux_t *b )
{
@ -67,22 +72,6 @@ om_svcl_cmp ( epggrab_ota_svc_link_t *a, epggrab_ota_svc_link_t *b )
return strcmp(a->uuid, b->uuid);
}
#define EPGGRAB_OTA_MIN_PERIOD 300
#define EPGGRAB_OTA_MIN_TIMEOUT 30
static int
epggrab_ota_period ( int divider )
{
int period = 3600;
period /= divider;
if (period < EPGGRAB_OTA_MIN_PERIOD)
period = EPGGRAB_OTA_MIN_PERIOD;
return period;
}
static int
epggrab_ota_timeout ( void )
{
@ -95,37 +84,44 @@ epggrab_ota_timeout ( void )
}
static void
epggrab_ota_done ( epggrab_ota_mux_t *ota, int timeout )
epggrab_ota_kick ( int delay )
{
gtimer_arm(&epggrab_ota_kick_timer, epggrab_ota_kick_cb, NULL, delay);
}
static void
epggrab_ota_done ( epggrab_ota_mux_t *om, int reason )
{
mpegts_mux_t *mm;
LIST_REMOVE(ota, om_q_link);
ota->om_active = 0;
ota->om_when = dispatch_clock + epggrab_ota_period(1);
LIST_INSERT_SORTED(&epggrab_ota_pending, ota, om_q_link, om_time_cmp);
gtimer_disarm(&om->om_timer);
TAILQ_REMOVE(&epggrab_ota_active, om, om_q_link);
if (reason == EPGGRAB_OTA_DONE_STOLEN)
TAILQ_INSERT_HEAD(&epggrab_ota_pending, om, om_q_link);
else if (reason == EPGGRAB_OTA_DONE_TIMEOUT) {
char name[256];
mpegts_mux_t *mm = mpegts_mux_find(om->om_mux_uuid);
mm->mm_display_name(mm, name, sizeof(name));
tvhlog(LOG_WARNING, "epggrab", "data completion timeout for %s", name);
}
/* Remove subscriber */
if ((mm = mpegts_mux_find(ota->om_mux_uuid)))
if ((mm = mpegts_mux_find(om->om_mux_uuid)))
mpegts_mux_unsubscribe_by_name(mm, "epggrab");
/* Re-arm */
if (LIST_FIRST(&epggrab_ota_pending) == ota)
epggrab_ota_pending_timer_cb(NULL);
/* Remove from active */
if (!timeout)
epggrab_ota_active_timer_cb(NULL);
/* Kick - try start waiting muxes */
epggrab_ota_kick(1);
}
static void
epggrab_ota_start ( epggrab_ota_mux_t *om, int grace )
{
epggrab_ota_map_t *map;
om->om_when = dispatch_clock + epggrab_ota_timeout() + grace;
om->om_active = 1;
LIST_INSERT_SORTED(&epggrab_ota_active, om, om_q_link, om_time_cmp);
if (LIST_FIRST(&epggrab_ota_active) == om)
epggrab_ota_active_timer_cb(NULL);
TAILQ_INSERT_TAIL(&epggrab_ota_active, om, om_q_link);
gtimer_arm(&om->om_timer, epggrab_ota_timeout_cb, om,
epggrab_ota_timeout() + grace);
LIST_FOREACH(map, &om->om_modules, om_link) {
map->om_first = 1;
map->om_complete = 0;
@ -146,7 +142,7 @@ epggrab_mux_start ( mpegts_mux_t *mm, void *p )
const char *uuid = idnode_uuid_as_str(&mm->mm_id);
/* Already started */
LIST_FOREACH(ota, &epggrab_ota_active, om_q_link)
TAILQ_FOREACH(ota, &epggrab_ota_active, om_q_link)
if (!strcmp(ota->om_mux_uuid, uuid))
return;
@ -171,8 +167,13 @@ static void
epggrab_mux_stop ( mpegts_mux_t *mm, void *p )
{
epggrab_ota_mux_t *ota;
while ((ota = LIST_FIRST(&epggrab_ota_active)))
epggrab_ota_done(ota, 0);
const char *uuid = idnode_uuid_as_str(&mm->mm_id);
TAILQ_FOREACH(ota, &epggrab_ota_active, om_q_link)
if (!strcmp(ota->om_mux_uuid, uuid)) {
epggrab_ota_done(ota, EPGGRAB_OTA_DONE_STOLEN);
break;
}
}
/* **************************************************************************
@ -200,11 +201,9 @@ epggrab_ota_register
ota = epggrab_ota_mux_skel;
SKEL_USED(epggrab_ota_mux_skel);
ota->om_mux_uuid = strdup(uuid);
ota->om_when = dispatch_clock + epggrab_ota_timeout();
ota->om_active = 1;
LIST_INSERT_SORTED(&epggrab_ota_active, ota, om_q_link, om_time_cmp);
if (LIST_FIRST(&epggrab_ota_active) == ota)
epggrab_ota_active_timer_cb(NULL);
TAILQ_INSERT_TAIL(&epggrab_ota_pending, ota, om_q_link);
if (TAILQ_FIRST(&epggrab_ota_pending) == ota)
epggrab_ota_kick(1);
save = 1;
}
}
@ -232,6 +231,7 @@ epggrab_ota_complete
( epggrab_module_ota_t *mod, epggrab_ota_mux_t *ota )
{
int done = 1;
epggrab_ota_mux_t *ota2;
epggrab_ota_map_t *map;
lock_assert(&global_lock);
tvhdebug(mod->id, "grab complete");
@ -253,68 +253,81 @@ epggrab_ota_complete
if (!done) return;
/* Done */
epggrab_ota_done(ota, 0);
TAILQ_FOREACH(ota2, &epggrab_ota_active, om_q_link)
if (ota == ota2) {
epggrab_ota_done(ota, EPGGRAB_OTA_DONE_COMPLETE);
break;
}
}
/* **************************************************************************
* Timer callback
* Timer callbacks
* *************************************************************************/
static void
epggrab_ota_active_timer_cb ( void *p )
epggrab_ota_timeout_cb ( void *p )
{
epggrab_ota_mux_t *om = LIST_FIRST(&epggrab_ota_active);
gtimer_disarm(&epggrab_ota_active_timer);
epggrab_ota_mux_t *om = p;
lock_assert(&global_lock);
if (!om)
return;
/* Double check */
if (om->om_when > dispatch_clock)
goto done;
/* Re-queue */
epggrab_ota_done(om, 1);
done:
om = LIST_FIRST(&epggrab_ota_active);
if (om)
gtimer_arm_abs(&epggrab_ota_active_timer, epggrab_ota_active_timer_cb,
NULL, om->om_when);
epggrab_ota_done(om, EPGGRAB_OTA_DONE_TIMEOUT);
}
static void
epggrab_ota_pending_timer_cb ( void *p )
epggrab_ota_kick_cb ( void *p )
{
extern const idclass_t mpegts_mux_class;
epggrab_ota_map_t *map;
epggrab_ota_mux_t *om = LIST_FIRST(&epggrab_ota_pending);
epggrab_ota_mux_t *om = TAILQ_FIRST(&epggrab_ota_pending);
epggrab_ota_mux_t *first = NULL;
mpegts_mux_t *mm;
int extra = 0;
gtimer_disarm(&epggrab_ota_pending_timer);
struct {
mpegts_network_t *net;
int failed;
} networks[64], *net; /* more than 64 networks? - you're a king */
int i, r, networks_count = 0;
lock_assert(&global_lock);
if (!om)
return;
/* Double check */
if (om->om_when > dispatch_clock)
goto done;
next_one:
LIST_REMOVE(om, om_q_link);
/* Find the mux */
extern const idclass_t mpegts_mux_class;
mm = mpegts_mux_find(om->om_mux_uuid);
if (!mm) {
RB_REMOVE(&epggrab_ota_all, om, om_global_link);
while ((map = LIST_FIRST(&om->om_modules))) {
LIST_REMOVE(map, om_link);
free(map);
epggrab_ota_free(&epggrab_ota_pending, om);
goto done;
}
TAILQ_REMOVE(&epggrab_ota_pending, om, om_q_link);
/* Check if this network failed before */
for (i = 0, net = NULL; i < networks_count; i++) {
net = &networks[i];
if (net->net == mm->mm_network) {
if (net->failed)
goto done;
break;
}
free(om->om_mux_uuid);
free(om);
}
if (i >= networks_count) {
net = &networks[networks_count++];
net->net = mm->mm_network;
net->failed = 0;
}
if (mm->mm_is_epg(mm) <= 0) {
#if TRACE_ENABLE
char name[256];
mm->mm_display_name(mm, name, sizeof(name));
tvhtrace("epggrab", "epg mux %s is disabled, skipping", name);
#endif
goto done;
}
@ -326,33 +339,26 @@ next_one:
if (!map) {
char name[256];
mm->mm_display_name(mm, name, sizeof(name));
tvhdebug("epggrab", "no modules attached to %s, check again later", name);
om->om_when = dispatch_clock + epggrab_ota_period(4);
LIST_INSERT_SORTED(&epggrab_ota_pending, om, om_q_link, om_time_cmp);
tvhdebug("epggrab", "no modules attached to %s, check again next time", name);
goto done;
}
/* Subscribe to the mux */
if (mm->mm_is_epg(mm) <= 0 ||
mpegts_mux_subscribe(mm, "epggrab", SUBSCRIPTION_PRIO_EPG)) {
om->om_active = 0;
om->om_when = dispatch_clock + epggrab_ota_period(4) + extra;
LIST_INSERT_SORTED(&epggrab_ota_pending, om, om_q_link, om_time_cmp);
if ((r = mpegts_mux_subscribe(mm, "epggrab", SUBSCRIPTION_PRIO_EPG))) {
TAILQ_INSERT_TAIL(&epggrab_ota_pending, om, om_q_link);
if (r == SM_CODE_NO_FREE_ADAPTER)
net->failed = 1;
if (first == NULL)
first = om;
} else {
mpegts_mux_instance_t *mmi = mm->mm_active;
epggrab_ota_start(om, mpegts_input_grace(mmi->mmi_input, mm));
}
done:
om = LIST_FIRST(&epggrab_ota_pending);
if (om) {
if (om->om_when <= dispatch_clock) {
extra += 60; /* differentiate the mux busy requests */
goto next_one;
}
gtimer_arm_abs(&epggrab_ota_pending_timer, epggrab_ota_pending_timer_cb,
NULL, om->om_when);
}
om = TAILQ_FIRST(&epggrab_ota_pending);
if (networks_count < ARRAY_SIZE(networks) && om && first != om)
goto next_one;
}
void
@ -441,7 +447,7 @@ epggrab_ota_load_one
free(ota);
return;
}
LIST_INSERT_SORTED(&epggrab_ota_pending, ota, om_q_link, om_time_cmp);
TAILQ_INSERT_TAIL(&epggrab_ota_pending, ota, om_q_link);
if (!(l = htsmsg_get_list(c, "modules"))) return;
HTSMSG_FOREACH(f, l) {
@ -476,6 +482,9 @@ epggrab_ota_init ( void )
};
mpegts_add_listener(&ml);
TAILQ_INIT(&epggrab_ota_pending);
TAILQ_INIT(&epggrab_ota_active);
/* Delete old config */
hts_settings_buildpath(path, sizeof(path), "epggrab/otamux");
if (!lstat(path, &st))
@ -491,19 +500,18 @@ epggrab_ota_init ( void )
htsmsg_destroy(c);
}
/* Init timer (immediate call after full init) */
if (LIST_FIRST(&epggrab_ota_pending))
gtimer_arm_abs(&epggrab_ota_pending_timer, epggrab_ota_pending_timer_cb,
NULL, 0);
/* Init timer (call after full init - wait for network tuners) */
if (TAILQ_FIRST(&epggrab_ota_pending))
epggrab_ota_kick(15);
}
static void
epggrab_ota_free ( epggrab_ota_mux_t *ota )
epggrab_ota_free ( epggrab_ota_head_t *head, epggrab_ota_mux_t *ota )
{
epggrab_ota_map_t *map;
epggrab_ota_svc_link_t *svcl;
LIST_REMOVE(ota, om_q_link);
TAILQ_REMOVE(head, ota, om_q_link);
while ((map = LIST_FIRST(&ota->om_modules)) != NULL) {
LIST_REMOVE(map, om_link);
while ((svcl = RB_FIRST(&map->om_svcs)) != NULL)
@ -520,10 +528,10 @@ epggrab_ota_shutdown ( void )
epggrab_ota_mux_t *ota;
pthread_mutex_lock(&global_lock);
while ((ota = LIST_FIRST(&epggrab_ota_active)) != NULL)
epggrab_ota_free(ota);
while ((ota = LIST_FIRST(&epggrab_ota_pending)) != NULL)
epggrab_ota_free(ota);
while ((ota = TAILQ_FIRST(&epggrab_ota_active)) != NULL)
epggrab_ota_free(&epggrab_ota_active, ota);
while ((ota = TAILQ_FIRST(&epggrab_ota_pending)) != NULL)
epggrab_ota_free(&epggrab_ota_pending, ota);
pthread_mutex_unlock(&global_lock);
SKEL_FREE(epggrab_ota_mux_skel);
SKEL_FREE(epggrab_svc_link_skel);