From 1aebd550de4f581ad3867129768b3bca29efc456 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 10 Sep 2020 17:36:08 +0200 Subject: [PATCH] path: add support for using a node as a source in multiple paths --- include/villas/path.h | 8 +- include/villas/path_destination.h | 6 +- include/villas/path_source.h | 15 +- lib/node_direction.cpp | 5 +- lib/path.cpp | 319 ++++++++++++++---------------- lib/path_destination.cpp | 19 +- lib/path_source.cpp | 81 ++++++-- 7 files changed, 253 insertions(+), 200 deletions(-) diff --git a/include/villas/path.h b/include/villas/path.h index 6eef2557b..d250efaf3 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -93,6 +93,8 @@ struct vpath { villas::Logger logger; + std::list mask_list; + std::bitset mask; /**< A mask of path_sources which are enabled for poll(). */ std::bitset received; /**< A mask of path_sources for which we already received samples. */ }; @@ -100,10 +102,10 @@ struct vpath { /** Initialize internal data structures. */ int path_init(struct vpath *p) __attribute__ ((warn_unused_result)); -int path_prepare(struct vpath *p); +int path_prepare(struct vpath *p, struct vlist *nodes); /** Check if path configuration is proper. */ -int path_check(struct vpath *p); +void path_check(struct vpath *p); /** Start a path. * @@ -154,7 +156,7 @@ int path_reverse(struct vpath *p, struct vpath *r); */ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes); -void path_parse_mask(struct vpath *p, json_t *json_mask); +void path_parse_mask(struct vpath *p, json_t *json_mask, struct vlist *nodes); bool path_is_simple(const struct vpath *p); diff --git a/include/villas/path_destination.h b/include/villas/path_destination.h index 55c77c611..3e2415f26 100644 --- a/include/villas/path_destination.h +++ b/include/villas/path_destination.h @@ -41,11 +41,13 @@ struct vpath_destination { struct queue queue; }; -int path_destination_init(struct vpath_destination *pd); +int path_destination_init(struct vpath_destination *pd, struct vnode *n) __attribute__ ((warn_unused_result)); + +int path_destination_destroy(struct vpath_destination *pd) __attribute__ ((warn_unused_result)); int path_destination_prepare(struct vpath_destination *pd, int queuelen); -int path_destination_destroy(struct vpath_destination *pd); +void path_destination_check(struct vpath_destination *pd); void path_destination_enqueue(struct vpath *p, struct sample *smps[], unsigned cnt); diff --git a/include/villas/path_source.h b/include/villas/path_source.h index d4730fa0e..b2b77bc3f 100644 --- a/include/villas/path_source.h +++ b/include/villas/path_source.h @@ -36,21 +36,30 @@ struct vpath; struct sample; +enum PathSourceType { + MASTER, + SECONDARY +}; + struct vpath_source { struct vnode *node; bool masked; + + enum PathSourceType type; struct pool pool; struct vlist mappings; /**< List of mappings (struct mapping_entry). */ struct vlist secondaries; }; -int path_source_init(struct vpath_source *ps); +int path_source_init_master(struct vpath_source *ps, struct vnode *n) __attribute__ ((warn_unused_result)); -int path_source_prepare(struct vpath_source *ps); +int path_source_init_secondary(struct vpath_source *ps, struct vnode *n) __attribute__ ((warn_unused_result)); -int path_source_destroy(struct vpath_source *ps); +int path_source_destroy(struct vpath_source *ps) __attribute__ ((warn_unused_result)); + +void path_source_check(struct vpath_source *ps); int path_source_read(struct vpath_source *ps, struct vpath *p, int i); diff --git a/lib/node_direction.cpp b/lib/node_direction.cpp index caef0d418..7485c8b2b 100644 --- a/lib/node_direction.cpp +++ b/lib/node_direction.cpp @@ -117,8 +117,7 @@ int node_direction_parse(struct vnode_direction *nd, struct vnode *n, json_t *cf jerror(&err, "Failed to parse node %s", node_name(n)); if (n->_vt->flags & (int) NodeFlags::PROVIDES_SIGNALS) { - if (json_signals) - error("Node %s does not support signal definitions", node_name(n)); + /* Do nothing.. Node-type will provide signals */ } else if (json_is_array(json_signals)) { ret = signal_list_parse(&nd->signals, json_signals); @@ -169,7 +168,7 @@ int node_direction_parse(struct vnode_direction *nd, struct vnode *n, json_t *cf int node_direction_check(struct vnode_direction *nd, struct vnode *n) { - assert(nd->state == State::PARSED); + assert(n->state != State::DESTROYED); if (nd->vectorize <= 0) error("Invalid setting 'vectorize' with value %d for node %s. Must be natural number!", nd->vectorize, node_name(n)); diff --git a/lib/path.cpp b/lib/path.cpp index b04781736..6ee2c7bc8 100644 --- a/lib/path.cpp +++ b/lib/path.cpp @@ -25,6 +25,10 @@ #include #include +#include +#include +#include + #include #include #include @@ -194,8 +198,8 @@ static int path_prepare_poll(struct vpath *p) struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i); m = node_poll_fds(ps->node, fds); - if (m < 0) - continue; + if (m <= 0) + throw RuntimeError("Failed to get file descriptor for node {}", node_name(ps->node)); p->reader.nfds += m; p->reader.pfds = (struct pollfd *) realloc(p->reader.pfds, p->reader.nfds * sizeof(struct pollfd)); @@ -228,18 +232,103 @@ static int path_prepare_poll(struct vpath *p) return 0; } -int path_prepare(struct vpath *p) +int path_prepare(struct vpath *p, struct vlist *nodes) { int ret; + unsigned pool_size; + + struct memory_type *pool_mt = memory_default; assert(p->state == State::CHECKED); - /* Prepare destinations */ - struct memory_type *pool_mt = memory_default; - unsigned pool_size = MAX(1UL, vlist_length(&p->destinations)) * p->queuelen; + p->mask.reset(); + /* Prepare mappings */ + ret = mapping_list_prepare(&p->mappings, nodes); + if (ret) + return ret; + + /* Create path sources */ + std::map pss; + for (size_t i = 0; i < vlist_length(&p->mappings); i++) { + struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i); + struct vnode *n = me->node; + struct vpath_source *ps; + + if (pss.find(n) != pss.end()) + /* We already have a path source for this mapping entry */ + ps = pss[n]; + else { + /* Create new path source */ + ps = pss[n] = new struct vpath_source; + if (!ps) + throw MemoryAllocationError(); + + /* Depending on weather the node belonging to this mapping is already + * used by another path or not, we will create a master or secondary + * path source. + * A secondary path source uses an internal loopback node / queue + * to forward samples from on path to another. + */ + bool isSecondary = vlist_length(&n->sources) > 0; + ret = isSecondary + ? path_source_init_secondary(ps, n) + : path_source_init_master(ps, n); + if (ret) + return ret; + + if (ps->type == PathSourceType::SECONDARY) { + vlist_push(nodes, ps->node); + vlist_push(&ps->node->sources, ps); + } + + if (p->mask_list.empty() || std::find(p->mask_list.begin(), p->mask_list.end(), n) != p->mask_list.end()) { + ps->masked = true; + p->mask.set(i); + } + + vlist_push(&n->sources, ps); + vlist_push(&p->sources, ps); + } + + struct vlist *sigs = node_get_signals(me->node, NodeDir::IN); + + /* Update signals of path */ + for (unsigned j = 0; j < (unsigned) me->length; j++) { + struct signal *sig; + + /* For data mappings we simple refer to the existing + * signal descriptors of the source node. */ + if (me->type == MappingType::DATA) { + sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j); + if (!sig) { + p->logger->warn("Failed to create signal description for path {}", path_name(p)); + continue; + } + + signal_incref(sig); + } + /* For other mappings we create new signal descriptors */ + else { + sig = new struct signal; + if (!sig) + throw MemoryAllocationError(); + + ret = signal_init_from_mapping(sig, me, j); + if (ret) + return -1; + } + + vlist_extend(&p->signals, me->offset + j + 1, nullptr); + vlist_set(&p->signals, me->offset + j, sig); + } + + vlist_push(&ps->mappings, me); + } + + /* Prepare path destinations */ for (size_t i = 0; i < vlist_length(&p->destinations); i++) { - struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); + auto *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); if (node_type(pd->node)->pool_size > pool_size) pool_size = node_type(pd->node)->pool_size; @@ -252,54 +341,31 @@ int path_prepare(struct vpath *p) return ret; } - /* Prepare sources */ - ret = mapping_list_prepare(&p->mappings); + /* Prepare pool */ + pool_size = MAX(1UL, vlist_length(&p->destinations)) * p->queuelen; + ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(vlist_length(&p->signals)), pool_mt); if (ret) return ret; - for (size_t i = 0; i < vlist_length(&p->sources); i++) { - struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i); + /* Autodetect whether to use original sequence numbers or not */ + if (p->original_sequence_no == -1) + p->original_sequence_no = vlist_length(&p->sources) == 1; - ret = path_source_prepare(ps); + /* Autodetect whether to use poll() for this path or not */ + if (p->poll == -1) { + if (p->rate > 0) + p->poll = 1; + else if (vlist_length(&p->sources) > 1) + p->poll = 1; + else + p->poll = 0; + } + + /* Prepare poll() */ + if (p->poll) { + ret = path_prepare_poll(p); if (ret) return ret; - - if (ps->masked) - p->mask.set(i); - - for (size_t i = 0; i < vlist_length(&ps->mappings); i++) { - struct mapping_entry *me = (struct mapping_entry *) vlist_at(&ps->mappings, i); - struct vlist *sigs = node_get_signals(me->node, NodeDir::IN); - - for (unsigned j = 0; j < (unsigned) me->length; j++) { - struct signal *sig; - - /* For data mappings we simple refer to the existing - * signal descriptors of the source node. */ - if (me->type == MappingType::DATA) { - sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j); - if (!sig) { - p->logger->warn("Failed to create signal description for path {}", path_name(p)); - continue; - } - - signal_incref(sig); - } - /* For other mappings we create new signal descriptors */ - else { - sig = new struct signal; - if (!sig) - throw MemoryAllocationError(); - - ret = signal_init_from_mapping(sig, me, j); - if (ret) - return -1; - } - - vlist_extend(&p->signals, me->offset + j + 1, nullptr); - vlist_set(&p->signals, me->offset + j, sig); - } - } } #ifdef WITH_HOOKS @@ -309,14 +375,6 @@ int path_prepare(struct vpath *p) hook_list_prepare(&p->hooks, &p->signals, m, p, nullptr); #endif /* WITH_HOOKS */ - /* Initialize pool */ - ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(vlist_length(&p->signals)), pool_mt); - if (ret) - return ret; - - if (p->original_sequence_no == -1) - p->original_sequence_no = vlist_length(&p->sources) == 1; - p->logger->info("Prepared path {} with output signals:", path_name(p)); signal_list_dump(&p->signals); @@ -338,7 +396,10 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) const char *mode = nullptr, *uuid = nullptr; struct vlist destinations; - vlist_init(&destinations); + + ret = vlist_init(&destinations); + if (ret) + return ret; ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: b, s?: F, s?: o, s?: b, s?: s}", "in", &json_in, @@ -356,7 +417,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) "uuid", &uuid ); if (ret) - throw ConfigError(cfg, &err, "node-config-path", "Failed to parse path configuration"); + throw ConfigError(cfg, err, "node-config-path", "Failed to parse path configuration"); /* Optional settings */ if (mode) { @@ -368,6 +429,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) throw ConfigError(cfg, "node-config-path", "Invalid path mode '{}'", mode); } + /* UUID */ if (uuid) { ret = uuid_parse(uuid, p->uuid); if (ret) @@ -383,62 +445,10 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) } /* Input node(s) */ - ret = mapping_list_parse(&p->mappings, json_in, nodes); + ret = mapping_list_parse(&p->mappings, json_in); if (ret) throw ConfigError(json_in, "node-config-path-in", "Failed to parse input mapping of path {}", path_name(p)); - std::map pss; - for (size_t i = 0; i < vlist_length(&p->mappings); i++) { - struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i); - struct vnode *n = me->node; - struct vpath_source *ps; - - if (pss.find(n) != pss.end()) - /* We already have a path source for this mapping entry */ - ps = pss[n] - else { - /* Create new path source */ - ps = pss[n] = new struct vpath_source; - if (!ps) - throw MemoryAllocationError(); - - ret = path_source_init(ps); - if (ret) - return ret; - - /* No mask provided -> enable all */ - if (!json_mask) - ps->masked = true; - - /* The node is used as a source by more than one path. - * We are creating an internal loopbacks nodes - * to forward/copy samples to other paths. - */ - if (vlist_length(&n->sources) > 0) { - struct node *lo = new struct vnode; - if (!lo) - throw MemoryAllocationError(); - - auto *vt = node_type_lookup("loopback"); - - ret = node_init(lo, vt); - if (ret) - return ret; - - ps->node = lo; - - auto *master_ps = (struct vpath_source *) vlist_at_safe(&n->sources, 0); - - vlist_push(&master_ps->secondaries, ps); - } - - vlist_push(&n->sources, ps); - vlist_push(&p->sources, ps); - } - - vlist_push(&ps->mappings, me); - } - /* Output node(s) */ if (json_out) { ret = node_list_parse(&destinations, json_out, nodes); @@ -452,11 +462,15 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) if (n->output_path) throw ConfigError(cfg, "node-config-path", "Every node must only be used by a single path as destination"); + n->output_path = p; + auto *pd = new struct vpath_destination; if (!pd) throw MemoryAllocationError(); - path_destination_init(pd); + ret = path_destination_init(pd, n); + if (ret) + return ret; vlist_push(&n->destinations, pd); vlist_push(&p->destinations, pd); @@ -468,17 +482,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) #endif /* WITH_HOOKS */ if (json_mask) - path_parse_mask(p, json_mask); - - /* Autodetect whether to use poll() for this path or not */ - if (p->poll == -1) { - if (p->rate > 0) - p->poll = 1; - else if (vlist_length(&p->sources) > 1) - p->poll = 1; - else - p->poll = 0; - } + path_parse_mask(p, json_mask, nodes); ret = vlist_destroy(&destinations, nullptr, false); if (ret) @@ -490,7 +494,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes) return 0; } -void path_parse_mask(struct vpath *p, json_t *json_mask) +void path_parse_mask(struct vpath *p, json_t *json_mask, struct vlist *nodes) { json_t *json_entry; size_t i; @@ -501,7 +505,6 @@ void path_parse_mask(struct vpath *p, json_t *json_mask) json_array_foreach(json_mask, i, json_entry) { const char *name; struct vnode *node; - struct vpath_source *ps = nullptr; name = json_string_value(json_entry); if (!name) @@ -511,31 +514,18 @@ void path_parse_mask(struct vpath *p, json_t *json_mask) if (!node) throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' entry '{}' is not a valid node name", name); - /* Search correspondending path_source to node */ - for (size_t i = 0; i < vlist_length(&p->sources); i++) { - struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, i); - - if (pt->node == node) { - ps = pt; - break; - } - } - - if (!ps) - throw ConfigError(json_mask, "node-config-path-mask", "Node {} is not a source of the path {}", node_name(node), path_name(p)); - - ps->masked = true; + p->mask_list.push_back(node); } } -int path_check(struct vpath *p) +void path_check(struct vpath *p) { assert(p->state != State::DESTROYED); if (p->rate < 0) throw RuntimeError("Setting 'rate' of path {} must be a positive number.", path_name(p)); - if (p->poll) { + if (p->poll > 0) { if (p->rate <= 0) { /* Check that all path sources provide a file descriptor for polling */ for (size_t i = 0; i < vlist_length(&p->sources); i++) { @@ -556,34 +546,24 @@ int path_check(struct vpath *p) throw RuntimeError("Setting 'poll' must be activated when used together with setting 'rate'"); } - for (size_t i = 0; i < vlist_length(&p->sources); i++) { - struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i); - - if (!node_is_enabled(ps->node)) - throw RuntimeError("Source {} of path {} is not enabled", node_name(ps->node), path_name(p)); - - if (!node_type(ps->node)->read) - throw RuntimeError("Node {} is not supported as a source for path {}", node_name(ps->node), path_name(p)); - } - - for (size_t i = 0; i < vlist_length(&p->destinations); i++) { - struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); - - if (!node_is_enabled(pd->node)) - throw RuntimeError("Destination {} of path {} is not enabled", node_name(pd->node), path_name(p)); - - if (!node_type(pd->node)->write) - throw RuntimeError("Destiation node {} is not supported as a sink for path ", node_name(pd->node), path_name(p)); - } - if (!IS_POW2(p->queuelen)) { p->queuelen = LOG2_CEIL(p->queuelen); p->logger->warn("Queue length should always be a power of 2. Adjusting to {}", p->queuelen); } - p->state = State::CHECKED; + for (size_t i = 0; i < vlist_length(&p->sources); i++) { + struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i); - return 0; + path_source_check(ps); + } + + for (size_t i = 0; i < vlist_length(&p->destinations); i++) { + struct vpath_destination *ps = (struct vpath_destination *) vlist_at(&p->destinations, i); + + path_destination_check(ps); + } + + p->state = State::CHECKED; } int path_start(struct vpath *p) @@ -609,7 +589,7 @@ int path_start(struct vpath *p) p->logger->info("Starting path {}: #signals={}, #hooks={}, #sources={}, " "#destinations={}, mode={}, poll={}, mask={:b}, rate={}, " - "enabled={}, reversed={}, queuelen={}, original_sequence_no={}", + "enabled={}, reversed={}, queuelen={}, original_sequence_no={}", path_name(p), vlist_length(&p->signals), vlist_length(&p->hooks), @@ -649,13 +629,6 @@ int path_start(struct vpath *p) p->last_sample->data[i] = sig->init; } - /* Prepare poll() */ - if (p->poll) { - ret = path_prepare_poll(p); - if (ret) - return ret; - } - p->state = State::STARTED; /* Start one thread per path for sending to destinations @@ -730,7 +703,7 @@ int path_destroy(struct vpath *p) if (ret) return ret; - ret = vlist_destroy(&p->mappings, nullptr, true); + ret = vlist_destroy(&p->mappings, (dtor_cb_t) mapping_entry_destroy, true); if (ret) return ret; diff --git a/lib/path_destination.cpp b/lib/path_destination.cpp index 239d80217..a0a482d17 100644 --- a/lib/path_destination.cpp +++ b/lib/path_destination.cpp @@ -25,12 +25,18 @@ #include #include #include +#include #include -int path_destination_init(struct vpath_destination *pd) +using namespace villas; + +int path_destination_init(struct vpath_destination *pd, struct vnode *n) { pd->node = n; - pd->node->output_path = p; + + vlist_push(&n->destinations, pd); + + return 0; } int path_destination_prepare(struct vpath_destination *pd, int queuelen) @@ -116,3 +122,12 @@ void path_destination_write(struct vpath_destination *pd, struct vpath *p) p->logger->debug("Released {} samples back to memory pool", released); } } + +void path_destination_check(struct vpath_destination *pd) +{ + if (!node_is_enabled(pd->node)) + throw RuntimeError("Destination {} is not enabled", node_name(pd->node)); + + if (!node_type(pd->node)->write) + throw RuntimeError("Destiation node {} is not supported as a sink for path ", node_name(pd->node)); +} \ No newline at end of file diff --git a/lib/path_source.cpp b/lib/path_source.cpp index 72e33d2ce..dc7c535f1 100644 --- a/lib/path_source.cpp +++ b/lib/path_source.cpp @@ -20,27 +20,38 @@ * along with this program. If not, see . *********************************************************************************/ +#include + #include #include #include #include +#include #include +#include + #include #include -int path_source_init(struct vpath_source *ps) -{ - ps->node = me->node; - ps->masked = false; +using namespace villas; - vlist_init(&ps->mappings); - vlist_init(&ps->secondaries); -} - -int path_source_prepare(struct vpath_source *ps) +int path_source_init_master(struct vpath_source *ps, struct vnode *n) { int ret; + + ps->node = n; + ps->masked = false; + ps->type = PathSourceType::MASTER; + + ret = vlist_init(&ps->mappings); + if (ret) + return ret; + + ret = vlist_init(&ps->secondaries); + if (ret) + return ret; + int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize); if (ps->node->_vt->pool_size) @@ -53,6 +64,38 @@ int path_source_prepare(struct vpath_source *ps) return 0; } +int path_source_init_secondary(struct vpath_source *ps, struct vnode *n) +{ + int ret; + struct vpath_source *mps; + + ret = path_source_init_master(ps, n); + if (ret) + return ret; + + ps->type = PathSourceType::SECONDARY; + + ps->node = loopback_internal_create(n); + if (!ps->node) + return -1; + + ret = node_check(ps->node); + if (ret) + return -1; + + ret = node_prepare(ps->node); + if (ret) + return -1; + + mps = (struct vpath_source *) vlist_at_safe(&n->sources, 0); + if (!mps) + return -1; + + vlist_push(&mps->secondaries, ps); + + return 0; +} + int path_source_destroy(struct vpath_source *ps) { int ret; @@ -113,15 +156,16 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i) /* Forward samples to secondary path sources */ for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) { - struct vpath_source *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i); + auto *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i); - sample_incref_many(read_smps, recv); - - int sent, release = recv; + int sent; + unsigned release = recv; sent = node_write(sps->node, read_smps, recv, &release); if (sent < recv) - p->logger->warn("Partial write to secondary path source"); + p->logger->warn("Partial write to secondary path source {} of path {}", node_name(sps->node), path_name(p)); + + sample_incref_many(read_smps+release, recv-release); } p->received.set(i); @@ -194,3 +238,12 @@ out2: sample_decref_many(read_smps, release); return enqueued; } + +void path_source_check(struct vpath_source *ps) +{ + if (!node_is_enabled(ps->node)) + throw RuntimeError("Source {} is not enabled", node_name(ps->node)); + + if (!node_type(ps->node)->read) + throw RuntimeError("Node {} is not supported as a source for a path", node_name(ps->node)); +}