Do more processing of XMLTV in main thread

This commit is contained in:
Andreas Öman 2008-04-27 13:31:41 +00:00
parent f322873d90
commit dc66963046
7 changed files with 200 additions and 169 deletions

View file

@ -50,7 +50,7 @@ ajax_config_xmltv_tab(http_connection_t *hc, http_reply_t *hr)
tcp_qprintf(tq, "<div style=\"overflow: auto; width: 100%\">");
switch(xmltv_status) {
switch(xmltv_globalstatus) {
default:
tcp_qprintf(tq, "<p style=\"text-align: center; font-weight: bold\">"
"XMLTV subsystem is not yet fully initialized, please retry "
@ -197,29 +197,18 @@ ajax_xmltvgrabber(http_connection_t *hc, http_reply_t *hr,
tcp_qprintf(tq,"<div id=\"details_%s\">", xg->xg_identifier);
switch(xg->xg_status) {
case XMLTV_GRABBER_DISABLED:
if(xg->xg_enabled == 0) {
tcp_qprintf(tq,
"<p>This grabber is currently not enabled, click "
"<a href=\"javascript:void(0);\" "
"onClick=\"new Ajax.Request('/ajax/xmltvgrabbermode/%s', "
"{parameters: {'mode': 'enable'}})\">here</a> "
"to enable it</p>");
break;
case XMLTV_GRABBER_ENQUEUED:
case XMLTV_GRABBER_GRABBING:
case XMLTV_GRABBER_UNCONFIGURED:
case XMLTV_GRABBER_DYSFUNCTIONAL:
tcp_qprintf(tq, "<p>%s</p>", xmltv_grabber_status_long(xg, xg->xg_status));
break;
case XMLTV_GRABBER_IDLE:
} else if(xg->xg_status == XMLTV_GRAB_OK) {
xmltv_grabber_chlist(tq, xg);
break;
} else {
tcp_qprintf(tq, "<p>%s</p>", xmltv_grabber_status_long(xg));
}
tcp_qprintf(tq,"</div>");

View file

@ -462,7 +462,7 @@ ajax_mailbox_xmltv_grabber_status_change(xmltv_grabber_t *xg, int status)
xmltv_grabber_status(xg));
if(status == XMLTV_GRABBER_IDLE) {
if(xg->xg_status == XMLTV_GRAB_OK) {
snprintf(buf, sizeof(buf), "/ajax/xmltvgrabberlist/%s", xg->xg_identifier);
ajax_mailbox_reload_div("xmltvgrabbers",
"details", xg->xg_identifier,
@ -470,7 +470,7 @@ ajax_mailbox_xmltv_grabber_status_change(xmltv_grabber_t *xg, int status)
} else {
ajax_mailbox_update_div(xg->xg_identifier,
"details", xg->xg_identifier,
xmltv_grabber_status_long(xg, status));
xmltv_grabber_status_long(xg));
}
}

View file

@ -39,7 +39,6 @@ void ajax_mailbox_start(tcp_queue_t *tq);
struct xmltv_grabber;
void ajax_mailbox_xmltv_grabber_status_change(struct xmltv_grabber *xg,
int status);
void ajax_mailbox_xmltv_grabber_status_change(struct xmltv_grabber *xg);
#endif /* AJAXUI_MAILBOX_H_ */

View file

@ -16,6 +16,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
@ -24,7 +27,6 @@
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <libxml/parser.h>
#include <libxml/tree.h>
@ -39,11 +41,11 @@
#include "spawn.h"
#include "intercom.h"
#include "notify.h"
#include "dispatch.h"
int xmltv_status;
int xmltv_globalstatus;
struct xmltv_grabber_list xmltv_grabbers;
struct xmltv_grabber_queue xmltv_grabber_workq;
static pthread_mutex_t xmltv_save_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t xmltv_work_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t xmltv_work_cond = PTHREAD_COND_INITIALIZER;
static void xmltv_config_load(void);
@ -295,9 +297,7 @@ xmltv_transfer_events(xmltv_grabber_t *xg)
how = 0;
ch = channel_find(xc->xc_displayname, 0, NULL);
if(ch == NULL) {
epg_lock();
ch = xmltv_resolve_by_events(xc);
epg_unlock();
if(ch == NULL)
continue;
@ -344,47 +344,52 @@ xmltv_grab(xmltv_grabber_t *xg)
outlen = spawn_and_store_stdout(prog, NULL, &outbuf);
if(outlen < 1) {
syslog(LOG_ERR, "xmltv: No output from \"%s\"", prog);
return XMLTV_GRABBER_UNCONFIGURED;
return XMLTV_GRAB_UNCONFIGURED;
}
doc = xmlParseMemory(outbuf, outlen);
if(doc == NULL) {
syslog(LOG_ERR, "xmltv: Error while parsing output from \"%s\"", prog);
free(outbuf);
return XMLTV_GRABBER_DYSFUNCTIONAL;
return XMLTV_GRAB_DYSFUNCTIONAL;
}
pthread_mutex_lock(&xg->xg_mutex);
xmltv_flush_events(xg);
root_element = xmlDocGetRootElement(doc);
xmltv_parse_root(xg, root_element);
pthread_mutex_unlock(&xg->xg_mutex);
xmlFreeDoc(doc);
xmlCleanupParser();
syslog(LOG_INFO, "xmltv: EPG sucessfully loaded and parsed");
free(outbuf);
return XMLTV_GRABBER_IDLE;
return XMLTV_GRAB_OK;
}
static struct strtab grabberstatustab[] = {
{ "Disabled", XMLTV_GRABBER_DISABLED },
{ "Missing configuration", XMLTV_GRABBER_UNCONFIGURED },
{ "Dysfunctional", XMLTV_GRABBER_DYSFUNCTIONAL },
{ "Waiting for execution", XMLTV_GRABBER_ENQUEUED },
{ "Grabbing", XMLTV_GRABBER_GRABBING },
{ "Idle", XMLTV_GRABBER_IDLE },
};
const char *
xmltv_grabber_status(xmltv_grabber_t *xg)
/**
*
*/
static void
xmltv_thread_grabber_init_done(icom_t *ic, const char *payload)
{
return val2str(xg->xg_status, grabberstatustab) ?: "Invalid";
htsmsg_t *m = htsmsg_create();
htsmsg_add_u32(m, "initialized", 1);
if(payload != NULL)
htsmsg_add_str(m, "payload", payload);
icom_send_msg_from_thread(ic, m);
htsmsg_destroy(m);
}
/**
*
*/
@ -397,23 +402,37 @@ grabbercmp(xmltv_grabber_t *a, xmltv_grabber_t *b)
/**
* Enlist all active grabbers (for user to choose among)
*
* We send all these in a message back to the main thread
*/
static int
xmltv_find_grabbers(const char *prog)
xmltv_find_grabbers(icom_t *ic, const char *prog)
{
char *outbuf;
int outlen;
outlen = spawn_and_store_stdout(prog, NULL, &outbuf);
if(outlen < 1)
return -1;
xmltv_thread_grabber_init_done(ic, outbuf);
free(outbuf);
return 0;
}
/**
* Parse list of all grabbers
*/
static void
xmltv_parse_grabbers(char *list)
{
char *s, *a, *b;
xmltv_grabber_t *xg;
int n = 0;
char buf[40];
outlen = spawn_and_store_stdout(prog, NULL, &outbuf);
if(outlen < 1) {
return -1;
}
s = outbuf;
s = list;
while(1) {
a = s;
@ -448,8 +467,8 @@ xmltv_find_grabbers(const char *prog)
pthread_mutex_init(&xg->xg_mutex, NULL);
TAILQ_INIT(&xg->xg_channels);
xg->xg_path = a;
xg->xg_title = b;
xg->xg_path = strdup(a);
xg->xg_title = strdup(b);
snprintf(buf, sizeof(buf), "xmlgrabber_%d", n++);
xg->xg_identifier = strdup(buf);
@ -457,26 +476,9 @@ xmltv_find_grabbers(const char *prog)
LIST_INSERT_SORTED(&xmltv_grabbers, xg, xg_link, grabbercmp);
}
xmltv_config_load();
return 0;
}
/**
*
*/
static void
xmltv_thread_set_status(icom_t *ic, xmltv_grabber_t *xg, int s)
{
htsmsg_t *m = htsmsg_create();
xg->xg_status = s;
htsmsg_add_u32(m, "status", s);
htsmsg_add_str(m, "identifier", xg->xg_identifier);
icom_send_msg_from_thread(ic, m);
htsmsg_destroy(m);
}
/*
*
@ -485,96 +487,82 @@ static void *
xmltv_thread(void *aux)
{
icom_t *ic = aux;
struct timespec ts;
htsmsg_t *m;
xmltv_grabber_t *xg;
int r;
time_t n;
/* Start by finding all available grabbers, we try with a few
different locations */
if(xmltv_find_grabbers("/usr/bin/tv_find_grabbers") &&
xmltv_find_grabbers("/bin/tv_find_grabbers") &&
xmltv_find_grabbers("/usr/local/bin/tv_find_grabbers")) {
xmltv_status = XMLTVSTATUS_FIND_GRABBERS_NOT_FOUND;
if(xmltv_find_grabbers(ic, "/usr/bin/tv_find_grabbers") &&
xmltv_find_grabbers(ic, "/bin/tv_find_grabbers") &&
xmltv_find_grabbers(ic, "/usr/local/bin/tv_find_grabbers")) {
xmltv_thread_grabber_init_done(ic, NULL);
return NULL;
}
xmltv_status = XMLTVSTATUS_RUNNING;
pthread_mutex_lock(&xmltv_work_lock);
while(1) {
xg = TAILQ_FIRST(&xmltv_grabber_workq);
if(xg != NULL) {
xmltv_thread_set_status(ic, xg, XMLTV_GRABBER_GRABBING);
TAILQ_REMOVE(&xmltv_grabber_workq, xg, xg_work_link);
pthread_mutex_unlock(&xmltv_work_lock);
r = xmltv_grab(xg);
pthread_mutex_lock(&xmltv_work_lock);
xmltv_thread_set_status(ic, xg, r);
time(&n);
if(r == XMLTV_GRABBER_IDLE) {
/* Completed load */
xg->xg_nextgrab = n + 3600;
} else {
xg->xg_nextgrab = n + 60;
}
continue;
}
LIST_FOREACH(xg, &xmltv_grabbers, xg_link)
xmltv_transfer_events(xg);
while((xg = TAILQ_FIRST(&xmltv_grabber_workq)) == NULL)
pthread_cond_wait(&xmltv_work_cond, &xmltv_work_lock);
time(&n);
n = n + 60;
LIST_FOREACH(xg, &xmltv_grabbers, xg_link)
n = xg->xg_nextgrab ? MIN(n, xg->xg_nextgrab) : n;
xg->xg_on_work_link = 0;
TAILQ_REMOVE(&xmltv_grabber_workq, xg, xg_work_link);
pthread_mutex_unlock(&xmltv_work_lock);
r = xmltv_grab(xg);
ts.tv_sec = n;
ts.tv_nsec = 0;
m = htsmsg_create();
htsmsg_add_u32(m, "grab_completed", r);
htsmsg_add_str(m, "identifier", xg->xg_identifier);
icom_send_msg_from_thread(ic, m);
htsmsg_destroy(m);
pthread_cond_timedwait(&xmltv_work_cond, &xmltv_work_lock, &ts);
time(&n);
LIST_FOREACH(xg, &xmltv_grabbers, xg_link) {
switch(xg->xg_status) {
case XMLTV_GRABBER_GRABBING:
abort();
case XMLTV_GRABBER_DISABLED:
case XMLTV_GRABBER_ENQUEUED:
break;
case XMLTV_GRABBER_IDLE:
case XMLTV_GRABBER_UNCONFIGURED:
case XMLTV_GRABBER_DYSFUNCTIONAL:
if(xg->xg_nextgrab <= n + 1)
TAILQ_INSERT_TAIL(&xmltv_grabber_workq, xg, xg_work_link);
break;
}
}
pthread_mutex_lock(&xmltv_work_lock);
}
return NULL;
}
/**
*
*/
void
xmltv_grabber_enable(xmltv_grabber_t *xg)
xmltv_grabber_enqueue(xmltv_grabber_t *xg)
{
pthread_mutex_lock(&xmltv_work_lock);
if(xg->xg_status != XMLTV_GRABBER_ENQUEUED) {
xg->xg_status = XMLTV_GRABBER_ENQUEUED;
if(!xg->xg_on_work_link) {
xg->xg_on_work_link = 1;
TAILQ_INSERT_TAIL(&xmltv_grabber_workq, xg, xg_work_link);
pthread_cond_signal(&xmltv_work_cond);
}
pthread_mutex_unlock(&xmltv_work_lock);
xmltv_config_save();
}
/**
* Enqueue a new grabing
*/
static void
regrab(void *aux, int64_t now)
{
xmltv_grabber_t *xg = aux;
xmltv_grabber_enqueue(xg);
}
/**
*
*/
static void
xmltv_xfer(void *aux, int64_t now)
{
xmltv_grabber_t *xg = aux;
xmltv_transfer_events(xg);
dtimer_arm(&xg->xg_xfer_timer, xmltv_xfer, xg, 60);
}
/**
@ -584,14 +572,32 @@ static void
icom_cb(void *opaque, htsmsg_t *m)
{
const char *s;
uint32_t status;
char *t;
xmltv_grabber_t *xg;
uint32_t v;
if(!htsmsg_get_u32(m, "status", &status)) {
s = htsmsg_get_str(m, "identifier");
if(s != NULL && (xg = xmltv_grabber_find(s)) != NULL) {
notify_xmltv_grabber_status_change(xg, status);
if(!htsmsg_get_u32(m, "initialized", &v)) {
/* Grabbers loaded */
if((s = htsmsg_get_str(m, "payload")) != NULL) {
t = strdupa(s);
xmltv_parse_grabbers(t);
xmltv_globalstatus = XMLTVSTATUS_RUNNING;
LIST_FOREACH(xg, &xmltv_grabbers, xg_link)
if(xg->xg_enabled)
xmltv_grabber_enqueue(xg);
} else {
xmltv_globalstatus = XMLTVSTATUS_FIND_GRABBERS_NOT_FOUND;
}
} else if(!htsmsg_get_u32(m, "grab_completed", &v) &&
(s = htsmsg_get_str(m, "identifier")) != NULL &&
(xg = xmltv_grabber_find(s)) != NULL) {
xg->xg_status = v;
dtimer_arm(&xg->xg_grab_timer, regrab, xg, v == XMLTV_GRAB_OK ? 3600 : 60);
dtimer_arm(&xg->xg_xfer_timer, xmltv_xfer, xg, 1);
}
htsmsg_destroy(m);
}
@ -629,32 +635,63 @@ xmltv_init(void)
*
*/
const char *
xmltv_grabber_status_long(xmltv_grabber_t *xg, int status)
xmltv_grabber_status_long(xmltv_grabber_t *xg)
{
static char buf[1000];
switch(status) {
case XMLTV_GRABBER_UNCONFIGURED:
if(xg->xg_enabled == 0) {
return "Disabled";
} else if(xg->xg_on_work_link) {
return "Enqueued for grabbing, please wait";
} else switch(xg->xg_status) {
case XMLTV_GRAB_WORKING:
return "Grabbing, please wait";
case XMLTV_GRAB_UNCONFIGURED:
snprintf(buf, sizeof(buf),
"This grabber is not configured.</p><p> You need to configure it "
"This grabber is not configured.</p><p>You need to configure it "
"manually by running '%s --configure' in a shell",
xg->xg_path);
return buf;
case XMLTV_GRABBER_DYSFUNCTIONAL:
case XMLTV_GRAB_DYSFUNCTIONAL:
snprintf(buf, sizeof(buf),
"This grabber does not produce valid XML, please check "
"manually by running '%s' in a shell",
xg->xg_path);
return buf;
case XMLTV_GRABBER_ENQUEUED:
case XMLTV_GRABBER_GRABBING:
return "Grabbing, please wait";
default:
return "Unknown status";
case XMLTV_GRAB_OK:
return "Idle";
}
return "Unknown status";
}
/**
*
*/
const char *
xmltv_grabber_status(xmltv_grabber_t *xg)
{
if(xg->xg_enabled == 0) {
return "Disabled";
} else if(xg->xg_on_work_link) {
return "Grabbing";
} else switch(xg->xg_status) {
case XMLTV_GRAB_WORKING:
return "Grabbing";
case XMLTV_GRAB_UNCONFIGURED:
return "Unconfigured";
case XMLTV_GRAB_DYSFUNCTIONAL:
return "Dysfunctional";
case XMLTV_GRAB_OK:
return "Idle";
}
return "Unknown";
}
@ -669,17 +706,15 @@ xmltv_config_save(void)
xmltv_channel_t *xc;
FILE *fp;
pthread_mutex_lock(&xmltv_save_lock);
snprintf(buf, sizeof(buf), "%s/xmltv-settings.cfg", settings_dir);
if((fp = settings_open_for_write(buf)) != NULL) {
LIST_FOREACH(xg, &xmltv_grabbers, xg_link) {
if(xg->xg_status == XMLTV_GRABBER_DISABLED)
continue;
fprintf(fp, "grabber {\n");
fprintf(fp, "\ttitle = %s\n", xg->xg_title);
fprintf(fp, "\tenabled = %d\n", xg->xg_enabled);
TAILQ_FOREACH(xc, &xg->xg_channels, xc_link) {
fprintf(fp, "\tchannel {\n");
@ -699,7 +734,6 @@ xmltv_config_save(void)
}
fclose(fp);
}
pthread_mutex_unlock(&xmltv_save_lock);
}
/**
@ -736,9 +770,8 @@ xmltv_config_load(void)
if(xg == NULL)
continue;
xg->xg_status = XMLTV_GRABBER_ENQUEUED;
TAILQ_INSERT_TAIL(&xmltv_grabber_workq, xg, xg_work_link);
xg->xg_enabled = atoi(config_get_str_sub(&ce->ce_sub, "enabled", "0"));
TAILQ_FOREACH(ce2, &ce->ce_sub, ce_link) {
if(ce2->ce_type != CFG_SUB || strcasecmp("channel", ce2->ce_key))
continue;
@ -773,3 +806,11 @@ xmltv_config_load(void)
config_free0(&cl);
}
void
xmltv_grabber_enable(xmltv_grabber_t *xg)
{
xg->xg_enabled = 1;
xmltv_grabber_enqueue(xg);
xmltv_config_save();
}

View file

@ -19,6 +19,12 @@
#ifndef XMLTV_H
#define XMLTV_H
#define XMLTV_GRAB_WORKING 0
#define XMLTV_GRAB_UNCONFIGURED 1
#define XMLTV_GRAB_DYSFUNCTIONAL 2
#define XMLTV_GRAB_OK 3
LIST_HEAD(xmltv_grabber_list, xmltv_grabber);
TAILQ_HEAD(xmltv_grabber_queue, xmltv_grabber);
@ -31,18 +37,15 @@ typedef struct xmltv_grabber {
char *xg_title;
char *xg_identifier;
enum {
XMLTV_GRABBER_DISABLED,
XMLTV_GRABBER_UNCONFIGURED,
XMLTV_GRABBER_DYSFUNCTIONAL,
XMLTV_GRABBER_ENQUEUED,
XMLTV_GRABBER_GRABBING,
XMLTV_GRABBER_IDLE,
} xg_status;
TAILQ_ENTRY(xmltv_grabber) xg_work_link;
int xg_on_work_link;
time_t xg_nextgrab;
int xg_enabled;
int xg_status;
dtimer_t xg_grab_timer;
dtimer_t xg_xfer_timer;
struct xmltv_channel_queue xg_channels;
@ -79,7 +82,7 @@ typedef struct xmltv_channel {
#define XMLTVSTATUS_FIND_GRABBERS_NOT_FOUND 1
#define XMLTVSTATUS_RUNNING 2
extern int xmltv_status;
extern int xmltv_globalstatus;
extern struct xmltv_grabber_list xmltv_grabbers;
void xmltv_init(void);
@ -90,7 +93,7 @@ void xmltv_grabber_enable(xmltv_grabber_t *xg);
xmltv_grabber_t *xmltv_grabber_find(const char *id);
const char *xmltv_grabber_status_long(xmltv_grabber_t *xg, int status);
const char *xmltv_grabber_status_long(xmltv_grabber_t *xg);
void xmltv_config_save(void);

View file

@ -69,7 +69,7 @@ notify_tda_change(th_dvb_adapter_t *tda)
}
void
notify_xmltv_grabber_status_change(struct xmltv_grabber *xg, int status)
notify_xmltv_grabber_status_change(struct xmltv_grabber *xg)
{
ajax_mailbox_xmltv_grabber_status_change(xg, status);
ajax_mailbox_xmltv_grabber_status_change(xg);
}

View file

@ -35,7 +35,6 @@ void notify_tdmi_services_change(struct th_dvb_mux_instance *tdmi);
void notify_tda_change(struct th_dvb_adapter *tda);
void notify_xmltv_grabber_status_change(struct xmltv_grabber *xg,
int status);
void notify_xmltv_grabber_status_change(struct xmltv_grabber *xg);
#endif /* NOTIFY_H_ */