From 1e73500c757f3548333413d871d7fc560990482c Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 28 Aug 2020 09:42:46 +0200 Subject: [PATCH] path: use internal loopback nodes to allow nodes to be used as sources by multiple paths --- include/villas/node.h | 3 + include/villas/path.h | 2 + include/villas/path_source.h | 1 + lib/path.cpp | 195 ++++++++++++++++++----------------- lib/path_source.cpp | 17 +++ 5 files changed, 125 insertions(+), 93 deletions(-) diff --git a/include/villas/node.h b/include/villas/node.h index c29453377..f52e16e14 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -91,6 +91,9 @@ struct vnode { #endif /* WITH_NETEM */ #endif /* __linux__ */ + struct vpath sources; /**< A list of path sources which reference this node (struct vpath_sources). */ + struct vpath destinations; /**< A list of path destinations which reference this node (struct vpath_destinations). */ + struct vnode_type *_vt; /**< Virtual functions (C++ OOP style) */ void *_vd; /**< Virtual data (used by struct vnode::_vt functions) */ diff --git a/include/villas/path.h b/include/villas/path.h index c2125f6f7..84c29940c 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -157,6 +157,8 @@ int path_uses_node(struct vpath *p, struct vnode *n); */ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes); +void path_parse_mask(struct vpath *p, json_t *json_mask); + bool path_is_simple(const struct vpath *p); bool path_is_enabled(const struct vpath *p); diff --git a/include/villas/path_source.h b/include/villas/path_source.h index a1448d288..d4730fa0e 100644 --- a/include/villas/path_source.h +++ b/include/villas/path_source.h @@ -43,6 +43,7 @@ struct vpath_source { struct pool pool; struct vlist mappings; /**< List of mappings (struct mapping_entry). */ + struct vlist secondaries; }; int path_source_init(struct vpath_source *ps); diff --git a/lib/path.cpp b/lib/path.cpp index 1f803ea73..ac256d4ce 100644 --- a/lib/path.cpp +++ b/lib/path.cpp @@ -382,47 +382,68 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) free(json_str); } + /* Input node(s) */ + ret = mapping_list_parse(&p->mappings, json_in, nodes); + if (ret) + throw ConfigError(json_in, "node-config-path-in", "Failed to parse input mapping of path {}", path_name(p)); + + std::map pss; + for (size_t i = 0; i < vlist_length(&p->mappings); i++) { + struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i); + struct vnode *n = me->node; + struct vpath_source *ps; + + if (pss.find(n) != pss.end()) + /* We already have a path source for this mapping entry */ + ps = pss[n] + else { + /* Create new path source */ + ps = pss[n] = new struct vpath_source; + if (!ps) + throw MemoryAllocationError(); + + ret = path_source_init(ps); + if (ret) + return ret; + + /* No mask provided -> enable all */ + if (!json_mask) + ps->masked = true; + + /* The node is used as a source by more than one path. + * We are creating an internal loopbacks nodes + * to forward/copy samples to other paths. + */ + if (vlist_length(&n->sources) > 0) { + struct node *lo = new struct vnode; + if (!lo) + throw MemoryAllocationError(); + + auto *vt = node_type_lookup("loopback"); + + ret = node_init(lo, vt); + if (ret) + return ret; + + ps->node = lo; + + auto *master_ps = (struct vpath_source *) vlist_at_safe(&n->sources, 0); + + vlist_push(&master_ps->secondaries, ps); + } + + vlist_push(&n->sources, ps); + vlist_push(&p->sources, ps); + } + + vlist_push(&ps->mappings, me); + } + /* Output node(s) */ if (json_out) { ret = node_list_parse(&destinations, json_out, nodes); if (ret) - jerror(&err, "Failed to parse output nodes"); - } - - for (size_t i = 0; i < vlist_length(&p->mappings); i++) { - struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i); - struct vpath_source *ps = nullptr; - - /* Check if there is already a path_source for this source */ - for (size_t j = 0; j < vlist_length(&p->sources); j++) { - struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, j); - - if (pt->node == me->node) { - ps = pt; - break; - } - } - - /* Create new path_source of not existing */ - if (!ps) { - ps = new struct vpath_source; - if (!ps) - throw MemoryAllocationError(); - - ps->node = me->node; - ps->masked = false; - - vlist_init(&ps->mappings); - - vlist_push(&p->sources, ps); - } - - if (!node_is_enabled(ps->node)) { - p->logger->error("Source {} of path {} is not enabled", node_name(ps->node), path_name(p)); - return -1; - } - - vlist_push(&ps->mappings, me); + throw ConfigError(json_out, "node-config-path-out", "Failed to parse output nodes"); } for (size_t i = 0; i < vlist_length(&destinations); i++) { @@ -437,68 +458,18 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) path_destination_init(pd); - - if (!node_is_enabled(pd->node)) { - p->logger->error("Destination {} of path {} is not enabled", node_name(pd->node), path_name(p)); - return -1; - } - + vlist_push(&n->destinations, pd); vlist_push(&p->destinations, pd); } - if (json_mask) { - json_t *json_entry; - size_t i; - - if (!json_is_array(json_mask)) - throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names"); - - json_array_foreach(json_mask, i, json_entry) { - const char *name; - struct vnode *node; - struct vpath_source *ps = nullptr; - - name = json_string_value(json_entry); - if (!name) - throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names"); - - node = vlist_lookup_name(nodes, name); - if (!node) - throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' entry '{}' is not a valid node name", name); - - /* Search correspondending path_source to node */ - for (size_t i = 0; i < vlist_length(&p->sources); i++) { - struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, i); - - if (pt->node == node) { - ps = pt; - break; - } - } - - if (!ps) { - p->logger->error("Node {} is not a source of the path {}", node_name(node), path_name(p)); - return -1; - } - - ps->masked = true; - } - } - /* Enable all by default */ - else { - for (size_t i = 0; i < vlist_length(&p->sources); i++) { - struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i); - - ps->masked = true; - } - } - #ifdef WITH_HOOKS - if (json_hooks) { + if (json_hooks) hook_list_parse(&p->hooks, json_hooks, (int) Hook::Flags::PATH, p, nullptr); - } #endif /* WITH_HOOKS */ + if (json_mask) + path_parse_mask(p, json_mask); + /* Autodetect whether to use poll() for this path or not */ if (p->poll == -1) { if (p->rate > 0) @@ -519,6 +490,44 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) return 0; } +void path_parse_mask(struct vpath *p, json_t *json_mask) +{ + json_t *json_entry; + size_t i; + + if (!json_is_array(json_mask)) + throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names"); + + json_array_foreach(json_mask, i, json_entry) { + const char *name; + struct vnode *node; + struct vpath_source *ps = nullptr; + + name = json_string_value(json_entry); + if (!name) + throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' setting must be a list of node names"); + + node = vlist_lookup_name(nodes, name); + if (!node) + throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' entry '{}' is not a valid node name", name); + + /* Search correspondending path_source to node */ + for (size_t i = 0; i < vlist_length(&p->sources); i++) { + struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, i); + + if (pt->node == node) { + ps = pt; + break; + } + } + + if (!ps) + throw ConfigError(json_mask, "node-config-path-mask", "Node {} is not a source of the path {}", node_name(node), path_name(p)); + + ps->masked = true; + } +} + int path_check(struct vpath *p) { assert(p->state != State::DESTROYED); @@ -873,4 +882,4 @@ json_t * path_to_json(struct vpath *p) ); return json_path; -} \ No newline at end of file +} diff --git a/lib/path_source.cpp b/lib/path_source.cpp index 4e825b543..72e33d2ce 100644 --- a/lib/path_source.cpp +++ b/lib/path_source.cpp @@ -65,6 +65,10 @@ int path_source_destroy(struct vpath_source *ps) if (ret) return ret; + ret = vlist_destroy(&ps->secondaries, nullptr, false); + if (ret) + return ret; + return 0; } @@ -107,6 +111,19 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i) else if (recv < allocated) p->logger->warn("Partial read for path {}: read={}, expected={}", path_name(p), recv, allocated); + /* Forward samples to secondary path sources */ + for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) { + struct vpath_source *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i); + + sample_incref_many(read_smps, recv); + + int sent, release = recv; + + sent = node_write(sps->node, read_smps, recv, &release); + if (sent < recv) + p->logger->warn("Partial write to secondary path source"); + } + p->received.set(i); if (p->mode == PathMode::ANY) { /* Mux all samples */