diff --git a/etc/examples/nodes/ethernet.conf b/etc/examples/nodes/ethernet.conf index 547dcd9b2..e4e4e26bf 100644 --- a/etc/examples/nodes/ethernet.conf +++ b/etc/examples/nodes/ethernet.conf @@ -6,10 +6,10 @@ nodes = { layer = "eth", in = { - address = "12:34:56:78:90:AB%eth0:12002" + address = "12:34:56:78:90:AB%em1:12002" }, out = { - address = "12:34:56:78:90:AB%eth0:12002" + address = "12:34:56:78:90:AB%em1:12002" } }, unix_domain_node = { diff --git a/etc/examples/nodes/infiniband.conf b/etc/examples/nodes/infiniband.conf index 783f455a2..409b73e79 100644 --- a/etc/examples/nodes/infiniband.conf +++ b/etc/examples/nodes/infiniband.conf @@ -10,8 +10,8 @@ nodes = { signal = "mixed", values = 3, - frequency = 3, - rate = 100000, + frequency = 3.0, + rate = 100000.0, limit = 100000, }, diff --git a/etc/examples/nodes/loopback.conf b/etc/examples/nodes/loopback.conf index cefc752e1..1ab457568 100644 --- a/etc/examples/nodes/loopback.conf +++ b/etc/examples/nodes/loopback.conf @@ -2,8 +2,7 @@ nodes = { loopback_node = { type = "loopback", # A loopback node will receive exactly the same data which has been sent to it. # The internal implementation is based on queue. - queuelen = 1024, # The queue length of the internal queue which buffers the samples. - samplelen = 64, # Each buffered sample can contain up to 64 values. + queuelen = 1024, # The queue length of the internal queue which buffers the samples. mode = "polling" # Use busy polling for synchronization of the read and write side of the queue } } diff --git a/etc/examples/nodes/signal_generator.conf b/etc/examples/nodes/signal_generator.conf index dbaeaaf17..318d7def4 100644 --- a/etc/examples/nodes/signal_generator.conf +++ b/etc/examples/nodes/signal_generator.conf @@ -2,9 +2,11 @@ nodes = { signal_node = { type = "signal", - signal = [ "sine", "pulse" ], # One of "sine", "square", "ramp", "triangle", "random", "mixed", "counter" - values = 2, # Number of values per sample - amplitude = [ 1.2, 0, 4 ], # Amplitude of generated signals + # One of "sine", "square", "ramp", "triangle", "random", "mixed", "counter" + signal = [ "sine", "pulse", "square" ], + + values = 3, # Number of values per sample + amplitude = [ 1.2, 0.0, 4.0 ], # Amplitude of generated signals frequency = 10, # Frequency of generated signals stddev = 2, # Standard deviation of random signals (normal distributed) rate = 10.0, # Sample rate diff --git a/etc/labs/lab11.conf b/etc/labs/lab11.conf index 82ee5e03e..f0df98819 100644 --- a/etc/labs/lab11.conf +++ b/etc/labs/lab11.conf @@ -57,7 +57,7 @@ paths = ( "rtds_gtnet2.ts.origin", "rtds_gtnet2.hdr.sequence", - "rtds_gtnet2.data[0-6]", + "rtds_gtnet2.data[0-6]" ], out = [ diff --git a/include/villas/config.hpp b/include/villas/config.hpp index 3b8867ca8..c504616bf 100644 --- a/include/villas/config.hpp +++ b/include/villas/config.hpp @@ -84,9 +84,9 @@ public: ~Config(); - json_t * load(std::FILE *f, bool resolveIncludes=true, bool resolveEnvVars=true); + json_t * load(std::FILE *f, bool resolveIncludes = true, bool resolveEnvVars = true); - json_t * load(const std::string &u, bool resolveIncludes=true, bool resolveEnvVars=true); + json_t * load(const std::string &u, bool resolveIncludes = true, bool resolveEnvVars = true); }; } /* namespace node */ diff --git a/include/villas/node.h b/include/villas/node.h index bea6b4528..671347d74 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -51,7 +51,7 @@ struct rtnl_cls; #endif /* WITH_NETEM */ -#define RE_NODE_NAME "[a-z0-9_-]{3,32}" +#define RE_NODE_NAME "[a-z0-9_-]{2,32}" /** The data structure for a node. * diff --git a/include/villas/nodes/loopback_internal.hpp b/include/villas/nodes/loopback_internal.hpp index 3c128b2be..1fb1d872f 100644 --- a/include/villas/nodes/loopback_internal.hpp +++ b/include/villas/nodes/loopback_internal.hpp @@ -41,7 +41,6 @@ struct sample; */ struct loopback_internal { int queuelen; - enum QueueSignalledMode mode; struct queue_signalled queue; struct vnode *source; diff --git a/lib/config.cpp b/lib/config.cpp index 43d114266..4fbfacbe4 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -51,7 +51,7 @@ Config::Config() : Config::Config(const std::string &u) : Config() { - load(u); + root = load(u); } Config::~Config() @@ -62,7 +62,7 @@ Config::~Config() json_t * Config::load(std::FILE *f, bool resolveInc, bool resolveEnvVars) { - root = decode(f); + json_t *root = decode(f); if (resolveInc) root = resolveIncludes(root); @@ -87,7 +87,7 @@ json_t * Config::load(const std::string &u, bool resolveInc, bool resolveEnvVars f = af->file; } - root = load(f, resolveInc, resolveEnvVars); + json_t *root = load(f, resolveInc, resolveEnvVars); if (af) afclose(af); @@ -182,23 +182,23 @@ json_t * Config::libconfigDecode(FILE *f) } #endif /* WITH_CONFIG */ -json_t * Config::walkStrings(json_t *in, str_walk_fcn_t cb) +json_t * Config::walkStrings(json_t *root, str_walk_fcn_t cb) { const char *key; size_t index; json_t *val, *new_val, *new_root; - switch (json_typeof(in)) { + switch (json_typeof(root)) { case JSON_STRING: - return cb(in); + return cb(root); case JSON_OBJECT: new_root = json_object(); - json_object_foreach(in, key, val) { + json_object_foreach(root, key, val) { new_val = walkStrings(val, cb); - json_object_set(new_root, key, new_val); + json_object_set_new(new_root, key, new_val); } return new_root; @@ -206,16 +206,16 @@ json_t * Config::walkStrings(json_t *in, str_walk_fcn_t cb) case JSON_ARRAY: new_root = json_array(); - json_array_foreach(in, index, val) { + json_array_foreach(root, index, val) { new_val = walkStrings(val, cb); - json_array_append(new_root, new_val); + json_array_append_new(new_root, new_val); } return new_root; default: - return in; + return root; }; } diff --git a/lib/hooks/CMakeLists.txt b/lib/hooks/CMakeLists.txt index a93614196..ee5a4d78e 100644 --- a/lib/hooks/CMakeLists.txt +++ b/lib/hooks/CMakeLists.txt @@ -24,7 +24,7 @@ set(HOOK_SRC average.cpp cast.cpp decimate.cpp - #dp.cpp + dp.cpp drop.cpp dump.cpp ebm.cpp diff --git a/lib/nodes/fpga.cpp b/lib/nodes/fpga.cpp index 11d9c57b7..f11e8351f 100644 --- a/lib/nodes/fpga.cpp +++ b/lib/nodes/fpga.cpp @@ -112,8 +112,9 @@ int fpga_init(struct vnode *n) new (&f->dma) std::shared_ptr(); new (&f->intf) std::shared_ptr(); - new (&f->in.mem) std::shared_ptr(); - new (&f->out.mem) std::shared_ptr(); + // TODO: fixme + // new (&f->in.mem) std::shared_ptr(); + // new (&f->out.mem) std::shared_ptr(); return 0; } diff --git a/lib/nodes/loopback.cpp b/lib/nodes/loopback.cpp index 94a1bb5fe..586967fe4 100644 --- a/lib/nodes/loopback.cpp +++ b/lib/nodes/loopback.cpp @@ -52,7 +52,7 @@ int loopback_parse(struct vnode *n, json_t *cfg) json_error_t err; int ret; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: i, s?: s }", "queuelen", &l->queuelen, "mode", &mode_str ); diff --git a/lib/nodes/loopback_internal.cpp b/lib/nodes/loopback_internal.cpp index 69ea56265..2ba8d5610 100644 --- a/lib/nodes/loopback_internal.cpp +++ b/lib/nodes/loopback_internal.cpp @@ -38,7 +38,6 @@ int loopback_internal_init(struct vnode *n) { struct loopback_internal *l = (struct loopback_internal *) n->_vd; - l->mode = QueueSignalledMode::EVENTFD; l->queuelen = DEFAULT_QUEUE_LENGTH; return 0; @@ -46,8 +45,13 @@ int loopback_internal_init(struct vnode *n) int loopback_internal_prepare(struct vnode *n) { + int ret; struct loopback_internal *l = (struct loopback_internal *) n->_vd; + ret = signal_list_copy(&n->in.signals, &l->source->in.signals); + if (ret) + return -1; + return queue_signalled_init(&l->queue, l->queuelen, memory_default, QueueSignalledMode::EVENTFD); } @@ -84,16 +88,6 @@ int loopback_internal_write(struct vnode *n, struct sample *smps[], unsigned cnt return queue_signalled_push_many(&l->queue, (void **) smps, cnt); } -char * loopback_internal_print(struct vnode *n) -{ - struct loopback_internal *l = (struct loopback_internal *) n->_vd; - char *buf = nullptr; - - strcatf(&buf, "queuelen=%d", l->queuelen); - - return buf; -} - int loopback_internal_poll_fds(struct vnode *n, int fds[]) { struct loopback_internal *l = (struct loopback_internal *) n->_vd; @@ -108,25 +102,21 @@ struct vnode * loopback_internal_create(struct vnode *orig) int ret; struct vnode *n; struct loopback_internal *l; - + n = new struct vnode; if (!n) throw MemoryAllocationError(); - l = (struct loopback_internal *) n->_vd; - ret = node_init(n, &p.node); if (ret) return nullptr; + l = (struct loopback_internal *) n->_vd; + l->source = orig; asprintf(&n->name, "%s_lo%zu", node_name_short(orig), vlist_length(&orig->sources)); - ret = signal_list_copy(&n->in.signals, &orig->in.signals); - if (ret) - return nullptr; - return n; } @@ -139,7 +129,6 @@ static void register_plugin() { p.node.vectorize = 0; p.node.flags = (int) NodeFlags::PROVIDES_SIGNALS | (int) NodeFlags::INTERNAL; p.node.size = sizeof(struct loopback_internal); - p.node.print = loopback_internal_print; p.node.prepare = loopback_internal_prepare; p.node.init = loopback_internal_init; p.node.destroy = loopback_internal_destroy; diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp index e5eed207f..9073d6a85 100644 --- a/lib/nodes/ngsi.cpp +++ b/lib/nodes/ngsi.cpp @@ -869,7 +869,7 @@ static void register_plugin() { p.node.reverse = ngsi_reverse; int ret = vlist_init(&p.node.instances); - if (ret) + if (!ret) vlist_init_and_push(&plugins, &p); } diff --git a/lib/nodes/signal_generator.cpp b/lib/nodes/signal_generator.cpp index 19ec675d2..7bcdd89fd 100644 --- a/lib/nodes/signal_generator.cpp +++ b/lib/nodes/signal_generator.cpp @@ -259,25 +259,23 @@ int signal_generator_parse(struct vnode *n, json_t *cfg) size_t i; json_t *json_value; json_array_foreach(a.json, i, json_value) { - if (!json_is_real(json_value)) - throw ConfigError(json_value, "node-config-node-signal", "Values must gives as array of float values!"); + if (!json_is_number(json_value)) + throw ConfigError(json_value, "node-config-node-signal", "Values must gives as array of integer or float values!"); - (*a.array)[i] = json_real_value(json_value); + (*a.array)[i] = json_number_value(json_value); } break; + case JSON_INTEGER: case JSON_REAL: - if (!json_is_real(a.json)) - throw ConfigError(a.json, "node-config-node-signal", "Values must gives as array of float values!"); - for (size_t i = 0; i < s->values; i++) - (*a.array)[i] = json_real_value(a.json); + (*a.array)[i] = json_number_value(a.json); break; default: - throw ConfigError(a.json, "node-config-node-signal", "Values must given as array or scalar float value!"); + throw ConfigError(a.json, "node-config-node-signal", "Values must given as array or scalar integer or float value!"); } } else { @@ -398,7 +396,7 @@ int signal_generator_read(struct vnode *n, struct sample *smps[], unsigned cnt, } if (s->limit > 0 && s->counter >= (unsigned) s->limit) { - info("Reached limit."); + info("Node %s reached limit.", node_name(n)); n->state = State::STOPPING; @@ -437,9 +435,13 @@ int signal_generator_poll_fds(struct vnode *n, int fds[]) { struct signal_generator *s = (struct signal_generator *) n->_vd; - fds[0] = s->task.getFD(); + if (s->rt) { + fds[0] = s->task.getFD(); - return 1; + return 1; + } + else + return 0; } static struct plugin p; diff --git a/lib/nodes/stats.cpp b/lib/nodes/stats.cpp index 0ad2ea0f8..61e2353e4 100644 --- a/lib/nodes/stats.cpp +++ b/lib/nodes/stats.cpp @@ -91,6 +91,32 @@ int stats_node_type_start(villas::node::SuperNode *sn) return 0; } +int stats_node_prepare(struct vnode *n) +{ + struct stats_node *s = (struct stats_node *) n->_vd; + + assert(vlist_length(&n->in.signals) == 0); + + /* Generate signal list */ + for (size_t i = 0; i < vlist_length(&s->signals); i++) { + struct stats_node_signal *stats_sig = (struct stats_node_signal *) vlist_at(&s->signals, i); + struct signal *sig; + + const char *metric = Stats::metrics[stats_sig->metric].name; + const char *type = Stats::types[stats_sig->type].name; + + auto name = fmt::format("{}.{}.{}", stats_sig->node_str, metric, type); + + sig = signal_create(name.c_str(), + Stats::metrics[stats_sig->metric].unit, + Stats::types[stats_sig->type].signal_type); + + vlist_push(&n->in.signals, sig); + } + + return 0; +} + int stats_node_start(struct vnode *n) { struct stats_node *s = (struct stats_node *) n->_vd; @@ -176,10 +202,7 @@ int stats_node_parse(struct vnode *n, json_t *cfg) error("Setting 'in.signals' of node %s must be an array", node_name(n)); json_array_foreach(json_signals, i, json_signal) { - struct signal *sig = (struct signal *) vlist_at(&n->in.signals, i); - struct stats_node_signal *stats_sig; - - stats_sig = new struct stats_node_signal; + auto *stats_sig = new struct stats_node_signal; if (!stats_sig) throw MemoryAllocationError(); @@ -187,19 +210,6 @@ int stats_node_parse(struct vnode *n, json_t *cfg) if (ret) error("Failed to parse statistics signal definition of node %s", node_name(n)); - if (!sig->name) { - const char *metric = Stats::metrics[stats_sig->metric].name; - const char *type = Stats::types[stats_sig->type].name; - - sig->name = strf("%s.%s.%s", stats_sig->node_str, metric, type); - } - - if (!sig->unit) - sig->unit = strdup(Stats::metrics[stats_sig->metric].unit); - - if (sig->type != Stats::types[stats_sig->type].signal_type) - error("Invalid type for signal %zu in node %s", i, node_name(n)); - vlist_push(&s->signals, stats_sig); } @@ -252,13 +262,14 @@ static void register_plugin() { p.type = PluginType::NODE; p.node.instances.state = State::DESTROYED; p.node.vectorize = 1; - p.node.flags = 0; + p.node.flags = (int) NodeFlags::PROVIDES_SIGNALS; p.node.size = sizeof(struct stats_node); p.node.type.start = stats_node_type_start; p.node.parse = stats_node_parse; p.node.init = stats_node_init; p.node.destroy = stats_node_destroy; p.node.print = stats_node_print; + p.node.prepare = stats_node_prepare; p.node.start = stats_node_start; p.node.stop = stats_node_stop; p.node.read = stats_node_read; diff --git a/lib/nodes/test_rtt.cpp b/lib/nodes/test_rtt.cpp index 5edb7d057..f8ab96e6b 100644 --- a/lib/nodes/test_rtt.cpp +++ b/lib/nodes/test_rtt.cpp @@ -228,6 +228,9 @@ int test_rtt_parse(struct vnode *n, json_t *cfg) if (!c) throw MemoryAllocationError(); + c->filename = nullptr; + c->filename_formatted = nullptr; + c->rate = rates[i]; c->values = values[j]; @@ -461,7 +464,7 @@ static void register_plugin() { p.node.write = test_rtt_write; int ret = vlist_init(&p.node.instances); - if (ret) + if (!ret) vlist_init_and_push(&plugins, &p); } diff --git a/lib/nodes/uldaq.cpp b/lib/nodes/uldaq.cpp index b45516627..5bd63fd25 100644 --- a/lib/nodes/uldaq.cpp +++ b/lib/nodes/uldaq.cpp @@ -662,7 +662,7 @@ static void register_plugin() { p.node.read = uldaq_read; int ret = vlist_init(&p.node.instances); - if (ret) + if (!ret) vlist_init_and_push(&plugins, &p); } diff --git a/lib/super_node.cpp b/lib/super_node.cpp index 664dc6272..4da961480 100644 --- a/lib/super_node.cpp +++ b/lib/super_node.cpp @@ -48,7 +48,7 @@ using namespace villas::utils; SuperNode::SuperNode() : state(State::INITIALIZED), - idleStop(false), + idleStop(true), #ifdef WITH_API api(this), #endif @@ -92,12 +92,12 @@ SuperNode::SuperNode() : void SuperNode::parse(const std::string &u) { - config.load(u); + config.root = config.load(u); parse(config.root); } -void SuperNode::parse(json_t *cfg) +void SuperNode::parse(json_t *root) { int ret; const char *nme = nullptr; @@ -107,14 +107,12 @@ void SuperNode::parse(json_t *cfg) json_t *json_nodes = nullptr; json_t *json_paths = nullptr; json_t *json_logging = nullptr; - json_t *json_web = nullptr; + json_t *json_http = nullptr; json_error_t err; - idleStop = true; - - ret = json_unpack_ex(cfg, &err, JSON_STRICT, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: s, s?: b }", - "http", &json_web, + ret = json_unpack_ex(root, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: s, s?: b }", + "http", &json_http, "logging", &json_logging, "nodes", &json_nodes, "paths", &json_paths, @@ -125,14 +123,14 @@ void SuperNode::parse(json_t *cfg) "idle_stop", &idleStop ); if (ret) - throw ConfigError(cfg, err, "node-config", "Unpacking top-level config failed"); + throw ConfigError(root, err, "node-config", "Unpacking top-level config failed"); if (nme) name = nme; #ifdef WITH_WEB - if (json_web) - web.parse(json_web); + if (json_http) + web.parse(json_http); #endif /* WITH_WEB */ if (json_logging) @@ -155,7 +153,7 @@ void SuperNode::parse(json_t *cfg) ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type); if (ret) - throw ConfigError(cfg, err, "node-config-node-type", "Failed to parse type of node '{}'", name); + throw ConfigError(root, err, "node-config-node-type", "Failed to parse type of node '{}'", name); nt = node_type_lookup(type); if (!nt) @@ -359,6 +357,15 @@ void SuperNode::prepare() prepareNodes(); preparePaths(); + for (size_t i = 0; i < vlist_length(&nodes); i++) { + auto *n = (struct vnode *) vlist_at(&nodes, i); + if (vlist_length(&n->sources) == 0 && + vlist_length(&n->destinations) == 0) { + logger->info("Node {} is not used by any path. Disabling..."); + n->enabled = false; + } + } + state = State::PREPARED; } diff --git a/src/villas-test-config.cpp b/src/villas-test-config.cpp index c85ca3efb..12101f302 100644 --- a/src/villas-test-config.cpp +++ b/src/villas-test-config.cpp @@ -62,8 +62,9 @@ protected: << " CONFIG is the path to an optional configuration file" << std::endl << " OPTIONS is one or more of the following options:" << std::endl << " -d LVL set debug level" << std::endl - << " -V show version and exit" << std::endl - << " -h show usage and exit" << std::endl << std::endl; + << " -V show version and exit" << std::endl + << " -c perform plausability checks on config" << std::endl + << " -h show usage and exit" << std::endl << std::endl; printCopyright(); }