From 1740df596baa062e9e3d0abcd2af671f1474a7b8 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 6 Feb 2019 13:11:57 +0100 Subject: [PATCH] separated in/out signal descriptors --- include/villas/node.h | 6 +-- lib/mapping.c | 10 ++-- lib/node.c | 92 ++++++++++++++++++------------------ lib/nodes/amqp.c | 2 +- lib/nodes/file.c | 2 +- lib/nodes/iec61850_sv.c | 2 +- lib/nodes/loopback.c | 7 ++- lib/nodes/mqtt.c | 4 +- lib/nodes/nanomsg.c | 2 +- lib/nodes/rtp.c | 2 +- lib/nodes/shmem.c | 6 ++- lib/nodes/signal_generator.c | 6 +-- lib/nodes/socket.c | 2 +- lib/nodes/stats.c | 12 ++--- lib/nodes/test_rtt.c | 2 +- lib/nodes/uldaq.c | 10 ++-- lib/nodes/zeromq.c | 2 +- tests/unit/mapping.cpp | 6 +-- 18 files changed, 90 insertions(+), 85 deletions(-) diff --git a/include/villas/node.h b/include/villas/node.h index 4c34503b4..38557ebb8 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -58,6 +58,7 @@ struct node_direction { int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ struct vlist hooks; /**< List of write hooks (struct hook). */ + struct vlist signals; /**< Signal description. */ json_t *cfg; /**< A JSON object containing the configuration of the node. */ }; @@ -67,8 +68,7 @@ struct node_direction { * Every entity which exchanges messages is represented by a node. * Nodes can be remote machines and simulators or locally running processes. */ -struct node -{ +struct node { char *name; /**< A short identifier of the node, only used for configuration and logging */ enum state state; @@ -84,8 +84,6 @@ struct node struct node_direction in, out; - struct vlist signals; /**< Signal meta data for data which is __received__ by node_read(). */ - #ifdef WITH_NETEM int mark; /**< Socket mark for netem, routing and filtering */ diff --git a/lib/mapping.c b/lib/mapping.c index 87491861d..e09a6758d 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -154,7 +154,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n first_str = strtok(NULL, "-]"); if (first_str) { if (me->node) - first = vlist_lookup_index(&me->node->signals, first_str); + first = vlist_lookup_index(&me->node->in.signals, first_str); if (first < 0) { char *endptr; @@ -168,14 +168,14 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n else { /* Map all signals */ me->data.offset = 0; - me->length = me->node ? vlist_length(&me->node->signals) : 0; + me->length = me->node ? vlist_length(&me->node->in.signals) : 0; goto end; } last_str = strtok(NULL, "]"); if (last_str) { if (me->node) - last = vlist_lookup_index(&me->node->signals, last_str); + last = vlist_lookup_index(&me->node->in.signals, last_str); if (last < 0) { char *endptr; @@ -451,8 +451,8 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str) break; case MAPPING_TYPE_DATA: - if (me->node && index < vlist_length(&me->node->signals)) { - struct signal *s = vlist_at(&me->node->signals, index); + if (me->node && index < vlist_length(&me->node->in.signals)) { + struct signal *s = vlist_at(&me->node->in.signals, index); strcatf(str, "data[%s]", s->name); } diff --git a/lib/node.c b/lib/node.c index 3695c6e2b..599439066 100644 --- a/lib/node.c +++ b/lib/node.c @@ -68,11 +68,16 @@ static int node_direction_init(struct node_direction *nd, struct node *n) nd->vectorize = 1; nd->builtin = 1; nd->hooks.state = STATE_DESTROYED; + nd->signals.state = STATE_DESTROYED; ret = vlist_init(&nd->hooks); if (ret) return ret; + ret = vlist_init(&nd->signals); + if (ret) + return ret; + return 0; } @@ -86,6 +91,10 @@ static int node_direction_destroy(struct node_direction *nd, struct node *n) return ret; #endif + ret = vlist_destroy(&nd->signals, (dtor_cb_t) signal_decref, false); + if (ret) + return ret; + return ret; } @@ -95,12 +104,14 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_ json_error_t err; json_t *json_hooks = NULL; + json_t *json_signals = NULL; nd->cfg = cfg; nd->enabled = 1; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: i, s?: b, s?: b }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }", "hooks", &json_hooks, + "signals", &json_signals, "vectorize", &nd->vectorize, "builtin", &nd->builtin, "enabled", &nd->enabled @@ -108,6 +119,35 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_ if (ret) jerror(&err, "Failed to parse node %s", node_name(n)); + if (n->_vt->flags & NODE_TYPE_PROVIDES_SIGNALS) { + if (json_signals) + error("Node %s does not support signal definitions", node_name(n)); + } + else if (json_is_array(json_signals)) { + ret = signal_list_parse(&nd->signals, json_signals); + if (ret) + error("Failed to parse signal definition of node %s", node_name(n)); + } + else { + int count = DEFAULT_SAMPLE_LENGTH; + const char *type_str = "float"; + + if (json_is_object(json_signals)) { + json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }", + "count", &count, + "type", &type_str + ); + } + else + warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH); + + int type = signal_type_from_str(type_str); + if (type < 0) + error("Invalid signal type %s", type_str); + + signal_list_generate(&nd->signals, count, type); + } + #ifdef WITH_HOOKS int m = nd == &n->out ? HOOK_NODE_WRITE @@ -187,9 +227,6 @@ int node_init(struct node *n, struct node_type *vt) n->tc_classifier = NULL; #endif /* WITH_NETEM */ - n->signals.state = STATE_DESTROYED; - vlist_init(&n->signals); - /* Default values */ ret = node_direction_init(&n->in, n); if (ret) @@ -268,38 +305,6 @@ int node_parse(struct node *n, json_t *json, const char *name) #endif /* WITH_NETEM */ } - if (nt->flags & NODE_TYPE_PROVIDES_SIGNALS) { - if (json_signals) - error("Node %s does not support signal definitions", node_name(n)); - } - else if (json_signals) { - if (json_is_array(json_signals)) { - ret = signal_list_parse(&n->signals, json_signals); - if (ret) - error("Failed to parse signal definition of node %s", node_name(n)); - } - else { - int count; - const char *type_str; - - json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }", - "count", &count, - "type", &type_str - ); - - int type = signal_type_from_str(type_str); - if (type < 0) - error("Invalid signal type %s", type_str); - - signal_list_generate(&n->signals, count, type); - } - } - else { - warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH); - - signal_list_generate(&n->signals, DEFAULT_SAMPLE_LENGTH, SIGNAL_TYPE_FLOAT); - } - struct { const char *str; struct node_direction *dir; @@ -308,7 +313,7 @@ int node_parse(struct node *n, json_t *json, const char *name) { "out", &n->out } }; - const char *fields[] = { "builtin", "vectorize", "hooks" }; + const char *fields[] = { "signals", "builtin", "vectorize", "hooks" }; for (int j = 0; j < ARRAY_LEN(dirs); j++) { json_t *json_dir = json_object_get(json, dirs[j].str); @@ -492,16 +497,11 @@ int node_restart(struct node *n) return 0; } - int node_destroy(struct node *n) { int ret; assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED); - ret = vlist_destroy(&n->signals, (dtor_cb_t) signal_decref, false); - if (ret) - return ret; - ret = node_direction_destroy(&n->in, n); if (ret) return ret; @@ -645,11 +645,11 @@ char * node_name_long(struct node *n) if (node_type(n)->print) { struct node_type *vt = node_type(n); - strcatf(&n->_name_long, "%s: #in.signals=%zu, #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d", + strcatf(&n->_name_long, "%s: #in.signals=%zu, #out.signals=%zu, #in.hooks=%zu, #out.hooks=%zu, in.vectorize=%d, out.vectorize=%d", node_name(n), - vlist_length(&n->signals), - vlist_length(&n->in.hooks), n->in.vectorize, - vlist_length(&n->out.hooks), n->out.vectorize + vlist_length(&n->in.signals), vlist_length(&n->out.signals), + vlist_length(&n->in.hooks), vlist_length(&n->out.hooks), + n->in.vectorize, n->out.vectorize ); #ifdef WITH_NETEM diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index d177545e5..e644d679e 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -239,7 +239,7 @@ int amqp_start(struct node *n) amqp_rpc_reply_t rep; amqp_queue_declare_ok_t *r; - ret = io_init(&a->io, a->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + ret = io_init(&a->io, a->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) return ret; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index f854ecb0d..4b05ac0c4 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -243,7 +243,7 @@ int file_start(struct node *n) if (f->flush) flags |= IO_FLUSH; - ret = io_init(&f->io, f->format, &n->signals, flags); + ret = io_init(&f->io, f->format, &n->in.signals, flags); if (ret) return ret; diff --git a/lib/nodes/iec61850_sv.c b/lib/nodes/iec61850_sv.c index 25c7d481a..20b70acae 100644 --- a/lib/nodes/iec61850_sv.c +++ b/lib/nodes/iec61850_sv.c @@ -74,7 +74,7 @@ static void iec61850_sv_listener(SVSubscriber subscriber, void *ctx, SVSubscribe smp->sequence = smpcnt; smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA; smp->length = 0; - smp->signals = &n->signals; + smp->signals = &n->in.signals; if (SVSubscriber_ASDU_hasRefrTm(asdu)) { uint64_t refrtm = SVSubscriber_ASDU_getRefrTmAsMs(asdu); diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 8dd864d6e..fcc151e85 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -74,7 +74,12 @@ int loopback_start(struct node *n) int ret; struct loopback *l = (struct loopback *) n->_vd; - ret = pool_init(&l->pool, l->queuelen, SAMPLE_LENGTH(vlist_length(&n->signals)), &memory_hugepage); + int len = MAX( + vlist_length(&n->in.signals), + vlist_length(&n->out.signals) + ); + + ret = pool_init(&l->pool, l->queuelen, SAMPLE_LENGTH(len), &memory_hugepage); if (ret) return ret; diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 1cd0b410a..8125b1e1f 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -302,7 +302,7 @@ int mqtt_start(struct node *n) mosquitto_message_callback_set(m->client, mqtt_message_cb); mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb); - ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) return ret; @@ -310,7 +310,7 @@ int mqtt_start(struct node *n) if (ret) return ret; - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->signals)), &memory_hugepage); + ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage); if (ret) return ret; diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 990766430..0fd7576d2 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -153,7 +153,7 @@ int nanomsg_start(struct node *n) int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; - ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) return ret; diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index da024600b..8839ff0d2 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -338,7 +338,7 @@ int rtp_start(struct node *n) return ret; /* Initialize IO */ - ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + ret = io_init(&r->io, r->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) return ret; diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index a68732ce4..2588db0c9 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -45,9 +45,11 @@ int shmem_parse(struct node *n, json_t *cfg) json_t *json_exec = NULL; json_error_t err; + int len = MAX(vlist_length(&n->in.signals), vlist_length(&n->out.signals)); + /* Default values */ shm->conf.queuelen = MAX(DEFAULT_SHMEM_QUEUELEN, n->in.vectorize); - shm->conf.samplelen = vlist_length(&n->signals); + shm->conf.samplelen = len; shm->conf.polling = false; shm->exec = NULL; @@ -144,7 +146,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re /** @todo: signal descriptions are currently not shared between processes */ for (int i = 0; i < recv; i++) - smps[i]->signals = &n->signals; + smps[i]->signals = &n->in.signals; return recv; } diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 18812bf00..4fdff2263 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -86,7 +86,7 @@ static void signal_generator_init_signals(struct node *n) { struct signal_generator *s = (struct signal_generator *) n->_vd; - assert(vlist_length(&n->signals) == 0); + assert(vlist_length(&n->in.signals) == 0); for (int i = 0; i < s->values; i++) { struct signal *sig = alloc(sizeof(struct signal)); @@ -96,7 +96,7 @@ static void signal_generator_init_signals(struct node *n) sig->name = strdup(signal_generator_type_str(rtype)); sig->type = SIGNAL_TYPE_FLOAT; /* All generated signals are of type float */ - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); } } @@ -226,7 +226,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u t->ts.origin = ts; t->sequence = s->counter; t->length = MIN(s->values, t->capacity); - t->signals = &n->signals; + t->signals = &n->in.signals; for (int i = 0; i < MIN(s->values, t->capacity); i++) { int rtype = (s->type != SIGNAL_GENERATOR_TYPE_MIXED) ? s->type : i % 7; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index ae12b2ea8..9d8fb2c28 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -163,7 +163,7 @@ int socket_start(struct node *n) int ret; /* Initialize IO */ - ret = io_init(&s->io, s->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + ret = io_init(&s->io, s->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) return ret; diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index 643097919..c3135bcbc 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -50,42 +50,42 @@ static void stats_init_signals(struct node *n) sig = alloc(sizeof(struct signal)); sig->name = strf("%s.%s", desc->name, "total"); sig->type = SIGNAL_TYPE_INTEGER; - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); /* Last */ sig = alloc(sizeof(struct signal)); sig->name = strf("%s.%s", desc->name, "last"); sig->unit = strdup(desc->unit); sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); /* Highest */ sig = alloc(sizeof(struct signal)); sig->name = strf("%s.%s", desc->name, "highest"); sig->unit = strdup(desc->unit); sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); /* Lowest */ sig = alloc(sizeof(struct signal)); sig->name = strf("%s.%s", desc->name, "lowest"); sig->unit = strdup(desc->unit); sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); /* Mean */ sig = alloc(sizeof(struct signal)); sig->name = strf("%s.%s", desc->name, "mean"); sig->unit = strdup(desc->unit); sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); /* Variance */ sig = alloc(sizeof(struct signal)); sig->name = strf("%s.%s", desc->name, "var"); sig->unit = strf("%s^2", desc->unit); // variance has squared unit of variable sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); } } diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 01e5921ac..6cb89f99e 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -247,7 +247,7 @@ int test_rtt_start(struct node *n) return ret; } - ret = io_init(&t->io, t->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_DATA); + ret = io_init(&t->io, t->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_DATA); if (ret) return ret; diff --git a/lib/nodes/uldaq.c b/lib/nodes/uldaq.c index 005730a95..238c5de21 100644 --- a/lib/nodes/uldaq.c +++ b/lib/nodes/uldaq.c @@ -307,7 +307,7 @@ int uldaq_parse(struct node *n, json_t *cfg) u->device_interface_type = iftype; } - u->in.channel_count = vlist_length(&n->signals); + u->in.channel_count = vlist_length(&n->in.signals); u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count); json_array_foreach(json_signals, i, json_signal) { @@ -443,8 +443,8 @@ int uldaq_check(struct node *n) return -1; } - for (size_t i = 0; i < vlist_length(&n->signals); i++) { - struct signal *s = (struct signal *) vlist_at(&n->signals, i); + for (size_t i = 0; i < vlist_length(&n->in.signals); i++) { + struct signal *s = (struct signal *) vlist_at(&n->in.signals, i); AiQueueElement *q = &u->in.queues[i]; if (s->type != SIGNAL_TYPE_FLOAT) { @@ -521,7 +521,7 @@ int uldaq_start(struct node *n) if (ret) return ret; - err = ulAInLoadQueue(u->device_handle, u->in.queues, vlist_length(&n->signals)); + err = ulAInLoadQueue(u->device_handle, u->in.queues, vlist_length(&n->in.signals)); if (err != ERR_NO_ERROR) { warning("Failed to load input queue to DAQ device for node '%s'", node_name(n)); return -1; @@ -636,7 +636,7 @@ int uldaq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re } smp->length = u->in.channel_count; - smp->signals = &n->signals; + smp->signals = &n->in.signals; smp->sequence = u->sequence++; smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA; } diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 642fad8ad..dcb851f0c 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -257,7 +257,7 @@ int zeromq_start(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; - ret = io_init(&z->io, z->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + ret = io_init(&z->io, z->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) return ret; diff --git a/tests/unit/mapping.cpp b/tests/unit/mapping.cpp index df3958277..982f5ca37 100644 --- a/tests/unit/mapping.cpp +++ b/tests/unit/mapping.cpp @@ -47,9 +47,9 @@ Test(mapping, parse_nodes) struct node *n = new struct node; n->name = strdup(node_names[i]); - n->signals.state = STATE_DESTROYED; + n->in.signals.state = STATE_DESTROYED; - vlist_init(&n->signals); + vlist_init(&n->in.signals); for (unsigned j = 0; j < ARRAY_LEN(signal_names[i]); j++) { struct signal *sig; @@ -57,7 +57,7 @@ Test(mapping, parse_nodes) sig = signal_create(signal_names[i][j], nullptr, SIGNAL_TYPE_AUTO); cr_assert_not_null(sig); - vlist_push(&n->signals, sig); + vlist_push(&n->in.signals, sig); } vlist_push(&nodes, n);