mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
fix a lot of small bugs
This commit is contained in:
parent
461980dbcf
commit
baddec2228
20 changed files with 108 additions and 94 deletions
|
@ -6,10 +6,10 @@ nodes = {
|
|||
|
||||
layer = "eth",
|
||||
in = {
|
||||
address = "12:34:56:78:90:AB%eth0:12002"
|
||||
address = "12:34:56:78:90:AB%em1:12002"
|
||||
},
|
||||
out = {
|
||||
address = "12:34:56:78:90:AB%eth0:12002"
|
||||
address = "12:34:56:78:90:AB%em1:12002"
|
||||
}
|
||||
},
|
||||
unix_domain_node = {
|
||||
|
|
|
@ -10,8 +10,8 @@ nodes = {
|
|||
|
||||
signal = "mixed",
|
||||
values = 3,
|
||||
frequency = 3,
|
||||
rate = 100000,
|
||||
frequency = 3.0,
|
||||
rate = 100000.0,
|
||||
limit = 100000,
|
||||
},
|
||||
|
||||
|
|
|
@ -2,8 +2,7 @@ nodes = {
|
|||
loopback_node = {
|
||||
type = "loopback", # A loopback node will receive exactly the same data which has been sent to it.
|
||||
# The internal implementation is based on queue.
|
||||
queuelen = 1024, # The queue length of the internal queue which buffers the samples.
|
||||
samplelen = 64, # Each buffered sample can contain up to 64 values.
|
||||
queuelen = 1024, # The queue length of the internal queue which buffers the samples.
|
||||
mode = "polling" # Use busy polling for synchronization of the read and write side of the queue
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,9 +2,11 @@ nodes = {
|
|||
signal_node = {
|
||||
type = "signal",
|
||||
|
||||
signal = [ "sine", "pulse" ], # One of "sine", "square", "ramp", "triangle", "random", "mixed", "counter"
|
||||
values = 2, # Number of values per sample
|
||||
amplitude = [ 1.2, 0, 4 ], # Amplitude of generated signals
|
||||
# One of "sine", "square", "ramp", "triangle", "random", "mixed", "counter"
|
||||
signal = [ "sine", "pulse", "square" ],
|
||||
|
||||
values = 3, # Number of values per sample
|
||||
amplitude = [ 1.2, 0.0, 4.0 ], # Amplitude of generated signals
|
||||
frequency = 10, # Frequency of generated signals
|
||||
stddev = 2, # Standard deviation of random signals (normal distributed)
|
||||
rate = 10.0, # Sample rate
|
||||
|
|
|
@ -57,7 +57,7 @@ paths = (
|
|||
|
||||
"rtds_gtnet2.ts.origin",
|
||||
"rtds_gtnet2.hdr.sequence",
|
||||
"rtds_gtnet2.data[0-6]",
|
||||
"rtds_gtnet2.data[0-6]"
|
||||
],
|
||||
|
||||
out = [
|
||||
|
|
|
@ -84,9 +84,9 @@ public:
|
|||
|
||||
~Config();
|
||||
|
||||
json_t * load(std::FILE *f, bool resolveIncludes=true, bool resolveEnvVars=true);
|
||||
json_t * load(std::FILE *f, bool resolveIncludes = true, bool resolveEnvVars = true);
|
||||
|
||||
json_t * load(const std::string &u, bool resolveIncludes=true, bool resolveEnvVars=true);
|
||||
json_t * load(const std::string &u, bool resolveIncludes = true, bool resolveEnvVars = true);
|
||||
};
|
||||
|
||||
} /* namespace node */
|
||||
|
|
|
@ -51,7 +51,7 @@
|
|||
struct rtnl_cls;
|
||||
#endif /* WITH_NETEM */
|
||||
|
||||
#define RE_NODE_NAME "[a-z0-9_-]{3,32}"
|
||||
#define RE_NODE_NAME "[a-z0-9_-]{2,32}"
|
||||
|
||||
/** The data structure for a node.
|
||||
*
|
||||
|
|
|
@ -41,7 +41,6 @@ struct sample;
|
|||
*/
|
||||
struct loopback_internal {
|
||||
int queuelen;
|
||||
enum QueueSignalledMode mode;
|
||||
struct queue_signalled queue;
|
||||
|
||||
struct vnode *source;
|
||||
|
|
|
@ -51,7 +51,7 @@ Config::Config() :
|
|||
Config::Config(const std::string &u) :
|
||||
Config()
|
||||
{
|
||||
load(u);
|
||||
root = load(u);
|
||||
}
|
||||
|
||||
Config::~Config()
|
||||
|
@ -62,7 +62,7 @@ Config::~Config()
|
|||
|
||||
json_t * Config::load(std::FILE *f, bool resolveInc, bool resolveEnvVars)
|
||||
{
|
||||
root = decode(f);
|
||||
json_t *root = decode(f);
|
||||
|
||||
if (resolveInc)
|
||||
root = resolveIncludes(root);
|
||||
|
@ -87,7 +87,7 @@ json_t * Config::load(const std::string &u, bool resolveInc, bool resolveEnvVars
|
|||
f = af->file;
|
||||
}
|
||||
|
||||
root = load(f, resolveInc, resolveEnvVars);
|
||||
json_t *root = load(f, resolveInc, resolveEnvVars);
|
||||
|
||||
if (af)
|
||||
afclose(af);
|
||||
|
@ -182,23 +182,23 @@ json_t * Config::libconfigDecode(FILE *f)
|
|||
}
|
||||
#endif /* WITH_CONFIG */
|
||||
|
||||
json_t * Config::walkStrings(json_t *in, str_walk_fcn_t cb)
|
||||
json_t * Config::walkStrings(json_t *root, str_walk_fcn_t cb)
|
||||
{
|
||||
const char *key;
|
||||
size_t index;
|
||||
json_t *val, *new_val, *new_root;
|
||||
|
||||
switch (json_typeof(in)) {
|
||||
switch (json_typeof(root)) {
|
||||
case JSON_STRING:
|
||||
return cb(in);
|
||||
return cb(root);
|
||||
|
||||
case JSON_OBJECT:
|
||||
new_root = json_object();
|
||||
|
||||
json_object_foreach(in, key, val) {
|
||||
json_object_foreach(root, key, val) {
|
||||
new_val = walkStrings(val, cb);
|
||||
|
||||
json_object_set(new_root, key, new_val);
|
||||
json_object_set_new(new_root, key, new_val);
|
||||
}
|
||||
|
||||
return new_root;
|
||||
|
@ -206,16 +206,16 @@ json_t * Config::walkStrings(json_t *in, str_walk_fcn_t cb)
|
|||
case JSON_ARRAY:
|
||||
new_root = json_array();
|
||||
|
||||
json_array_foreach(in, index, val) {
|
||||
json_array_foreach(root, index, val) {
|
||||
new_val = walkStrings(val, cb);
|
||||
|
||||
json_array_append(new_root, new_val);
|
||||
json_array_append_new(new_root, new_val);
|
||||
}
|
||||
|
||||
return new_root;
|
||||
|
||||
default:
|
||||
return in;
|
||||
return root;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ set(HOOK_SRC
|
|||
average.cpp
|
||||
cast.cpp
|
||||
decimate.cpp
|
||||
#dp.cpp
|
||||
dp.cpp
|
||||
drop.cpp
|
||||
dump.cpp
|
||||
ebm.cpp
|
||||
|
|
|
@ -112,8 +112,9 @@ int fpga_init(struct vnode *n)
|
|||
new (&f->dma) std::shared_ptr<fpga::ip::Node>();
|
||||
new (&f->intf) std::shared_ptr<fpga::ip::Node>();
|
||||
|
||||
new (&f->in.mem) std::shared_ptr<MemoryBlock>();
|
||||
new (&f->out.mem) std::shared_ptr<MemoryBlock>();
|
||||
// TODO: fixme
|
||||
// new (&f->in.mem) std::shared_ptr<MemoryBlock>();
|
||||
// new (&f->out.mem) std::shared_ptr<MemoryBlock>();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ int loopback_parse(struct vnode *n, json_t *cfg)
|
|||
json_error_t err;
|
||||
int ret;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s }",
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s?: i, s?: s }",
|
||||
"queuelen", &l->queuelen,
|
||||
"mode", &mode_str
|
||||
);
|
||||
|
|
|
@ -38,7 +38,6 @@ int loopback_internal_init(struct vnode *n)
|
|||
{
|
||||
struct loopback_internal *l = (struct loopback_internal *) n->_vd;
|
||||
|
||||
l->mode = QueueSignalledMode::EVENTFD;
|
||||
l->queuelen = DEFAULT_QUEUE_LENGTH;
|
||||
|
||||
return 0;
|
||||
|
@ -46,8 +45,13 @@ int loopback_internal_init(struct vnode *n)
|
|||
|
||||
int loopback_internal_prepare(struct vnode *n)
|
||||
{
|
||||
int ret;
|
||||
struct loopback_internal *l = (struct loopback_internal *) n->_vd;
|
||||
|
||||
ret = signal_list_copy(&n->in.signals, &l->source->in.signals);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
return queue_signalled_init(&l->queue, l->queuelen, memory_default, QueueSignalledMode::EVENTFD);
|
||||
}
|
||||
|
||||
|
@ -84,16 +88,6 @@ int loopback_internal_write(struct vnode *n, struct sample *smps[], unsigned cnt
|
|||
return queue_signalled_push_many(&l->queue, (void **) smps, cnt);
|
||||
}
|
||||
|
||||
char * loopback_internal_print(struct vnode *n)
|
||||
{
|
||||
struct loopback_internal *l = (struct loopback_internal *) n->_vd;
|
||||
char *buf = nullptr;
|
||||
|
||||
strcatf(&buf, "queuelen=%d", l->queuelen);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int loopback_internal_poll_fds(struct vnode *n, int fds[])
|
||||
{
|
||||
struct loopback_internal *l = (struct loopback_internal *) n->_vd;
|
||||
|
@ -108,25 +102,21 @@ struct vnode * loopback_internal_create(struct vnode *orig)
|
|||
int ret;
|
||||
struct vnode *n;
|
||||
struct loopback_internal *l;
|
||||
|
||||
|
||||
n = new struct vnode;
|
||||
if (!n)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
l = (struct loopback_internal *) n->_vd;
|
||||
|
||||
ret = node_init(n, &p.node);
|
||||
if (ret)
|
||||
return nullptr;
|
||||
|
||||
l = (struct loopback_internal *) n->_vd;
|
||||
|
||||
l->source = orig;
|
||||
|
||||
asprintf(&n->name, "%s_lo%zu", node_name_short(orig), vlist_length(&orig->sources));
|
||||
|
||||
ret = signal_list_copy(&n->in.signals, &orig->in.signals);
|
||||
if (ret)
|
||||
return nullptr;
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
|
@ -139,7 +129,6 @@ static void register_plugin() {
|
|||
p.node.vectorize = 0;
|
||||
p.node.flags = (int) NodeFlags::PROVIDES_SIGNALS | (int) NodeFlags::INTERNAL;
|
||||
p.node.size = sizeof(struct loopback_internal);
|
||||
p.node.print = loopback_internal_print;
|
||||
p.node.prepare = loopback_internal_prepare;
|
||||
p.node.init = loopback_internal_init;
|
||||
p.node.destroy = loopback_internal_destroy;
|
||||
|
|
|
@ -869,7 +869,7 @@ static void register_plugin() {
|
|||
p.node.reverse = ngsi_reverse;
|
||||
|
||||
int ret = vlist_init(&p.node.instances);
|
||||
if (ret)
|
||||
if (!ret)
|
||||
vlist_init_and_push(&plugins, &p);
|
||||
}
|
||||
|
||||
|
|
|
@ -259,25 +259,23 @@ int signal_generator_parse(struct vnode *n, json_t *cfg)
|
|||
size_t i;
|
||||
json_t *json_value;
|
||||
json_array_foreach(a.json, i, json_value) {
|
||||
if (!json_is_real(json_value))
|
||||
throw ConfigError(json_value, "node-config-node-signal", "Values must gives as array of float values!");
|
||||
if (!json_is_number(json_value))
|
||||
throw ConfigError(json_value, "node-config-node-signal", "Values must gives as array of integer or float values!");
|
||||
|
||||
(*a.array)[i] = json_real_value(json_value);
|
||||
(*a.array)[i] = json_number_value(json_value);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case JSON_INTEGER:
|
||||
case JSON_REAL:
|
||||
if (!json_is_real(a.json))
|
||||
throw ConfigError(a.json, "node-config-node-signal", "Values must gives as array of float values!");
|
||||
|
||||
for (size_t i = 0; i < s->values; i++)
|
||||
(*a.array)[i] = json_real_value(a.json);
|
||||
(*a.array)[i] = json_number_value(a.json);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
throw ConfigError(a.json, "node-config-node-signal", "Values must given as array or scalar float value!");
|
||||
throw ConfigError(a.json, "node-config-node-signal", "Values must given as array or scalar integer or float value!");
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
@ -398,7 +396,7 @@ int signal_generator_read(struct vnode *n, struct sample *smps[], unsigned cnt,
|
|||
}
|
||||
|
||||
if (s->limit > 0 && s->counter >= (unsigned) s->limit) {
|
||||
info("Reached limit.");
|
||||
info("Node %s reached limit.", node_name(n));
|
||||
|
||||
n->state = State::STOPPING;
|
||||
|
||||
|
@ -437,9 +435,13 @@ int signal_generator_poll_fds(struct vnode *n, int fds[])
|
|||
{
|
||||
struct signal_generator *s = (struct signal_generator *) n->_vd;
|
||||
|
||||
fds[0] = s->task.getFD();
|
||||
if (s->rt) {
|
||||
fds[0] = s->task.getFD();
|
||||
|
||||
return 1;
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct plugin p;
|
||||
|
|
|
@ -91,6 +91,32 @@ int stats_node_type_start(villas::node::SuperNode *sn)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_prepare(struct vnode *n)
|
||||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
assert(vlist_length(&n->in.signals) == 0);
|
||||
|
||||
/* Generate signal list */
|
||||
for (size_t i = 0; i < vlist_length(&s->signals); i++) {
|
||||
struct stats_node_signal *stats_sig = (struct stats_node_signal *) vlist_at(&s->signals, i);
|
||||
struct signal *sig;
|
||||
|
||||
const char *metric = Stats::metrics[stats_sig->metric].name;
|
||||
const char *type = Stats::types[stats_sig->type].name;
|
||||
|
||||
auto name = fmt::format("{}.{}.{}", stats_sig->node_str, metric, type);
|
||||
|
||||
sig = signal_create(name.c_str(),
|
||||
Stats::metrics[stats_sig->metric].unit,
|
||||
Stats::types[stats_sig->type].signal_type);
|
||||
|
||||
vlist_push(&n->in.signals, sig);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_start(struct vnode *n)
|
||||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
@ -176,10 +202,7 @@ int stats_node_parse(struct vnode *n, json_t *cfg)
|
|||
error("Setting 'in.signals' of node %s must be an array", node_name(n));
|
||||
|
||||
json_array_foreach(json_signals, i, json_signal) {
|
||||
struct signal *sig = (struct signal *) vlist_at(&n->in.signals, i);
|
||||
struct stats_node_signal *stats_sig;
|
||||
|
||||
stats_sig = new struct stats_node_signal;
|
||||
auto *stats_sig = new struct stats_node_signal;
|
||||
if (!stats_sig)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
|
@ -187,19 +210,6 @@ int stats_node_parse(struct vnode *n, json_t *cfg)
|
|||
if (ret)
|
||||
error("Failed to parse statistics signal definition of node %s", node_name(n));
|
||||
|
||||
if (!sig->name) {
|
||||
const char *metric = Stats::metrics[stats_sig->metric].name;
|
||||
const char *type = Stats::types[stats_sig->type].name;
|
||||
|
||||
sig->name = strf("%s.%s.%s", stats_sig->node_str, metric, type);
|
||||
}
|
||||
|
||||
if (!sig->unit)
|
||||
sig->unit = strdup(Stats::metrics[stats_sig->metric].unit);
|
||||
|
||||
if (sig->type != Stats::types[stats_sig->type].signal_type)
|
||||
error("Invalid type for signal %zu in node %s", i, node_name(n));
|
||||
|
||||
vlist_push(&s->signals, stats_sig);
|
||||
}
|
||||
|
||||
|
@ -252,13 +262,14 @@ static void register_plugin() {
|
|||
p.type = PluginType::NODE;
|
||||
p.node.instances.state = State::DESTROYED;
|
||||
p.node.vectorize = 1;
|
||||
p.node.flags = 0;
|
||||
p.node.flags = (int) NodeFlags::PROVIDES_SIGNALS;
|
||||
p.node.size = sizeof(struct stats_node);
|
||||
p.node.type.start = stats_node_type_start;
|
||||
p.node.parse = stats_node_parse;
|
||||
p.node.init = stats_node_init;
|
||||
p.node.destroy = stats_node_destroy;
|
||||
p.node.print = stats_node_print;
|
||||
p.node.prepare = stats_node_prepare;
|
||||
p.node.start = stats_node_start;
|
||||
p.node.stop = stats_node_stop;
|
||||
p.node.read = stats_node_read;
|
||||
|
|
|
@ -228,6 +228,9 @@ int test_rtt_parse(struct vnode *n, json_t *cfg)
|
|||
if (!c)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
c->filename = nullptr;
|
||||
c->filename_formatted = nullptr;
|
||||
|
||||
c->rate = rates[i];
|
||||
c->values = values[j];
|
||||
|
||||
|
@ -461,7 +464,7 @@ static void register_plugin() {
|
|||
p.node.write = test_rtt_write;
|
||||
|
||||
int ret = vlist_init(&p.node.instances);
|
||||
if (ret)
|
||||
if (!ret)
|
||||
vlist_init_and_push(&plugins, &p);
|
||||
}
|
||||
|
||||
|
|
|
@ -662,7 +662,7 @@ static void register_plugin() {
|
|||
p.node.read = uldaq_read;
|
||||
|
||||
int ret = vlist_init(&p.node.instances);
|
||||
if (ret)
|
||||
if (!ret)
|
||||
vlist_init_and_push(&plugins, &p);
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ using namespace villas::utils;
|
|||
|
||||
SuperNode::SuperNode() :
|
||||
state(State::INITIALIZED),
|
||||
idleStop(false),
|
||||
idleStop(true),
|
||||
#ifdef WITH_API
|
||||
api(this),
|
||||
#endif
|
||||
|
@ -92,12 +92,12 @@ SuperNode::SuperNode() :
|
|||
|
||||
void SuperNode::parse(const std::string &u)
|
||||
{
|
||||
config.load(u);
|
||||
config.root = config.load(u);
|
||||
|
||||
parse(config.root);
|
||||
}
|
||||
|
||||
void SuperNode::parse(json_t *cfg)
|
||||
void SuperNode::parse(json_t *root)
|
||||
{
|
||||
int ret;
|
||||
const char *nme = nullptr;
|
||||
|
@ -107,14 +107,12 @@ void SuperNode::parse(json_t *cfg)
|
|||
json_t *json_nodes = nullptr;
|
||||
json_t *json_paths = nullptr;
|
||||
json_t *json_logging = nullptr;
|
||||
json_t *json_web = nullptr;
|
||||
json_t *json_http = nullptr;
|
||||
|
||||
json_error_t err;
|
||||
|
||||
idleStop = true;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, JSON_STRICT, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: s, s?: b }",
|
||||
"http", &json_web,
|
||||
ret = json_unpack_ex(root, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: s, s?: b }",
|
||||
"http", &json_http,
|
||||
"logging", &json_logging,
|
||||
"nodes", &json_nodes,
|
||||
"paths", &json_paths,
|
||||
|
@ -125,14 +123,14 @@ void SuperNode::parse(json_t *cfg)
|
|||
"idle_stop", &idleStop
|
||||
);
|
||||
if (ret)
|
||||
throw ConfigError(cfg, err, "node-config", "Unpacking top-level config failed");
|
||||
throw ConfigError(root, err, "node-config", "Unpacking top-level config failed");
|
||||
|
||||
if (nme)
|
||||
name = nme;
|
||||
|
||||
#ifdef WITH_WEB
|
||||
if (json_web)
|
||||
web.parse(json_web);
|
||||
if (json_http)
|
||||
web.parse(json_http);
|
||||
#endif /* WITH_WEB */
|
||||
|
||||
if (json_logging)
|
||||
|
@ -155,7 +153,7 @@ void SuperNode::parse(json_t *cfg)
|
|||
|
||||
ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type);
|
||||
if (ret)
|
||||
throw ConfigError(cfg, err, "node-config-node-type", "Failed to parse type of node '{}'", name);
|
||||
throw ConfigError(root, err, "node-config-node-type", "Failed to parse type of node '{}'", name);
|
||||
|
||||
nt = node_type_lookup(type);
|
||||
if (!nt)
|
||||
|
@ -359,6 +357,15 @@ void SuperNode::prepare()
|
|||
prepareNodes();
|
||||
preparePaths();
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&nodes); i++) {
|
||||
auto *n = (struct vnode *) vlist_at(&nodes, i);
|
||||
if (vlist_length(&n->sources) == 0 &&
|
||||
vlist_length(&n->destinations) == 0) {
|
||||
logger->info("Node {} is not used by any path. Disabling...");
|
||||
n->enabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
state = State::PREPARED;
|
||||
}
|
||||
|
||||
|
|
|
@ -62,8 +62,9 @@ protected:
|
|||
<< " CONFIG is the path to an optional configuration file" << std::endl
|
||||
<< " OPTIONS is one or more of the following options:" << std::endl
|
||||
<< " -d LVL set debug level" << std::endl
|
||||
<< " -V show version and exit" << std::endl
|
||||
<< " -h show usage and exit" << std::endl << std::endl;
|
||||
<< " -V show version and exit" << std::endl
|
||||
<< " -c perform plausability checks on config" << std::endl
|
||||
<< " -h show usage and exit" << std::endl << std::endl;
|
||||
|
||||
printCopyright();
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue