1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

path: use internal loopback nodes to allow nodes to be used as sources by multiple paths

This commit is contained in:
Steffen Vogel 2020-08-28 09:42:46 +02:00
parent 14d0da9589
commit 1e73500c75
5 changed files with 125 additions and 93 deletions

View file

@ -91,6 +91,9 @@ struct vnode {
#endif /* WITH_NETEM */
#endif /* __linux__ */
struct vpath sources; /**< A list of path sources which reference this node (struct vpath_sources). */
struct vpath destinations; /**< A list of path destinations which reference this node (struct vpath_destinations). */
struct vnode_type *_vt; /**< Virtual functions (C++ OOP style) */
void *_vd; /**< Virtual data (used by struct vnode::_vt functions) */

View file

@ -157,6 +157,8 @@ int path_uses_node(struct vpath *p, struct vnode *n);
*/
int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes);
void path_parse_mask(struct vpath *p, json_t *json_mask);
bool path_is_simple(const struct vpath *p);
bool path_is_enabled(const struct vpath *p);

View file

@ -43,6 +43,7 @@ struct vpath_source {
struct pool pool;
struct vlist mappings; /**< List of mappings (struct mapping_entry). */
struct vlist secondaries;
};
int path_source_init(struct vpath_source *ps);

View file

@ -382,47 +382,68 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
free(json_str);
}
/* Input node(s) */
ret = mapping_list_parse(&p->mappings, json_in, nodes);
if (ret)
throw ConfigError(json_in, "node-config-path-in", "Failed to parse input mapping of path {}", path_name(p));
std::map<struct vnode *, struct vpath_source> pss;
for (size_t i = 0; i < vlist_length(&p->mappings); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i);
struct vnode *n = me->node;
struct vpath_source *ps;
if (pss.find(n) != pss.end())
/* We already have a path source for this mapping entry */
ps = pss[n]
else {
/* Create new path source */
ps = pss[n] = new struct vpath_source;
if (!ps)
throw MemoryAllocationError();
ret = path_source_init(ps);
if (ret)
return ret;
/* No mask provided -> enable all */
if (!json_mask)
ps->masked = true;
/* The node is used as a source by more than one path.
* We are creating an internal loopbacks nodes
* to forward/copy samples to other paths.
*/
if (vlist_length(&n->sources) > 0) {
struct node *lo = new struct vnode;
if (!lo)
throw MemoryAllocationError();
auto *vt = node_type_lookup("loopback");
ret = node_init(lo, vt);
if (ret)
return ret;
ps->node = lo;
auto *master_ps = (struct vpath_source *) vlist_at_safe(&n->sources, 0);
vlist_push(&master_ps->secondaries, ps);
}
vlist_push(&n->sources, ps);
vlist_push(&p->sources, ps);
}
vlist_push(&ps->mappings, me);
}
/* Output node(s) */
if (json_out) {
ret = node_list_parse(&destinations, json_out, nodes);
if (ret)
jerror(&err, "Failed to parse output nodes");
}
for (size_t i = 0; i < vlist_length(&p->mappings); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i);
struct vpath_source *ps = nullptr;
/* Check if there is already a path_source for this source */
for (size_t j = 0; j < vlist_length(&p->sources); j++) {
struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, j);
if (pt->node == me->node) {
ps = pt;
break;
}
}
/* Create new path_source of not existing */
if (!ps) {
ps = new struct vpath_source;
if (!ps)
throw MemoryAllocationError();
ps->node = me->node;
ps->masked = false;
vlist_init(&ps->mappings);
vlist_push(&p->sources, ps);
}
if (!node_is_enabled(ps->node)) {
p->logger->error("Source {} of path {} is not enabled", node_name(ps->node), path_name(p));
return -1;
}
vlist_push(&ps->mappings, me);
throw ConfigError(json_out, "node-config-path-out", "Failed to parse output nodes");
}
for (size_t i = 0; i < vlist_length(&destinations); i++) {
@ -437,68 +458,18 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
path_destination_init(pd);
if (!node_is_enabled(pd->node)) {
p->logger->error("Destination {} of path {} is not enabled", node_name(pd->node), path_name(p));
return -1;
}
vlist_push(&n->destinations, pd);
vlist_push(&p->destinations, pd);
}
if (json_mask) {
json_t *json_entry;
size_t i;
if (!json_is_array(json_mask))
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names");
json_array_foreach(json_mask, i, json_entry) {
const char *name;
struct vnode *node;
struct vpath_source *ps = nullptr;
name = json_string_value(json_entry);
if (!name)
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names");
node = vlist_lookup_name<struct vnode>(nodes, name);
if (!node)
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' entry '{}' is not a valid node name", name);
/* Search correspondending path_source to node */
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, i);
if (pt->node == node) {
ps = pt;
break;
}
}
if (!ps) {
p->logger->error("Node {} is not a source of the path {}", node_name(node), path_name(p));
return -1;
}
ps->masked = true;
}
}
/* Enable all by default */
else {
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i);
ps->masked = true;
}
}
#ifdef WITH_HOOKS
if (json_hooks) {
if (json_hooks)
hook_list_parse(&p->hooks, json_hooks, (int) Hook::Flags::PATH, p, nullptr);
}
#endif /* WITH_HOOKS */
if (json_mask)
path_parse_mask(p, json_mask);
/* Autodetect whether to use poll() for this path or not */
if (p->poll == -1) {
if (p->rate > 0)
@ -519,6 +490,44 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
return 0;
}
void path_parse_mask(struct vpath *p, json_t *json_mask)
{
json_t *json_entry;
size_t i;
if (!json_is_array(json_mask))
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names");
json_array_foreach(json_mask, i, json_entry) {
const char *name;
struct vnode *node;
struct vpath_source *ps = nullptr;
name = json_string_value(json_entry);
if (!name)
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names");
node = vlist_lookup_name<struct vnode>(nodes, name);
if (!node)
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' entry '{}' is not a valid node name", name);
/* Search correspondending path_source to node */
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, i);
if (pt->node == node) {
ps = pt;
break;
}
}
if (!ps)
throw ConfigError(json_mask, "node-config-path-mask", "Node {} is not a source of the path {}", node_name(node), path_name(p));
ps->masked = true;
}
}
int path_check(struct vpath *p)
{
assert(p->state != State::DESTROYED);
@ -873,4 +882,4 @@ json_t * path_to_json(struct vpath *p)
);
return json_path;
}
}

View file

@ -65,6 +65,10 @@ int path_source_destroy(struct vpath_source *ps)
if (ret)
return ret;
ret = vlist_destroy(&ps->secondaries, nullptr, false);
if (ret)
return ret;
return 0;
}
@ -107,6 +111,19 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i)
else if (recv < allocated)
p->logger->warn("Partial read for path {}: read={}, expected={}", path_name(p), recv, allocated);
/* Forward samples to secondary path sources */
for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) {
struct vpath_source *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i);
sample_incref_many(read_smps, recv);
int sent, release = recv;
sent = node_write(sps->node, read_smps, recv, &release);
if (sent < recv)
p->logger->warn("Partial write to secondary path source");
}
p->received.set(i);
if (p->mode == PathMode::ANY) { /* Mux all samples */