From 3b99227537951eed2b48829e13533b5f86b30d24 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 24 Feb 2019 09:39:41 +0100 Subject: [PATCH] path: separated path_{source, destination} --- include/villas/path.h | 15 -- include/villas/path_destination.h | 60 ++++++++ include/villas/path_source.h | 62 ++++++++ lib/CMakeLists.txt | 2 + lib/path.c | 247 +++--------------------------- lib/path_destination.c | 110 +++++++++++++ lib/path_source.c | 159 +++++++++++++++++++ 7 files changed, 417 insertions(+), 238 deletions(-) create mode 100644 include/villas/path_destination.h create mode 100644 include/villas/path_source.h create mode 100644 lib/path_destination.c create mode 100644 lib/path_source.c diff --git a/include/villas/path.h b/include/villas/path.h index 6024139c3..1abe9ab0c 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -49,21 +49,6 @@ extern "C" { struct stats; struct node; -struct path_source { - struct node *node; - - bool masked; - - struct pool pool; - struct vlist mappings; /**< List of mappings (struct mapping_entry). */ -}; - -struct path_destination { - struct node *node; - - struct queue queue; -}; - /** The register mode determines under which condition the path is triggered. */ enum path_mode { PATH_MODE_ANY, /**< The path is triggered whenever one of the sources receives samples. */ diff --git a/include/villas/path_destination.h b/include/villas/path_destination.h new file mode 100644 index 000000000..fede6ffb6 --- /dev/null +++ b/include/villas/path_destination.h @@ -0,0 +1,60 @@ +/** Path destination + * + * @file + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** A path connects one input node to multiple output nodes (1-to-n). + * + * @addtogroup path Path + * @{ + */ + +#pragma once + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* Forward declarations */ +struct path; +struct sample; + +struct path_destination { + struct node *node; + + struct queue queue; +}; + +int path_destination_init(struct path_destination *pd, int queuelen); + +int path_destination_destroy(struct path_destination *pd); + +void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt); + +void path_destination_write(struct path_destination *pd, struct path *p); + +#ifdef __cplusplus +} +#endif + +/** @} */ diff --git a/include/villas/path_source.h b/include/villas/path_source.h new file mode 100644 index 000000000..4114dcda2 --- /dev/null +++ b/include/villas/path_source.h @@ -0,0 +1,62 @@ +/** Message source + * + * @file + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** A path connects one input node to multiple output nodes (1-to-n). + * + * @addtogroup path Path + * @{ + */ + +#pragma once + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* Forward declarations */ +struct path; +struct sample; + +struct path_source { + struct node *node; + + bool masked; + + struct pool pool; + struct vlist mappings; /**< List of mappings (struct mapping_entry). */ +}; + +int path_source_init(struct path_source *ps); + +int path_source_destroy(struct path_source *ps); + +int path_source_read(struct path_source *ps, struct path *p, int i); + +#ifdef __cplusplus +} +#endif + +/** @} */ diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 22a797a8e..85318fcbb 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -46,6 +46,8 @@ set(LIB_SRC memory/managed.c sample.c path.c + path_source.c + path_destination.c node.c memory.c plugin.c diff --git a/lib/path.c b/lib/path.c index 7bdb04b3f..9f1288bb3 100644 --- a/lib/path.c +++ b/lib/path.c @@ -29,7 +29,6 @@ #include #include -#include #include #include #include @@ -39,221 +38,9 @@ #include #include #include - -/* Forward declaration */ -static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt); - -static int path_source_init(struct path_source *ps) -{ - int ret; - int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize); - - if (ps->node->_vt->pool_size) - pool_size = ps->node->_vt->pool_size; - - ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage)); - if (ret) - return ret; - - return 0; -} - -static int path_source_destroy(struct path_source *ps) -{ - int ret; - - ret = pool_destroy(&ps->pool); - if (ret) - return ret; - - ret = vlist_destroy(&ps->mappings, NULL, true); - if (ret) - return ret; - - return 0; -} - -static int path_source_read(struct path_source *ps, struct path *p, int i) -{ - int recv, tomux, allocated, cnt, toenqueue, enqueued = 0; - unsigned release; - - cnt = ps->node->in.vectorize; - - struct sample *read_smps[cnt]; - struct sample *muxed_smps[cnt]; - struct sample **tomux_smps; - - /* Fill smps[] free sample blocks from the pool */ - allocated = sample_alloc_many(&ps->pool, read_smps, cnt); - if (allocated != cnt) - warning("Pool underrun for path source %s", node_name(ps->node)); - - /* Read ready samples and store them to blocks pointed by smps[] */ - release = allocated; - - recv = node_read(ps->node, read_smps, allocated, &release); - if (recv == 0) { - enqueued = 0; - goto out2; - } - else if (recv < 0) { - if (ps->node->state == STATE_STOPPING) { - p->state = STATE_STOPPING; - - enqueued = -1; - goto out2; - } - else - error("Failed to read samples from node %s", node_name(ps->node)); - } - else if (recv < allocated) - warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated); - - bitset_set(&p->received, i); - - if (p->mode == PATH_MODE_ANY) { /* Mux all samples */ - tomux_smps = read_smps; - tomux = recv; - } - else { /* Mux only last sample and discard others */ - tomux_smps = read_smps + recv - 1; - tomux = 1; - } - - for (int i = 0; i < tomux; i++) { - muxed_smps[i] = i == 0 - ? sample_clone(p->last_sample) - : sample_clone(muxed_smps[i-1]); - - if (p->original_sequence_no) - muxed_smps[i]->sequence = tomux_smps[i]->sequence; - else { - muxed_smps[i]->sequence = p->last_sequence++; - muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE; - } - - muxed_smps[i]->ts = tomux_smps[i]->ts; - muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED); - - mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); - } - - sample_copy(p->last_sample, muxed_smps[tomux-1]); - - debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received)); - -#ifdef WITH_HOOKS - toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux); - if (toenqueue != tomux) { - int skipped = tomux - toenqueue; - - debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p)); - } -#else - toenqueue = tomux; -#endif - - if (bitset_test(&p->mask, i)) { - /* Check if we received an update from all nodes/ */ - if ((p->mode == PATH_MODE_ANY) || - (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) { - path_destination_enqueue(p, muxed_smps, toenqueue); - - /* Reset bitset of updated nodes */ - bitset_clear_all(&p->received); - - enqueued = toenqueue; - } - } - - sample_decref_many(muxed_smps, tomux); -out2: sample_decref_many(read_smps, release); - - return enqueued; -} - -static int path_destination_init(struct path_destination *pd, int queuelen) -{ - int ret; - - ret = queue_init(&pd->queue, queuelen, &memory_hugepage); - if (ret) - return ret; - - return 0; -} - -static int path_destination_destroy(struct path_destination *pd) -{ - int ret; - - ret = queue_destroy(&pd->queue); - if (ret) - return ret; - - return 0; -} - -static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt) -{ - unsigned enqueued, cloned; - - struct sample *clones[cnt]; - - cloned = sample_clone_many(clones, smps, cnt); - if (cloned < cnt) - warning("Pool underrun in path %s", path_name(p)); - - for (size_t i = 0; i < vlist_length(&p->destinations); i++) { - struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i); - - enqueued = queue_push_many(&pd->queue, (void **) clones, cloned); - if (enqueued != cnt) - warning("Queue overrun for path %s", path_name(p)); - - /* Increase reference counter of these samples as they are now also owned by the queue. */ - sample_incref_many(clones, cloned); - - debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p)); - } - - sample_decref_many(clones, cloned); -} - -static void path_destination_write(struct path_destination *pd, struct path *p) -{ - int cnt = pd->node->out.vectorize; - int sent; - int released; - int allocated; - unsigned release; - - struct sample *smps[cnt]; - - /* As long as there are still samples in the queue */ - while (1) { - allocated = queue_pull_many(&pd->queue, (void **) smps, cnt); - if (allocated == 0) - break; - else if (allocated < cnt) - debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt); - - debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p)); - - release = allocated; - - sent = node_write(pd->node, smps, allocated, &release); - if (sent < 0) - error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent); - else if (sent < allocated) - warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated); - - released = sample_decref_many(smps, release); - - debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); - } -} +#include +#include +#include static void * path_run_single(void *arg) { @@ -337,7 +124,7 @@ int path_init(struct path *p) if (ret) return ret; - ret = vlist_init(&p->hooks); + ret = hook_list_init(&p->hooks); if (ret) return ret; @@ -411,8 +198,10 @@ int path_prepare(struct path *p) assert(p->state == STATE_CHECKED); #ifdef WITH_HOOKS + int m = p->builtin ? HOOK_PATH | HOOK_BUILTIN : 0; + /* Add internal hooks if they are not already in the list */ - ret = hook_init_builtin_list(&p->hooks, p->builtin, HOOK_PATH, p, NULL); + ret = hook_list_prepare(&p->hooks, &p->signals, m, p, NULL); if (ret) return ret; @@ -646,7 +435,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) #ifdef WITH_HOOKS if (json_hooks) { - ret = hook_parse_list(&p->hooks, json_hooks, HOOK_PATH, p, NULL); + ret = hook_list_parse(&p->hooks, json_hooks, HOOK_PATH, p, NULL); if (ret) return ret; } @@ -846,15 +635,27 @@ int path_stop(struct path *p) int path_destroy(struct path *p) { + int ret; + if (p->state == STATE_DESTROYED) return 0; #ifdef WITH_HOOKS - vlist_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true); + ret = hook_list_destroy(&p->hooks); + if (ret) + return ret; #endif - vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true); - vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); - vlist_destroy(&p->signals, (dtor_cb_t) signal_decref, false); + ret = signal_list_destroy(&p->signals); + if (ret) + return ret; + + ret = vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true); + if (ret) + return ret; + + ret = vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); + if (ret) + return ret; if (p->reader.pfds) free(p->reader.pfds); diff --git a/lib/path_destination.c b/lib/path_destination.c new file mode 100644 index 000000000..fa0d2c032 --- /dev/null +++ b/lib/path_destination.c @@ -0,0 +1,110 @@ +/** Path destination + * + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include +#include +#include +#include + +int path_destination_init(struct path_destination *pd, int queuelen) +{ + int ret; + + ret = queue_init(&pd->queue, queuelen, &memory_hugepage); + if (ret) + return ret; + + return 0; +} + +int path_destination_destroy(struct path_destination *pd) +{ + int ret; + + ret = queue_destroy(&pd->queue); + if (ret) + return ret; + + return 0; +} + +void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt) +{ + unsigned enqueued, cloned; + + struct sample *clones[cnt]; + + cloned = sample_clone_many(clones, smps, cnt); + if (cloned < cnt) + warning("Pool underrun in path %s", path_name(p)); + + for (size_t i = 0; i < vlist_length(&p->destinations); i++) { + struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i); + + enqueued = queue_push_many(&pd->queue, (void **) clones, cloned); + if (enqueued != cnt) + warning("Queue overrun for path %s", path_name(p)); + + /* Increase reference counter of these samples as they are now also owned by the queue. */ + sample_incref_many(clones, cloned); + + debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p)); + } + + sample_decref_many(clones, cloned); +} + +void path_destination_write(struct path_destination *pd, struct path *p) +{ + int cnt = pd->node->out.vectorize; + int sent; + int released; + int allocated; + unsigned release; + + struct sample *smps[cnt]; + + /* As long as there are still samples in the queue */ + while (1) { + allocated = queue_pull_many(&pd->queue, (void **) smps, cnt); + if (allocated == 0) + break; + else if (allocated < cnt) + debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt); + + debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p)); + + release = allocated; + + sent = node_write(pd->node, smps, allocated, &release); + if (sent < 0) + error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent); + else if (sent < allocated) + warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated); + + released = sample_decref_many(smps, release); + + debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); + } +} diff --git a/lib/path_source.c b/lib/path_source.c new file mode 100644 index 000000000..ae8225588 --- /dev/null +++ b/lib/path_source.c @@ -0,0 +1,159 @@ +/** Path source + * + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include + +int path_source_init(struct path_source *ps) +{ + int ret; + int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize); + + if (ps->node->_vt->pool_size) + pool_size = ps->node->_vt->pool_size; + + ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage)); + if (ret) + return ret; + + return 0; +} + +int path_source_destroy(struct path_source *ps) +{ + int ret; + + ret = pool_destroy(&ps->pool); + if (ret) + return ret; + + ret = vlist_destroy(&ps->mappings, NULL, true); + if (ret) + return ret; + + return 0; +} + +int path_source_read(struct path_source *ps, struct path *p, int i) +{ + int recv, tomux, allocated, cnt, toenqueue, enqueued = 0; + unsigned release; + + cnt = ps->node->in.vectorize; + + struct sample *read_smps[cnt]; + struct sample *muxed_smps[cnt]; + struct sample **tomux_smps; + + /* Fill smps[] free sample blocks from the pool */ + allocated = sample_alloc_many(&ps->pool, read_smps, cnt); + if (allocated != cnt) + warning("Pool underrun for path source %s", node_name(ps->node)); + + /* Read ready samples and store them to blocks pointed by smps[] */ + release = allocated; + + recv = node_read(ps->node, read_smps, allocated, &release); + if (recv == 0) { + enqueued = 0; + goto out2; + } + else if (recv < 0) { + if (ps->node->state == STATE_STOPPING) { + p->state = STATE_STOPPING; + + enqueued = -1; + goto out2; + } + else + error("Failed to read samples from node %s", node_name(ps->node)); + } + else if (recv < allocated) + warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated); + + bitset_set(&p->received, i); + + if (p->mode == PATH_MODE_ANY) { /* Mux all samples */ + tomux_smps = read_smps; + tomux = recv; + } + else { /* Mux only last sample and discard others */ + tomux_smps = read_smps + recv - 1; + tomux = 1; + } + + for (int i = 0; i < tomux; i++) { + muxed_smps[i] = i == 0 + ? sample_clone(p->last_sample) + : sample_clone(muxed_smps[i-1]); + + if (p->original_sequence_no) + muxed_smps[i]->sequence = tomux_smps[i]->sequence; + else { + muxed_smps[i]->sequence = p->last_sequence++; + muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE; + } + + muxed_smps[i]->ts = tomux_smps[i]->ts; + muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED); + + mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); + } + + sample_copy(p->last_sample, muxed_smps[tomux-1]); + + debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received)); + +#ifdef WITH_HOOKS + toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux); + if (toenqueue != tomux) { + int skipped = tomux - toenqueue; + + debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p)); + } +#else + toenqueue = tomux; +#endif + + if (bitset_test(&p->mask, i)) { + /* Check if we received an update from all nodes/ */ + if ((p->mode == PATH_MODE_ANY) || + (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) { + path_destination_enqueue(p, muxed_smps, toenqueue); + + /* Reset bitset of updated nodes */ + bitset_clear_all(&p->received); + + enqueued = toenqueue; + } + } + + sample_decref_many(muxed_smps, tomux); +out2: sample_decref_many(read_smps, release); + + return enqueued; +}