diff --git a/include/villas/path.h b/include/villas/path.h index 01bd5bfc1..c99e0c575 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -46,6 +46,7 @@ struct super_node; struct path_source { struct node *node; + struct pool pool; struct list hooks; /**< Read Hooks. */ @@ -62,12 +63,12 @@ struct path_destination { /** The datastructure for a path. */ struct path { enum state state; /**< Path state. */ - + struct { int nfds; struct pollfd *pfds; } reader; - + struct pool pool; struct sample *last_sample; diff --git a/lib/hook.c b/lib/hook.c index 3f2200e37..ac0c1bb85 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -39,6 +39,7 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p) assert(h->state == STATE_DESTROYED); h->priority = vt->priority; + h->path = p; h->_vt = vt; diff --git a/lib/mapping.c b/lib/mapping.c index a441bbb49..f81d97bf4 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -37,23 +37,23 @@ int mapping_parse_str(struct mapping_entry *e, const char *str, struct list *nod cpy = strdup(str); if (!cpy) return -1; - + if (nodes) { node = strtok(cpy, "."); if (!node) goto invalid_format; - + e->node = list_lookup(nodes, node); if (!e->node) goto invalid_format; - + type = strtok(NULL, ".["); if (!type) type = "data"; } else { e->node = NULL; - + type = strtok(cpy, ".["); if (!type) goto invalid_format; @@ -216,7 +216,7 @@ int mapping_parse_list(struct list *l, json_t *cfg, struct list *nodes) ret = mapping_parse(e, json_entry, nodes); if (ret) return ret; - + e->offset = off; off += e->length; @@ -230,14 +230,14 @@ int mapping_update(struct mapping_entry *me, struct sample *remapped, struct sam { int len = me->length; int off = me->offset; - + /* me->length == 0 means that we want to take all values */ if (!len) len = original->length; - + if (len + off > remapped->capacity) return -1; - + if (len + off > remapped->length) remapped->length = len + off; @@ -332,7 +332,7 @@ int mapping_update(struct mapping_entry *me, struct sample *remapped, struct sam break; } - + return 0; } @@ -355,4 +355,4 @@ int mapping_remap(struct list *m, struct sample *remapped, struct sample *origin } return 0; -} \ No newline at end of file +} diff --git a/lib/node.c b/lib/node.c index 0af50ee58..2cbe0f6aa 100644 --- a/lib/node.c +++ b/lib/node.c @@ -207,7 +207,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize)); if (readd < 0) return readd; - + nread += readd; debug(LOG_NODES | 5, "Received %u samples from node %s", readd, node_name(n)); } @@ -347,4 +347,4 @@ invalid2: error("Unknown node '%s'. Choose of one of: %s", str, allstr); return 0; -} \ No newline at end of file +} diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 4fa00a2fa..85ed995e2 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -117,6 +117,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) do { recv = shmem_int_read(&shm->intf, shared_smps, cnt); } while (recv == 0); + if (recv < 0) { /* This can only really mean that the other process has exited, so close * the interface to make sure the shared memory object is unlinked */ diff --git a/lib/nodes/signal.c b/lib/nodes/signal.c index 661fa57fc..8f87d22ed 100644 --- a/lib/nodes/signal.c +++ b/lib/nodes/signal.c @@ -81,7 +81,6 @@ int signal_parse(struct node *n, json_t *cfg) if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - if (type) { ret = signal_lookup_type(type); if (ret == -1) @@ -173,7 +172,7 @@ int signal_open(struct node *n) s->counter = 0; s->started = time_now(); s->last = alloc(sizeof(double) * s->values); - + for (int i = 0; i < s->values; i++) s->last[i] = s->offset; @@ -197,9 +196,9 @@ int signal_close(struct node *n) if (ret) return ret; } - + free(s->last); - + return 0; } @@ -293,7 +292,7 @@ char * signal_print(struct node *n) int signal_fd(struct node *n) { struct signal *s = n->_vd; - + return task_fd(&s->task); } diff --git a/lib/path.c b/lib/path.c index 8404f9089..b867f678b 100644 --- a/lib/path.c +++ b/lib/path.c @@ -73,15 +73,15 @@ static void path_read_source(struct path *p, struct path_source *ps) mux = sample_alloc(&p->pool, muxed_smps, recv); if (mux != recv) warn("Pool underrun for path %s", path_name(p)); - + for (int i = 0; i < mux; i++) { mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL); - + p->last_sample->sequence = p->sequence++; sample_copy(muxed_smps[i], p->last_sample); } - + /* Run processing hooks */ enqueue = hook_process_list(&p->hooks, muxed_smps, mux); if (enqueue != mux) { @@ -112,11 +112,11 @@ static void path_read_source(struct path *p, struct path_source *ps) static void path_poll(struct path *p) { int ret; - + ret = poll(p->reader.pfds, p->reader.nfds, -1); if (ret < 0) serror("Failed to poll"); - + int updates = 0; for (int i = 0; i < p->reader.nfds; i++) { struct path_source *ps = list_at(&p->sources, i); @@ -202,7 +202,7 @@ int path_init(struct path *p, struct super_node *sn) list_init(&p->hooks); list_init(&p->destinations); list_init(&p->sources); - + p->_name = NULL; /* Default values */ @@ -277,13 +277,13 @@ int path_init2(struct path *p) ret = pool_init(&p->pool, list_length(&p->destinations) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage); if (ret) return ret; - + sample_alloc(&p->pool, &p->last_sample, 1); - + /* Prepare poll() */ p->reader.nfds = list_length(&p->sources); p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds); - + for (int i = 0; i < p->reader.nfds; i++) { struct path_source *ps = list_at(&p->sources, i); @@ -336,22 +336,22 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) for (size_t i = 0; i < list_length(&sources); i++) { struct mapping_entry *me = list_at(&sources, i); - + struct path_source *ps = NULL; - + /* Check if there is already a path_source for this source */ for (size_t j = 0; j < list_length(&p->sources); j++) { struct path_source *pt = list_at(&p->sources, j); - + if (pt->node == me->node) { ps = pt; break; } } - + if (!ps) { ps = alloc(sizeof(struct path_source)); - + ps->node = me->node; list_init(&ps->mappings); @@ -442,7 +442,7 @@ int path_start(struct path *p) if (ret) return ret; } - + p->sequence = 0; /* Start one thread per path for sending to destinations */ @@ -486,13 +486,12 @@ int path_destroy(struct path *p) return 0; list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true); - list_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true); list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); if (p->_name) free(p->_name); - + pool_destroy(&p->pool); p->state = STATE_DESTROYED; @@ -510,7 +509,7 @@ const char * path_name(struct path *p) strcatf(&p->_name, " %s", node_name_short(ps->node)); } - + strcatf(&p->_name, " ] " CLR_MAG("=>") " ["); for (size_t i = 0; i < list_length(&p->destinations); i++) { @@ -533,7 +532,7 @@ int path_uses_node(struct path *p, struct node *n) if (pd->node == n) return 0; } - + for (size_t i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = list_at(&p->sources, i); @@ -550,10 +549,10 @@ int path_reverse(struct path *p, struct path *r) if (list_length(&p->destinations) != 1 || list_length(&p->sources) != 1) return -1; - + /* General */ r->enabled = p->enabled; - + /* Source / Destinations */ struct path_destination *orig_pd = list_first(&p->destinations); struct path_source *orig_ps = list_first(&p->sources); @@ -562,7 +561,7 @@ int path_reverse(struct path *p, struct path *r) struct path_source *new_ps = alloc(sizeof(struct path_source)); new_pd->node = orig_ps->node; - + new_ps->node = orig_pd->node; list_push(&r->destinations, new_pd); diff --git a/lib/stats.c b/lib/stats.c index 470fb576a..b7d3e7ab0 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -37,11 +37,11 @@ static struct stats_desc { const char *desc; int hist_buckets; } stats_metrics[] = { - { "skipped", "samples", "Skipped samples and the distance between them", 25 }, - { "reordered", "samples", "Reordered samples and the distance between them", 25 }, - { "gap_sample", "seconds", "Inter-message timestamps (as sent by remote)", 25 }, - { "gap_received", "seconds", "Inter-message arrival time (as seen by this instance)", 25 }, - { "owd", "seconds", "One-way-delay (OWD) of received messages", 25 } + { "skipped", "samples", "Skipped samples and the distance between them", 25 }, + { "reordered", "samples", "Reordered samples and the distance between them", 25 }, + { "gap_sample", "seconds", "Inter-message timestamps (as sent by remote)", 25 }, + { "gap_received", "seconds", "Inter-message arrival time (as seen by this instance)", 25 }, + { "owd", "seconds", "One-way-delay (OWD) of received messages", 25 } }; int stats_lookup_format(const char *str) @@ -158,14 +158,14 @@ void stats_reset(struct stats *s) } static struct table_column stats_cols[] = { - { 35, "Path", "%s", NULL, TABLE_ALIGN_LEFT }, - { 10, "Cnt", "%ju", "p", TABLE_ALIGN_RIGHT }, - { 10, "OWD last", "%f", "S", TABLE_ALIGN_RIGHT }, - { 10, "OWD mean", "%f", "S", TABLE_ALIGN_RIGHT }, - { 10, "Rate last", "%f", "p/S", TABLE_ALIGN_RIGHT }, - { 10, "Rate mean", "%f", "p/S", TABLE_ALIGN_RIGHT }, - { 10, "Drop", "%ju", "p", TABLE_ALIGN_RIGHT }, - { 10, "Skip", "%ju", "p", TABLE_ALIGN_RIGHT } + { 35, "Path", "%s", NULL, TABLE_ALIGN_LEFT }, + { 10, "Cnt", "%ju", "p", TABLE_ALIGN_RIGHT }, + { 10, "OWD last", "%f", "S", TABLE_ALIGN_RIGHT }, + { 10, "OWD mean", "%f", "S", TABLE_ALIGN_RIGHT }, + { 10, "Rate last", "%f", "p/S", TABLE_ALIGN_RIGHT }, + { 10, "Rate mean", "%f", "p/S", TABLE_ALIGN_RIGHT }, + { 10, "Drop", "%ju", "p", TABLE_ALIGN_RIGHT }, + { 10, "Skip", "%ju", "p", TABLE_ALIGN_RIGHT } }; static struct table stats_table = { diff --git a/lib/super_node.c b/lib/super_node.c index 3db5a7945..fac76c93f 100644 --- a/lib/super_node.c +++ b/lib/super_node.c @@ -74,7 +74,7 @@ int super_node_init(struct super_node *sn) int super_node_parse_uri(struct super_node *sn, const char *uri) { json_error_t err; - + info("Parsing configuration"); if (uri) { INDENT @@ -156,7 +156,7 @@ int super_node_parse_uri(struct super_node *sn, const char *uri) afclose(af); else if (f != stdin) fclose(f); - + sn->uri = strdup(uri); return super_node_parse_json(sn, sn->cfg); diff --git a/src/node.c b/src/node.c index f2b7c4544..e4a18f68e 100644 --- a/src/node.c +++ b/src/node.c @@ -129,19 +129,19 @@ int main(int argc, char *argv[]) ret = signals_init(quit); if (ret) error("Failed to initialize signal subsystem"); - + ret = super_node_init(&sn); if (ret) error("Failed to initialize super node"); - + ret = super_node_parse_cli(&sn, argc, argv); if (ret) error("Failed to parse command line arguments"); - + ret = super_node_check(&sn); if (ret) error("Failed to verify configuration"); - + ret = super_node_start(&sn); if (ret) error("Failed to start super node"); @@ -150,7 +150,7 @@ int main(int argc, char *argv[]) stats_print_header(STATS_FORMAT_HUMAN); struct task t; - + ret = task_init(&t, 1.0 / sn.stats, CLOCK_REALTIME); if (ret) error("Failed to create stats timer");