1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

path: separated path_{source, destination}

This commit is contained in:
Steffen Vogel 2019-02-24 09:39:41 +01:00
parent e8de9df993
commit 3b99227537
7 changed files with 417 additions and 238 deletions

View file

@ -49,21 +49,6 @@ extern "C" {
struct stats;
struct node;
struct path_source {
struct node *node;
bool masked;
struct pool pool;
struct vlist mappings; /**< List of mappings (struct mapping_entry). */
};
struct path_destination {
struct node *node;
struct queue queue;
};
/** The register mode determines under which condition the path is triggered. */
enum path_mode {
PATH_MODE_ANY, /**< The path is triggered whenever one of the sources receives samples. */

View file

@ -0,0 +1,60 @@
/** Path destination
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** A path connects one input node to multiple output nodes (1-to-n).
*
* @addtogroup path Path
* @{
*/
#pragma once
#include <villas/queue.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations */
struct path;
struct sample;
struct path_destination {
struct node *node;
struct queue queue;
};
int path_destination_init(struct path_destination *pd, int queuelen);
int path_destination_destroy(struct path_destination *pd);
void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
void path_destination_write(struct path_destination *pd, struct path *p);
#ifdef __cplusplus
}
#endif
/** @} */

View file

@ -0,0 +1,62 @@
/** Message source
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** A path connects one input node to multiple output nodes (1-to-n).
*
* @addtogroup path Path
* @{
*/
#pragma once
#include <villas/pool.h>
#include <villas/list.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations */
struct path;
struct sample;
struct path_source {
struct node *node;
bool masked;
struct pool pool;
struct vlist mappings; /**< List of mappings (struct mapping_entry). */
};
int path_source_init(struct path_source *ps);
int path_source_destroy(struct path_source *ps);
int path_source_read(struct path_source *ps, struct path *p, int i);
#ifdef __cplusplus
}
#endif
/** @} */

View file

@ -46,6 +46,8 @@ set(LIB_SRC
memory/managed.c
sample.c
path.c
path_source.c
path_destination.c
node.c
memory.c
plugin.c

View file

@ -29,7 +29,6 @@
#include <villas/node/config.h>
#include <villas/utils.h>
#include <villas/path.h>
#include <villas/timing.h>
#include <villas/pool.h>
#include <villas/queue.h>
@ -39,221 +38,9 @@
#include <villas/stats.h>
#include <villas/node.h>
#include <villas/signal.h>
/* Forward declaration */
static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
static int path_source_init(struct path_source *ps)
{
int ret;
int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
if (ps->node->_vt->pool_size)
pool_size = ps->node->_vt->pool_size;
ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage));
if (ret)
return ret;
return 0;
}
static int path_source_destroy(struct path_source *ps)
{
int ret;
ret = pool_destroy(&ps->pool);
if (ret)
return ret;
ret = vlist_destroy(&ps->mappings, NULL, true);
if (ret)
return ret;
return 0;
}
static int path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
unsigned release;
cnt = ps->node->in.vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
if (allocated != cnt)
warning("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
release = allocated;
recv = node_read(ps->node, read_smps, allocated, &release);
if (recv == 0) {
enqueued = 0;
goto out2;
}
else if (recv < 0) {
if (ps->node->state == STATE_STOPPING) {
p->state = STATE_STOPPING;
enqueued = -1;
goto out2;
}
else
error("Failed to read samples from node %s", node_name(ps->node));
}
else if (recv < allocated)
warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
bitset_set(&p->received, i);
if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
tomux_smps = read_smps;
tomux = recv;
}
else { /* Mux only last sample and discard others */
tomux_smps = read_smps + recv - 1;
tomux = 1;
}
for (int i = 0; i < tomux; i++) {
muxed_smps[i] = i == 0
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
if (p->original_sequence_no)
muxed_smps[i]->sequence = tomux_smps[i]->sequence;
else {
muxed_smps[i]->sequence = p->last_sequence++;
muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE;
}
muxed_smps[i]->ts = tomux_smps[i]->ts;
muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
#ifdef WITH_HOOKS
toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
}
#else
toenqueue = tomux;
#endif
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
path_destination_enqueue(p, muxed_smps, toenqueue);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
enqueued = toenqueue;
}
}
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, release);
return enqueued;
}
static int path_destination_init(struct path_destination *pd, int queuelen)
{
int ret;
ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
if (ret)
return ret;
return 0;
}
static int path_destination_destroy(struct path_destination *pd)
{
int ret;
ret = queue_destroy(&pd->queue);
if (ret)
return ret;
return 0;
}
static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt)
{
unsigned enqueued, cloned;
struct sample *clones[cnt];
cloned = sample_clone_many(clones, smps, cnt);
if (cloned < cnt)
warning("Pool underrun in path %s", path_name(p));
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
if (enqueued != cnt)
warning("Queue overrun for path %s", path_name(p));
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_incref_many(clones, cloned);
debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
}
sample_decref_many(clones, cloned);
}
static void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->out.vectorize;
int sent;
int released;
int allocated;
unsigned release;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (allocated == 0)
break;
else if (allocated < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
release = allocated;
sent = node_write(pd->node, smps, allocated, &release);
if (sent < 0)
error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent);
else if (sent < allocated)
warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
released = sample_decref_many(smps, release);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}
}
#include <villas/path.h>
#include <villas/path_source.h>
#include <villas/path_destination.h>
static void * path_run_single(void *arg)
{
@ -337,7 +124,7 @@ int path_init(struct path *p)
if (ret)
return ret;
ret = vlist_init(&p->hooks);
ret = hook_list_init(&p->hooks);
if (ret)
return ret;
@ -411,8 +198,10 @@ int path_prepare(struct path *p)
assert(p->state == STATE_CHECKED);
#ifdef WITH_HOOKS
int m = p->builtin ? HOOK_PATH | HOOK_BUILTIN : 0;
/* Add internal hooks if they are not already in the list */
ret = hook_init_builtin_list(&p->hooks, p->builtin, HOOK_PATH, p, NULL);
ret = hook_list_prepare(&p->hooks, &p->signals, m, p, NULL);
if (ret)
return ret;
@ -646,7 +435,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
#ifdef WITH_HOOKS
if (json_hooks) {
ret = hook_parse_list(&p->hooks, json_hooks, HOOK_PATH, p, NULL);
ret = hook_list_parse(&p->hooks, json_hooks, HOOK_PATH, p, NULL);
if (ret)
return ret;
}
@ -846,15 +635,27 @@ int path_stop(struct path *p)
int path_destroy(struct path *p)
{
int ret;
if (p->state == STATE_DESTROYED)
return 0;
#ifdef WITH_HOOKS
vlist_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true);
ret = hook_list_destroy(&p->hooks);
if (ret)
return ret;
#endif
vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
vlist_destroy(&p->signals, (dtor_cb_t) signal_decref, false);
ret = signal_list_destroy(&p->signals);
if (ret)
return ret;
ret = vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
if (ret)
return ret;
ret = vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
if (ret)
return ret;
if (p->reader.pfds)
free(p->reader.pfds);

110
lib/path_destination.c Normal file
View file

@ -0,0 +1,110 @@
/** Path destination
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/utils.h>
#include <villas/memory.h>
#include <villas/sample.h>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/path_destination.h>
int path_destination_init(struct path_destination *pd, int queuelen)
{
int ret;
ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
if (ret)
return ret;
return 0;
}
int path_destination_destroy(struct path_destination *pd)
{
int ret;
ret = queue_destroy(&pd->queue);
if (ret)
return ret;
return 0;
}
void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt)
{
unsigned enqueued, cloned;
struct sample *clones[cnt];
cloned = sample_clone_many(clones, smps, cnt);
if (cloned < cnt)
warning("Pool underrun in path %s", path_name(p));
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
if (enqueued != cnt)
warning("Queue overrun for path %s", path_name(p));
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_incref_many(clones, cloned);
debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
}
sample_decref_many(clones, cloned);
}
void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->out.vectorize;
int sent;
int released;
int allocated;
unsigned release;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (allocated == 0)
break;
else if (allocated < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
release = allocated;
sent = node_write(pd->node, smps, allocated, &release);
if (sent < 0)
error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent);
else if (sent < allocated)
warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
released = sample_decref_many(smps, release);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}
}

159
lib/path_source.c Normal file
View file

@ -0,0 +1,159 @@
/** Path source
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/utils.h>
#include <villas/bitset.h>
#include <villas/sample.h>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/path_destination.h>
#include <villas/path_source.h>
int path_source_init(struct path_source *ps)
{
int ret;
int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
if (ps->node->_vt->pool_size)
pool_size = ps->node->_vt->pool_size;
ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage));
if (ret)
return ret;
return 0;
}
int path_source_destroy(struct path_source *ps)
{
int ret;
ret = pool_destroy(&ps->pool);
if (ret)
return ret;
ret = vlist_destroy(&ps->mappings, NULL, true);
if (ret)
return ret;
return 0;
}
int path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
unsigned release;
cnt = ps->node->in.vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
if (allocated != cnt)
warning("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
release = allocated;
recv = node_read(ps->node, read_smps, allocated, &release);
if (recv == 0) {
enqueued = 0;
goto out2;
}
else if (recv < 0) {
if (ps->node->state == STATE_STOPPING) {
p->state = STATE_STOPPING;
enqueued = -1;
goto out2;
}
else
error("Failed to read samples from node %s", node_name(ps->node));
}
else if (recv < allocated)
warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
bitset_set(&p->received, i);
if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
tomux_smps = read_smps;
tomux = recv;
}
else { /* Mux only last sample and discard others */
tomux_smps = read_smps + recv - 1;
tomux = 1;
}
for (int i = 0; i < tomux; i++) {
muxed_smps[i] = i == 0
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
if (p->original_sequence_no)
muxed_smps[i]->sequence = tomux_smps[i]->sequence;
else {
muxed_smps[i]->sequence = p->last_sequence++;
muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE;
}
muxed_smps[i]->ts = tomux_smps[i]->ts;
muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
#ifdef WITH_HOOKS
toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux);
if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
}
#else
toenqueue = tomux;
#endif
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
path_destination_enqueue(p, muxed_smps, toenqueue);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
enqueued = toenqueue;
}
}
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, release);
return enqueued;
}