diff --git a/include/villas/stats.h b/include/villas/stats.h index dc69beb41..833823f78 100644 --- a/include/villas/stats.h +++ b/include/villas/stats.h @@ -36,13 +36,14 @@ enum stats_id { struct stats_delta { double values[STATS_COUNT]; + int update; /**< Bitmask of stats_id. Only those which are masked will be updated */ struct sample *last; }; struct stats { struct hist histograms[STATS_COUNT]; - struct sample *last; + struct stats_delta *delta; }; int stats_init(struct stats *s); diff --git a/lib/hooks.c b/lib/hooks.c index c6f9e4196..cc74110eb 100644 --- a/lib/hooks.c +++ b/lib/hooks.c @@ -70,9 +70,10 @@ int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when) list_foreach(struct hook *h, &p->hooks) { if (h->type & when) { - debug(DBG_HOOK | 22, "Running hook when=%u '%s' prio=%u, cnt=%zu", when, h->name, h->priority, cnt); - cnt = h->cb(h, when, &i); + + debug(DBG_HOOK | 22, "Ran hook '%s' when=%u prio=%u, cnt=%zu", h->name, when, h->priority, cnt); + if (cnt == 0) break; } diff --git a/lib/hooks/hooks-internal.c b/lib/hooks/hooks-internal.c index 81673a3c3..735fa50df 100644 --- a/lib/hooks/hooks-internal.c +++ b/lib/hooks/hooks-internal.c @@ -77,8 +77,8 @@ int hook_drop(struct hook *h, int when, struct hook_info *j) dist = h->last->sequence - (int32_t) h->prev->sequence; if (dist <= 0) { warn("Dropped sample: dist = %d, i = %d", dist, i); - if (j->path) - stats_update(j->path->stats, STATS_DROPPED, dist); + if (j->path && j->path->stats) + stats_update(j->path->stats->delta, STATS_DROPPED, dist); } else { struct sample *tmp; diff --git a/lib/hooks/hooks-stats.c b/lib/hooks/hooks-stats.c index bd78d3c87..38f02916f 100644 --- a/lib/hooks/hooks-stats.c +++ b/lib/hooks/hooks-stats.c @@ -20,11 +20,16 @@ int hook_stats(struct hook *h, int when, struct hook_info *j) struct stats *s = hook_storage(h, when, sizeof(struct stats), (ctor_cb_t) stats_init, (dtor_cb_t) stats_destroy); switch (when) { + case HOOK_INIT: + if (j->path) + j->path->stats = s; + break; + case HOOK_READ: assert(j->smps); - assert(j->path); - stats_collect(j->path->stats, j->smps, j->cnt); + stats_collect(s->delta, j->smps, j->cnt); + stats_commit(s, s->delta); break; case HOOK_PATH_STOP: @@ -45,37 +50,41 @@ int hook_stats(struct hook *h, int when, struct hook_info *j) return j->cnt; } -#if 0 /* currently broken */ -REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_DESTROY | HOOK_PERIODIC | HOOK_PATH) -int hook_stats_send(struct hook *h, int when, struct hook_info *i) +/** @todo This is untested */ +REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_PATH | HOOK_READ) +int hook_stats_send(struct hook *h, int when, struct hook_info *j) { struct private { struct node *dest; + struct stats *stats; int ratio; - } *private = hook_storage(h, when, sizeof(*private)); + } *private = hook_storage(h, when, sizeof(*private), NULL, NULL); switch (when) { - case HOOK_DESTROY: + case HOOK_INIT: + assert(j->nodes); + assert(j->path); + if (!h->parameter) error("Missing parameter for hook '%s'", h->name); - if (!hook_nodes) - error("Missing reference to node list for hook '%s", h->name); - - private->dest = list_lookup(hook_nodes, h->parameter); + private->dest = list_lookup(j->nodes, h->parameter); if (!private->dest) error("Invalid destination node '%s' for hook '%s'", h->parameter, h->name); - - node_start(private->dest); + break; + case HOOK_PATH_START: + node_start(private->dest); break; - case HOOK_PERIODIC: { - stats_send(s, node); + case HOOK_PATH_STOP: + node_stop(private->dest); + break; + + case HOOK_READ: + stats_send(private->stats, private->dest); break; - } } return 0; } -#endif \ No newline at end of file diff --git a/lib/path.c b/lib/path.c index 4a6c7825b..863378380 100644 --- a/lib/path.c +++ b/lib/path.c @@ -52,7 +52,7 @@ static void path_read(struct path *p) if (enqueue != recv) { info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); - stats_update(p->stats, STATS_SKIPPED, recv - enqueue); + stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue); } list_foreach(struct path_destination *pd, &p->destinations) { @@ -84,7 +84,7 @@ static void path_write(struct path *p) if (available == 0) break; else if (available < cnt) - warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + debug(DBG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); debug(DBG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); diff --git a/lib/stats.c b/lib/stats.c index 6441edf85..fd137f8ca 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -37,6 +37,8 @@ int stats_init(struct stats *s) struct stats_desc *desc = &stats_table[i]; hist_create(&s->histograms[i], desc->hist.min, desc->hist.max, desc->hist.resolution); } + + s->delta = alloc(sizeof(struct stats_delta)); return 0; } @@ -46,18 +48,23 @@ void stats_destroy(struct stats *s) for (int i = 0; i < STATS_COUNT; i++) { hist_destroy(&s->histograms[i]); } + + free(s->delta); } void stats_update(struct stats_delta *d, enum stats_id id, double val) { - if (d && id >= 0 && id < STATS_COUNT) - d->values[id] = val; + assert(id >= 0 && id < STATS_COUNT); + + d->values[id] = val; + d->update |= 1 << id; } int stats_commit(struct stats *s, struct stats_delta *d) { for (int i = 0; i < STATS_COUNT; i++) { - hist_put(&s->histograms[i], d->values[i]); + if (d->update & 1 << i) + hist_put(&s->histograms[i], d->values[i]); } return 0; @@ -67,22 +74,23 @@ void stats_collect(struct stats_delta *s, struct sample *smps[], size_t cnt) { struct sample *previous = s->last; - if (previous) { - sample_put(previous); - - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < cnt; i++) { + if (previous) { stats_update(s, STATS_GAP_RECEIVED, time_delta(&previous->ts.received, &smps[i]->ts.received)); stats_update(s, STATS_GAP_SAMPLE, time_delta(&previous->ts.origin, &smps[i]->ts.origin)); stats_update(s, STATS_OWD, time_delta(&smps[i]->ts.origin, &smps[i]->ts.received)); stats_update(s, STATS_GAP_SEQUENCE, smps[i]->sequence - (int32_t) previous->sequence); - - /* Make sure there is always a reference to the previous sample */ - - previous = smps[i]; } + + previous = smps[i]; } - sample_get(previous); + if (s->last) + sample_put(s->last); + + if (previous) + sample_get(previous); + s->last = previous; }