idnode: added some rate limiting to idnode update messages

This commit is contained in:
Adam Sutton 2013-07-09 19:46:36 +01:00
parent 4833618031
commit c587fd3fdc
9 changed files with 101 additions and 56 deletions

View file

@ -31,6 +31,10 @@
static int randfd = 0;
static RB_HEAD(,idnode) idnodes;
static pthread_cond_t idnode_cond;
static pthread_mutex_t idnode_mutex;
static htsmsg_t *idnode_queue;
static void* idnode_thread(void* p);
/* **************************************************************************
* Utilities
@ -110,9 +114,17 @@ in_cmp(const idnode_t *a, const idnode_t *b)
void
idnode_init(void)
{
pthread_t tid;
randfd = open("/dev/urandom", O_RDONLY);
if(randfd == -1)
exit(1);
idnode_queue = NULL;
pthread_mutex_init(&idnode_mutex, NULL);
pthread_cond_init(&idnode_cond, NULL);
pthread_create(&tid, NULL, idnode_thread, NULL);
}
/**
@ -566,12 +578,19 @@ idnode_write0 ( idnode_t *self, htsmsg_t *c, int optmask, int dosave )
{
int save = 0;
const idclass_t *idc = self->in_class;
htsmsg_t *updated = htsmsg_create_map();
for (; idc; idc = idc->ic_super)
save |= prop_write_values(self, idc->ic_properties, c, optmask, updated);
if (save && dosave)
idnode_notify(NULL, self, optmask, updated);
htsmsg_destroy(updated);
save |= prop_write_values(self, idc->ic_properties, c, optmask, NULL);
if (save) {
if (dosave) {
for(; idc != NULL; idc = idc->ic_super) {
if(idc->ic_save != NULL) {
idc->ic_save(self);
break;
}
}
}
idnode_notify(self, NULL, 0);
}
return save;
}
@ -695,46 +714,81 @@ idnode_serialize0(idnode_t *self, int optmask)
* Notifcation
* *************************************************************************/
/**
*
*/
void
idnode_notify_title_changed(void *obj)
{
idnode_t *in = obj;
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "id", idnode_uuid_as_str(in));
htsmsg_add_str(m, "text", idnode_get_title(in));
notify_by_msg("idnodeNameChanged", m);
}
/**
* Notify on a given channel
*/
void
idnode_notify
(const char *chn, idnode_t *in, int optmask, htsmsg_t *inc)
(idnode_t *in, const char *chn, int force)
{
const idclass_t *ic = in->in_class;
/* Save */
for(; ic != NULL; ic = ic->ic_super) {
if(ic->ic_save != NULL) {
ic->ic_save(in);
break;
}
/* Forced */
if (chn || force) {
htsmsg_t *m = idnode_serialize0(in, 0);
notify_by_msg(chn ?: "idnodeParamsChanged", m);
/* Rate-limited */
} else {
pthread_mutex_lock(&idnode_mutex);
if (!idnode_queue)
idnode_queue = htsmsg_create_map();
htsmsg_set_u32(idnode_queue, idnode_uuid_as_str(in), 1);
pthread_cond_signal(&idnode_cond);
pthread_mutex_unlock(&idnode_mutex);
}
}
/* Notification */
void
idnode_notify_simple (void *in)
{
idnode_notify(in, NULL, 0);
}
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "id", idnode_uuid_as_str(in));
/*
* Thread for handling notifications
*/
void*
idnode_thread ( void *p )
{
idnode_t *node;
htsmsg_t *m, *q;
htsmsg_field_t *f;
htsmsg_t *p = htsmsg_create_list();
add_params(in, in->in_class, p, optmask, inc);
htsmsg_add_msg(m, "params", p);
pthread_mutex_lock(&idnode_mutex);
notify_by_msg(chn ?: "idnodeParamsChanged", m);
while (1) {
/* Get queue */
if (!idnode_queue) {
pthread_cond_wait(&idnode_cond, &idnode_mutex);
continue;
}
q = idnode_queue;
idnode_queue = NULL;
pthread_mutex_unlock(&idnode_mutex);
/* Process */
pthread_mutex_lock(&global_lock);
HTSMSG_FOREACH(f, q) {
node = idnode_find(f->hmf_name, NULL);
if (node) {
m = idnode_serialize0(node, 0);
if (m)
notify_by_msg("idnodeUpdated", m);
} else {
m = htsmsg_create_map();
htsmsg_add_str(m, "uuid", f->hmf_name);
notify_by_msg("idnodeDeleted", m);
}
}
/* Finished */
pthread_mutex_unlock(&global_lock);
htsmsg_destroy(q);
pthread_mutex_lock(&idnode_mutex);
}
return NULL;
}
/******************************************************************************

View file

@ -116,8 +116,10 @@ int idnode_is_instance (idnode_t *in, const idclass_t *idc);
void *idnode_find (const char *uuid, const idclass_t *idc);
idnode_set_t *idnode_find_all(const idclass_t *idc);
void idnode_notify(const char *chn, idnode_t *in, int optmask, htsmsg_t *inc);
void idnode_notify_title_changed(void *obj);
#define idnode_updated(in) idnode_notify(in, NULL, 0)
void idnode_notify
(idnode_t *in, const char *chn, int force);
void idnode_notify_simple (void *in);
htsmsg_t *idclass_serialize0 (const idclass_t *idc, int optmask);
htsmsg_t *idnode_serialize0 (idnode_t *self, int optmask);

View file

@ -892,7 +892,7 @@ dvb_sdt_callback
/* Save details */
if (save) {
idnode_notify(NULL, &s->s_id, 0, NULL);
idnode_updated(&s->s_id);
s->s_config_save((service_t*)s);
service_refresh_channel((service_t*)s);
}

View file

@ -565,7 +565,7 @@ linuxdvb_satconf_create0
ls->ls_lnb = linuxdvb_lnb_create0(NULL, NULL, ls);
/* Notification */
idnode_notify("linuxdvb_satconf", &ls->mi_id, 0, NULL);
idnode_notify(&ls->mi_id, "linuxdvb_satconf", 0);
return ls;
}

View file

@ -54,7 +54,7 @@ const idclass_t mpegts_input_class =
.id = "displayname",
.name = "Name",
.off = offsetof(mpegts_input_t, mi_displayname),
.notify = idnode_notify_title_changed,
.notify = idnode_notify_simple,
},
{}
}

View file

@ -464,7 +464,6 @@ mpegts_mux_create0
mpegts_network_t *mn, uint16_t onid, uint16_t tsid, htsmsg_t *conf )
{
char buf[256];
static htsmsg_t *inc = NULL;
idnode_insert(&mm->mm_id, uuid, class);
@ -506,12 +505,8 @@ mpegts_mux_create0
tvhtrace("mpegts", "%s - created", buf);
/* Notification */
idnode_notify("mpegts_mux", &mm->mm_id, 0, NULL);
if (!inc) {
inc = htsmsg_create_map();
htsmsg_set_u32(inc, "num_mux", 1);
}
idnode_notify(NULL, &mn->mn_id, 0, inc);
idnode_notify(&mm->mm_id, "mpegts_mux", 0);
idnode_updated(&mn->mn_id);
return mm;
}

View file

@ -224,7 +224,7 @@ mpegts_network_create0
tvhtrace("mpegts", "created network %s", buf);
/* Notification */
idnode_notify("mpegts_network", &mn->mn_id, 0, NULL);
idnode_notify(&mn->mn_id, "mpegts_network", 0);
return mn;
}

View file

@ -298,7 +298,6 @@ mpegts_service_create0
mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, htsmsg_t *conf )
{
char buf[256];
static htsmsg_t *inc = NULL;
service_create0((service_t*)s, class, uuid, S_MPEG_TS, conf);
/* Create */
@ -327,13 +326,8 @@ mpegts_service_create0
tvhlog(LOG_DEBUG, "mpegts", "%s - add service %04X %s", buf, s->s_dvb_service_id, s->s_dvb_svcname);
/* Notification */
if (!inc) {
inc = htsmsg_create_map();
htsmsg_set_u32(inc, "num_mux", 1);
htsmsg_set_u32(inc, "num_svc", 1);
}
idnode_notify(NULL, &s->s_dvb_mux->mm_id, 0, inc);
idnode_notify(NULL, &s->s_dvb_mux->mm_network->mn_id, 0, inc);
idnode_updated(&mm->mm_id);
idnode_updated(&mm->mm_network->mn_id);
return s;
}

View file

@ -523,7 +523,7 @@ service_create0
service_load(t, conf);
/* Notify */
idnode_notify("service", &t->s_id, 0, NULL);
idnode_notify(&t->s_id, "service", 0);
return t;
}