1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

separated in/out signal descriptors

This commit is contained in:
Steffen Vogel 2019-02-06 13:11:57 +01:00
parent e1337c6101
commit 1740df596b
18 changed files with 90 additions and 85 deletions

View file

@ -58,6 +58,7 @@ struct node_direction {
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
struct vlist hooks; /**< List of write hooks (struct hook). */
struct vlist signals; /**< Signal description. */
json_t *cfg; /**< A JSON object containing the configuration of the node. */
};
@ -67,8 +68,7 @@ struct node_direction {
* Every entity which exchanges messages is represented by a node.
* Nodes can be remote machines and simulators or locally running processes.
*/
struct node
{
struct node {
char *name; /**< A short identifier of the node, only used for configuration and logging */
enum state state;
@ -84,8 +84,6 @@ struct node
struct node_direction in, out;
struct vlist signals; /**< Signal meta data for data which is __received__ by node_read(). */
#ifdef WITH_NETEM
int mark; /**< Socket mark for netem, routing and filtering */

View file

@ -154,7 +154,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
first_str = strtok(NULL, "-]");
if (first_str) {
if (me->node)
first = vlist_lookup_index(&me->node->signals, first_str);
first = vlist_lookup_index(&me->node->in.signals, first_str);
if (first < 0) {
char *endptr;
@ -168,14 +168,14 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
else {
/* Map all signals */
me->data.offset = 0;
me->length = me->node ? vlist_length(&me->node->signals) : 0;
me->length = me->node ? vlist_length(&me->node->in.signals) : 0;
goto end;
}
last_str = strtok(NULL, "]");
if (last_str) {
if (me->node)
last = vlist_lookup_index(&me->node->signals, last_str);
last = vlist_lookup_index(&me->node->in.signals, last_str);
if (last < 0) {
char *endptr;
@ -451,8 +451,8 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
break;
case MAPPING_TYPE_DATA:
if (me->node && index < vlist_length(&me->node->signals)) {
struct signal *s = vlist_at(&me->node->signals, index);
if (me->node && index < vlist_length(&me->node->in.signals)) {
struct signal *s = vlist_at(&me->node->in.signals, index);
strcatf(str, "data[%s]", s->name);
}

View file

@ -68,11 +68,16 @@ static int node_direction_init(struct node_direction *nd, struct node *n)
nd->vectorize = 1;
nd->builtin = 1;
nd->hooks.state = STATE_DESTROYED;
nd->signals.state = STATE_DESTROYED;
ret = vlist_init(&nd->hooks);
if (ret)
return ret;
ret = vlist_init(&nd->signals);
if (ret)
return ret;
return 0;
}
@ -86,6 +91,10 @@ static int node_direction_destroy(struct node_direction *nd, struct node *n)
return ret;
#endif
ret = vlist_destroy(&nd->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
return ret;
}
@ -95,12 +104,14 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_
json_error_t err;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
nd->cfg = cfg;
nd->enabled = 1;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: i, s?: b, s?: b }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }",
"hooks", &json_hooks,
"signals", &json_signals,
"vectorize", &nd->vectorize,
"builtin", &nd->builtin,
"enabled", &nd->enabled
@ -108,6 +119,35 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_
if (ret)
jerror(&err, "Failed to parse node %s", node_name(n));
if (n->_vt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
if (json_signals)
error("Node %s does not support signal definitions", node_name(n));
}
else if (json_is_array(json_signals)) {
ret = signal_list_parse(&nd->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node %s", node_name(n));
}
else {
int count = DEFAULT_SAMPLE_LENGTH;
const char *type_str = "float";
if (json_is_object(json_signals)) {
json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }",
"count", &count,
"type", &type_str
);
}
else
warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH);
int type = signal_type_from_str(type_str);
if (type < 0)
error("Invalid signal type %s", type_str);
signal_list_generate(&nd->signals, count, type);
}
#ifdef WITH_HOOKS
int m = nd == &n->out
? HOOK_NODE_WRITE
@ -187,9 +227,6 @@ int node_init(struct node *n, struct node_type *vt)
n->tc_classifier = NULL;
#endif /* WITH_NETEM */
n->signals.state = STATE_DESTROYED;
vlist_init(&n->signals);
/* Default values */
ret = node_direction_init(&n->in, n);
if (ret)
@ -268,38 +305,6 @@ int node_parse(struct node *n, json_t *json, const char *name)
#endif /* WITH_NETEM */
}
if (nt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
if (json_signals)
error("Node %s does not support signal definitions", node_name(n));
}
else if (json_signals) {
if (json_is_array(json_signals)) {
ret = signal_list_parse(&n->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node %s", node_name(n));
}
else {
int count;
const char *type_str;
json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }",
"count", &count,
"type", &type_str
);
int type = signal_type_from_str(type_str);
if (type < 0)
error("Invalid signal type %s", type_str);
signal_list_generate(&n->signals, count, type);
}
}
else {
warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH);
signal_list_generate(&n->signals, DEFAULT_SAMPLE_LENGTH, SIGNAL_TYPE_FLOAT);
}
struct {
const char *str;
struct node_direction *dir;
@ -308,7 +313,7 @@ int node_parse(struct node *n, json_t *json, const char *name)
{ "out", &n->out }
};
const char *fields[] = { "builtin", "vectorize", "hooks" };
const char *fields[] = { "signals", "builtin", "vectorize", "hooks" };
for (int j = 0; j < ARRAY_LEN(dirs); j++) {
json_t *json_dir = json_object_get(json, dirs[j].str);
@ -492,16 +497,11 @@ int node_restart(struct node *n)
return 0;
}
int node_destroy(struct node *n)
{
int ret;
assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED);
ret = vlist_destroy(&n->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
ret = node_direction_destroy(&n->in, n);
if (ret)
return ret;
@ -645,11 +645,11 @@ char * node_name_long(struct node *n)
if (node_type(n)->print) {
struct node_type *vt = node_type(n);
strcatf(&n->_name_long, "%s: #in.signals=%zu, #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d",
strcatf(&n->_name_long, "%s: #in.signals=%zu, #out.signals=%zu, #in.hooks=%zu, #out.hooks=%zu, in.vectorize=%d, out.vectorize=%d",
node_name(n),
vlist_length(&n->signals),
vlist_length(&n->in.hooks), n->in.vectorize,
vlist_length(&n->out.hooks), n->out.vectorize
vlist_length(&n->in.signals), vlist_length(&n->out.signals),
vlist_length(&n->in.hooks), vlist_length(&n->out.hooks),
n->in.vectorize, n->out.vectorize
);
#ifdef WITH_NETEM

View file

@ -239,7 +239,7 @@ int amqp_start(struct node *n)
amqp_rpc_reply_t rep;
amqp_queue_declare_ok_t *r;
ret = io_init(&a->io, a->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
ret = io_init(&a->io, a->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
if (ret)
return ret;

View file

@ -243,7 +243,7 @@ int file_start(struct node *n)
if (f->flush)
flags |= IO_FLUSH;
ret = io_init(&f->io, f->format, &n->signals, flags);
ret = io_init(&f->io, f->format, &n->in.signals, flags);
if (ret)
return ret;

View file

@ -74,7 +74,7 @@ static void iec61850_sv_listener(SVSubscriber subscriber, void *ctx, SVSubscribe
smp->sequence = smpcnt;
smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
smp->length = 0;
smp->signals = &n->signals;
smp->signals = &n->in.signals;
if (SVSubscriber_ASDU_hasRefrTm(asdu)) {
uint64_t refrtm = SVSubscriber_ASDU_getRefrTmAsMs(asdu);

View file

@ -74,7 +74,12 @@ int loopback_start(struct node *n)
int ret;
struct loopback *l = (struct loopback *) n->_vd;
ret = pool_init(&l->pool, l->queuelen, SAMPLE_LENGTH(vlist_length(&n->signals)), &memory_hugepage);
int len = MAX(
vlist_length(&n->in.signals),
vlist_length(&n->out.signals)
);
ret = pool_init(&l->pool, l->queuelen, SAMPLE_LENGTH(len), &memory_hugepage);
if (ret)
return ret;

View file

@ -302,7 +302,7 @@ int mqtt_start(struct node *n)
mosquitto_message_callback_set(m->client, mqtt_message_cb);
mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb);
ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
if (ret)
return ret;
@ -310,7 +310,7 @@ int mqtt_start(struct node *n)
if (ret)
return ret;
ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->signals)), &memory_hugepage);
ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage);
if (ret)
return ret;

View file

@ -153,7 +153,7 @@ int nanomsg_start(struct node *n)
int ret;
struct nanomsg *m = (struct nanomsg *) n->_vd;
ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
if (ret)
return ret;

View file

@ -338,7 +338,7 @@ int rtp_start(struct node *n)
return ret;
/* Initialize IO */
ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
ret = io_init(&r->io, r->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
if (ret)
return ret;

View file

@ -45,9 +45,11 @@ int shmem_parse(struct node *n, json_t *cfg)
json_t *json_exec = NULL;
json_error_t err;
int len = MAX(vlist_length(&n->in.signals), vlist_length(&n->out.signals));
/* Default values */
shm->conf.queuelen = MAX(DEFAULT_SHMEM_QUEUELEN, n->in.vectorize);
shm->conf.samplelen = vlist_length(&n->signals);
shm->conf.samplelen = len;
shm->conf.polling = false;
shm->exec = NULL;
@ -144,7 +146,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
/** @todo: signal descriptions are currently not shared between processes */
for (int i = 0; i < recv; i++)
smps[i]->signals = &n->signals;
smps[i]->signals = &n->in.signals;
return recv;
}

View file

@ -86,7 +86,7 @@ static void signal_generator_init_signals(struct node *n)
{
struct signal_generator *s = (struct signal_generator *) n->_vd;
assert(vlist_length(&n->signals) == 0);
assert(vlist_length(&n->in.signals) == 0);
for (int i = 0; i < s->values; i++) {
struct signal *sig = alloc(sizeof(struct signal));
@ -96,7 +96,7 @@ static void signal_generator_init_signals(struct node *n)
sig->name = strdup(signal_generator_type_str(rtype));
sig->type = SIGNAL_TYPE_FLOAT; /* All generated signals are of type float */
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
}
}
@ -226,7 +226,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u
t->ts.origin = ts;
t->sequence = s->counter;
t->length = MIN(s->values, t->capacity);
t->signals = &n->signals;
t->signals = &n->in.signals;
for (int i = 0; i < MIN(s->values, t->capacity); i++) {
int rtype = (s->type != SIGNAL_GENERATOR_TYPE_MIXED) ? s->type : i % 7;

View file

@ -163,7 +163,7 @@ int socket_start(struct node *n)
int ret;
/* Initialize IO */
ret = io_init(&s->io, s->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
ret = io_init(&s->io, s->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
if (ret)
return ret;

View file

@ -50,42 +50,42 @@ static void stats_init_signals(struct node *n)
sig = alloc(sizeof(struct signal));
sig->name = strf("%s.%s", desc->name, "total");
sig->type = SIGNAL_TYPE_INTEGER;
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
/* Last */
sig = alloc(sizeof(struct signal));
sig->name = strf("%s.%s", desc->name, "last");
sig->unit = strdup(desc->unit);
sig->type = SIGNAL_TYPE_FLOAT;
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
/* Highest */
sig = alloc(sizeof(struct signal));
sig->name = strf("%s.%s", desc->name, "highest");
sig->unit = strdup(desc->unit);
sig->type = SIGNAL_TYPE_FLOAT;
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
/* Lowest */
sig = alloc(sizeof(struct signal));
sig->name = strf("%s.%s", desc->name, "lowest");
sig->unit = strdup(desc->unit);
sig->type = SIGNAL_TYPE_FLOAT;
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
/* Mean */
sig = alloc(sizeof(struct signal));
sig->name = strf("%s.%s", desc->name, "mean");
sig->unit = strdup(desc->unit);
sig->type = SIGNAL_TYPE_FLOAT;
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
/* Variance */
sig = alloc(sizeof(struct signal));
sig->name = strf("%s.%s", desc->name, "var");
sig->unit = strf("%s^2", desc->unit); // variance has squared unit of variable
sig->type = SIGNAL_TYPE_FLOAT;
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
}
}

View file

@ -247,7 +247,7 @@ int test_rtt_start(struct node *n)
return ret;
}
ret = io_init(&t->io, t->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_DATA);
ret = io_init(&t->io, t->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_DATA);
if (ret)
return ret;

View file

@ -307,7 +307,7 @@ int uldaq_parse(struct node *n, json_t *cfg)
u->device_interface_type = iftype;
}
u->in.channel_count = vlist_length(&n->signals);
u->in.channel_count = vlist_length(&n->in.signals);
u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count);
json_array_foreach(json_signals, i, json_signal) {
@ -443,8 +443,8 @@ int uldaq_check(struct node *n)
return -1;
}
for (size_t i = 0; i < vlist_length(&n->signals); i++) {
struct signal *s = (struct signal *) vlist_at(&n->signals, i);
for (size_t i = 0; i < vlist_length(&n->in.signals); i++) {
struct signal *s = (struct signal *) vlist_at(&n->in.signals, i);
AiQueueElement *q = &u->in.queues[i];
if (s->type != SIGNAL_TYPE_FLOAT) {
@ -521,7 +521,7 @@ int uldaq_start(struct node *n)
if (ret)
return ret;
err = ulAInLoadQueue(u->device_handle, u->in.queues, vlist_length(&n->signals));
err = ulAInLoadQueue(u->device_handle, u->in.queues, vlist_length(&n->in.signals));
if (err != ERR_NO_ERROR) {
warning("Failed to load input queue to DAQ device for node '%s'", node_name(n));
return -1;
@ -636,7 +636,7 @@ int uldaq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
}
smp->length = u->in.channel_count;
smp->signals = &n->signals;
smp->signals = &n->in.signals;
smp->sequence = u->sequence++;
smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
}

View file

@ -257,7 +257,7 @@ int zeromq_start(struct node *n)
int ret;
struct zeromq *z = (struct zeromq *) n->_vd;
ret = io_init(&z->io, z->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
ret = io_init(&z->io, z->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
if (ret)
return ret;

View file

@ -47,9 +47,9 @@ Test(mapping, parse_nodes)
struct node *n = new struct node;
n->name = strdup(node_names[i]);
n->signals.state = STATE_DESTROYED;
n->in.signals.state = STATE_DESTROYED;
vlist_init(&n->signals);
vlist_init(&n->in.signals);
for (unsigned j = 0; j < ARRAY_LEN(signal_names[i]); j++) {
struct signal *sig;
@ -57,7 +57,7 @@ Test(mapping, parse_nodes)
sig = signal_create(signal_names[i][j], nullptr, SIGNAL_TYPE_AUTO);
cr_assert_not_null(sig);
vlist_push(&n->signals, sig);
vlist_push(&n->in.signals, sig);
}
vlist_push(&nodes, n);