mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
path: add support for using a node as a source in multiple paths
This commit is contained in:
parent
baddec2228
commit
1aebd550de
7 changed files with 253 additions and 200 deletions
|
@ -93,6 +93,8 @@ struct vpath {
|
|||
|
||||
villas::Logger logger;
|
||||
|
||||
std::list<struct vnode *> mask_list;
|
||||
|
||||
std::bitset<MAX_SAMPLE_LENGTH> mask; /**< A mask of path_sources which are enabled for poll(). */
|
||||
std::bitset<MAX_SAMPLE_LENGTH> received; /**< A mask of path_sources for which we already received samples. */
|
||||
};
|
||||
|
@ -100,10 +102,10 @@ struct vpath {
|
|||
/** Initialize internal data structures. */
|
||||
int path_init(struct vpath *p) __attribute__ ((warn_unused_result));
|
||||
|
||||
int path_prepare(struct vpath *p);
|
||||
int path_prepare(struct vpath *p, struct vlist *nodes);
|
||||
|
||||
/** Check if path configuration is proper. */
|
||||
int path_check(struct vpath *p);
|
||||
void path_check(struct vpath *p);
|
||||
|
||||
/** Start a path.
|
||||
*
|
||||
|
@ -154,7 +156,7 @@ int path_reverse(struct vpath *p, struct vpath *r);
|
|||
*/
|
||||
int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes);
|
||||
|
||||
void path_parse_mask(struct vpath *p, json_t *json_mask);
|
||||
void path_parse_mask(struct vpath *p, json_t *json_mask, struct vlist *nodes);
|
||||
|
||||
bool path_is_simple(const struct vpath *p);
|
||||
|
||||
|
|
|
@ -41,11 +41,13 @@ struct vpath_destination {
|
|||
struct queue queue;
|
||||
};
|
||||
|
||||
int path_destination_init(struct vpath_destination *pd);
|
||||
int path_destination_init(struct vpath_destination *pd, struct vnode *n) __attribute__ ((warn_unused_result));
|
||||
|
||||
int path_destination_destroy(struct vpath_destination *pd) __attribute__ ((warn_unused_result));
|
||||
|
||||
int path_destination_prepare(struct vpath_destination *pd, int queuelen);
|
||||
|
||||
int path_destination_destroy(struct vpath_destination *pd);
|
||||
void path_destination_check(struct vpath_destination *pd);
|
||||
|
||||
void path_destination_enqueue(struct vpath *p, struct sample *smps[], unsigned cnt);
|
||||
|
||||
|
|
|
@ -36,21 +36,30 @@
|
|||
struct vpath;
|
||||
struct sample;
|
||||
|
||||
enum PathSourceType {
|
||||
MASTER,
|
||||
SECONDARY
|
||||
};
|
||||
|
||||
struct vpath_source {
|
||||
struct vnode *node;
|
||||
|
||||
bool masked;
|
||||
|
||||
enum PathSourceType type;
|
||||
|
||||
struct pool pool;
|
||||
struct vlist mappings; /**< List of mappings (struct mapping_entry). */
|
||||
struct vlist secondaries;
|
||||
};
|
||||
|
||||
int path_source_init(struct vpath_source *ps);
|
||||
int path_source_init_master(struct vpath_source *ps, struct vnode *n) __attribute__ ((warn_unused_result));
|
||||
|
||||
int path_source_prepare(struct vpath_source *ps);
|
||||
int path_source_init_secondary(struct vpath_source *ps, struct vnode *n) __attribute__ ((warn_unused_result));
|
||||
|
||||
int path_source_destroy(struct vpath_source *ps);
|
||||
int path_source_destroy(struct vpath_source *ps) __attribute__ ((warn_unused_result));
|
||||
|
||||
void path_source_check(struct vpath_source *ps);
|
||||
|
||||
int path_source_read(struct vpath_source *ps, struct vpath *p, int i);
|
||||
|
||||
|
|
|
@ -117,8 +117,7 @@ int node_direction_parse(struct vnode_direction *nd, struct vnode *n, json_t *cf
|
|||
jerror(&err, "Failed to parse node %s", node_name(n));
|
||||
|
||||
if (n->_vt->flags & (int) NodeFlags::PROVIDES_SIGNALS) {
|
||||
if (json_signals)
|
||||
error("Node %s does not support signal definitions", node_name(n));
|
||||
/* Do nothing.. Node-type will provide signals */
|
||||
}
|
||||
else if (json_is_array(json_signals)) {
|
||||
ret = signal_list_parse(&nd->signals, json_signals);
|
||||
|
@ -169,7 +168,7 @@ int node_direction_parse(struct vnode_direction *nd, struct vnode *n, json_t *cf
|
|||
|
||||
int node_direction_check(struct vnode_direction *nd, struct vnode *n)
|
||||
{
|
||||
assert(nd->state == State::PARSED);
|
||||
assert(n->state != State::DESTROYED);
|
||||
|
||||
if (nd->vectorize <= 0)
|
||||
error("Invalid setting 'vectorize' with value %d for node %s. Must be natural number!", nd->vectorize, node_name(n));
|
||||
|
|
319
lib/path.cpp
319
lib/path.cpp
|
@ -25,6 +25,10 @@
|
|||
#include <cinttypes>
|
||||
#include <cerrno>
|
||||
|
||||
#include <algorithm>
|
||||
#include <list>
|
||||
#include <map>
|
||||
|
||||
#include <unistd.h>
|
||||
#include <poll.h>
|
||||
#include <openssl/md5.h>
|
||||
|
@ -194,8 +198,8 @@ static int path_prepare_poll(struct vpath *p)
|
|||
struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i);
|
||||
|
||||
m = node_poll_fds(ps->node, fds);
|
||||
if (m < 0)
|
||||
continue;
|
||||
if (m <= 0)
|
||||
throw RuntimeError("Failed to get file descriptor for node {}", node_name(ps->node));
|
||||
|
||||
p->reader.nfds += m;
|
||||
p->reader.pfds = (struct pollfd *) realloc(p->reader.pfds, p->reader.nfds * sizeof(struct pollfd));
|
||||
|
@ -228,18 +232,103 @@ static int path_prepare_poll(struct vpath *p)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int path_prepare(struct vpath *p)
|
||||
int path_prepare(struct vpath *p, struct vlist *nodes)
|
||||
{
|
||||
int ret;
|
||||
unsigned pool_size;
|
||||
|
||||
struct memory_type *pool_mt = memory_default;
|
||||
|
||||
assert(p->state == State::CHECKED);
|
||||
|
||||
/* Prepare destinations */
|
||||
struct memory_type *pool_mt = memory_default;
|
||||
unsigned pool_size = MAX(1UL, vlist_length(&p->destinations)) * p->queuelen;
|
||||
p->mask.reset();
|
||||
|
||||
/* Prepare mappings */
|
||||
ret = mapping_list_prepare(&p->mappings, nodes);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
/* Create path sources */
|
||||
std::map<struct vnode *, struct vpath_source *> pss;
|
||||
for (size_t i = 0; i < vlist_length(&p->mappings); i++) {
|
||||
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i);
|
||||
struct vnode *n = me->node;
|
||||
struct vpath_source *ps;
|
||||
|
||||
if (pss.find(n) != pss.end())
|
||||
/* We already have a path source for this mapping entry */
|
||||
ps = pss[n];
|
||||
else {
|
||||
/* Create new path source */
|
||||
ps = pss[n] = new struct vpath_source;
|
||||
if (!ps)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
/* Depending on weather the node belonging to this mapping is already
|
||||
* used by another path or not, we will create a master or secondary
|
||||
* path source.
|
||||
* A secondary path source uses an internal loopback node / queue
|
||||
* to forward samples from on path to another.
|
||||
*/
|
||||
bool isSecondary = vlist_length(&n->sources) > 0;
|
||||
ret = isSecondary
|
||||
? path_source_init_secondary(ps, n)
|
||||
: path_source_init_master(ps, n);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (ps->type == PathSourceType::SECONDARY) {
|
||||
vlist_push(nodes, ps->node);
|
||||
vlist_push(&ps->node->sources, ps);
|
||||
}
|
||||
|
||||
if (p->mask_list.empty() || std::find(p->mask_list.begin(), p->mask_list.end(), n) != p->mask_list.end()) {
|
||||
ps->masked = true;
|
||||
p->mask.set(i);
|
||||
}
|
||||
|
||||
vlist_push(&n->sources, ps);
|
||||
vlist_push(&p->sources, ps);
|
||||
}
|
||||
|
||||
struct vlist *sigs = node_get_signals(me->node, NodeDir::IN);
|
||||
|
||||
/* Update signals of path */
|
||||
for (unsigned j = 0; j < (unsigned) me->length; j++) {
|
||||
struct signal *sig;
|
||||
|
||||
/* For data mappings we simple refer to the existing
|
||||
* signal descriptors of the source node. */
|
||||
if (me->type == MappingType::DATA) {
|
||||
sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j);
|
||||
if (!sig) {
|
||||
p->logger->warn("Failed to create signal description for path {}", path_name(p));
|
||||
continue;
|
||||
}
|
||||
|
||||
signal_incref(sig);
|
||||
}
|
||||
/* For other mappings we create new signal descriptors */
|
||||
else {
|
||||
sig = new struct signal;
|
||||
if (!sig)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
ret = signal_init_from_mapping(sig, me, j);
|
||||
if (ret)
|
||||
return -1;
|
||||
}
|
||||
|
||||
vlist_extend(&p->signals, me->offset + j + 1, nullptr);
|
||||
vlist_set(&p->signals, me->offset + j, sig);
|
||||
}
|
||||
|
||||
vlist_push(&ps->mappings, me);
|
||||
}
|
||||
|
||||
/* Prepare path destinations */
|
||||
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
|
||||
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
|
||||
auto *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
|
||||
|
||||
if (node_type(pd->node)->pool_size > pool_size)
|
||||
pool_size = node_type(pd->node)->pool_size;
|
||||
|
@ -252,54 +341,31 @@ int path_prepare(struct vpath *p)
|
|||
return ret;
|
||||
}
|
||||
|
||||
/* Prepare sources */
|
||||
ret = mapping_list_prepare(&p->mappings);
|
||||
/* Prepare pool */
|
||||
pool_size = MAX(1UL, vlist_length(&p->destinations)) * p->queuelen;
|
||||
ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(vlist_length(&p->signals)), pool_mt);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
|
||||
struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i);
|
||||
/* Autodetect whether to use original sequence numbers or not */
|
||||
if (p->original_sequence_no == -1)
|
||||
p->original_sequence_no = vlist_length(&p->sources) == 1;
|
||||
|
||||
ret = path_source_prepare(ps);
|
||||
/* Autodetect whether to use poll() for this path or not */
|
||||
if (p->poll == -1) {
|
||||
if (p->rate > 0)
|
||||
p->poll = 1;
|
||||
else if (vlist_length(&p->sources) > 1)
|
||||
p->poll = 1;
|
||||
else
|
||||
p->poll = 0;
|
||||
}
|
||||
|
||||
/* Prepare poll() */
|
||||
if (p->poll) {
|
||||
ret = path_prepare_poll(p);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (ps->masked)
|
||||
p->mask.set(i);
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&ps->mappings); i++) {
|
||||
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&ps->mappings, i);
|
||||
struct vlist *sigs = node_get_signals(me->node, NodeDir::IN);
|
||||
|
||||
for (unsigned j = 0; j < (unsigned) me->length; j++) {
|
||||
struct signal *sig;
|
||||
|
||||
/* For data mappings we simple refer to the existing
|
||||
* signal descriptors of the source node. */
|
||||
if (me->type == MappingType::DATA) {
|
||||
sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j);
|
||||
if (!sig) {
|
||||
p->logger->warn("Failed to create signal description for path {}", path_name(p));
|
||||
continue;
|
||||
}
|
||||
|
||||
signal_incref(sig);
|
||||
}
|
||||
/* For other mappings we create new signal descriptors */
|
||||
else {
|
||||
sig = new struct signal;
|
||||
if (!sig)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
ret = signal_init_from_mapping(sig, me, j);
|
||||
if (ret)
|
||||
return -1;
|
||||
}
|
||||
|
||||
vlist_extend(&p->signals, me->offset + j + 1, nullptr);
|
||||
vlist_set(&p->signals, me->offset + j, sig);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
|
@ -309,14 +375,6 @@ int path_prepare(struct vpath *p)
|
|||
hook_list_prepare(&p->hooks, &p->signals, m, p, nullptr);
|
||||
#endif /* WITH_HOOKS */
|
||||
|
||||
/* Initialize pool */
|
||||
ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(vlist_length(&p->signals)), pool_mt);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (p->original_sequence_no == -1)
|
||||
p->original_sequence_no = vlist_length(&p->sources) == 1;
|
||||
|
||||
p->logger->info("Prepared path {} with output signals:", path_name(p));
|
||||
signal_list_dump(&p->signals);
|
||||
|
||||
|
@ -338,7 +396,10 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
const char *mode = nullptr, *uuid = nullptr;
|
||||
|
||||
struct vlist destinations;
|
||||
vlist_init(&destinations);
|
||||
|
||||
ret = vlist_init(&destinations);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: b, s?: F, s?: o, s?: b, s?: s}",
|
||||
"in", &json_in,
|
||||
|
@ -356,7 +417,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
"uuid", &uuid
|
||||
);
|
||||
if (ret)
|
||||
throw ConfigError(cfg, &err, "node-config-path", "Failed to parse path configuration");
|
||||
throw ConfigError(cfg, err, "node-config-path", "Failed to parse path configuration");
|
||||
|
||||
/* Optional settings */
|
||||
if (mode) {
|
||||
|
@ -368,6 +429,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
throw ConfigError(cfg, "node-config-path", "Invalid path mode '{}'", mode);
|
||||
}
|
||||
|
||||
/* UUID */
|
||||
if (uuid) {
|
||||
ret = uuid_parse(uuid, p->uuid);
|
||||
if (ret)
|
||||
|
@ -383,62 +445,10 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
}
|
||||
|
||||
/* Input node(s) */
|
||||
ret = mapping_list_parse(&p->mappings, json_in, nodes);
|
||||
ret = mapping_list_parse(&p->mappings, json_in);
|
||||
if (ret)
|
||||
throw ConfigError(json_in, "node-config-path-in", "Failed to parse input mapping of path {}", path_name(p));
|
||||
|
||||
std::map<struct vnode *, struct vpath_source> pss;
|
||||
for (size_t i = 0; i < vlist_length(&p->mappings); i++) {
|
||||
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&p->mappings, i);
|
||||
struct vnode *n = me->node;
|
||||
struct vpath_source *ps;
|
||||
|
||||
if (pss.find(n) != pss.end())
|
||||
/* We already have a path source for this mapping entry */
|
||||
ps = pss[n]
|
||||
else {
|
||||
/* Create new path source */
|
||||
ps = pss[n] = new struct vpath_source;
|
||||
if (!ps)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
ret = path_source_init(ps);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
/* No mask provided -> enable all */
|
||||
if (!json_mask)
|
||||
ps->masked = true;
|
||||
|
||||
/* The node is used as a source by more than one path.
|
||||
* We are creating an internal loopbacks nodes
|
||||
* to forward/copy samples to other paths.
|
||||
*/
|
||||
if (vlist_length(&n->sources) > 0) {
|
||||
struct node *lo = new struct vnode;
|
||||
if (!lo)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
auto *vt = node_type_lookup("loopback");
|
||||
|
||||
ret = node_init(lo, vt);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ps->node = lo;
|
||||
|
||||
auto *master_ps = (struct vpath_source *) vlist_at_safe(&n->sources, 0);
|
||||
|
||||
vlist_push(&master_ps->secondaries, ps);
|
||||
}
|
||||
|
||||
vlist_push(&n->sources, ps);
|
||||
vlist_push(&p->sources, ps);
|
||||
}
|
||||
|
||||
vlist_push(&ps->mappings, me);
|
||||
}
|
||||
|
||||
/* Output node(s) */
|
||||
if (json_out) {
|
||||
ret = node_list_parse(&destinations, json_out, nodes);
|
||||
|
@ -452,11 +462,15 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
if (n->output_path)
|
||||
throw ConfigError(cfg, "node-config-path", "Every node must only be used by a single path as destination");
|
||||
|
||||
n->output_path = p;
|
||||
|
||||
auto *pd = new struct vpath_destination;
|
||||
if (!pd)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
path_destination_init(pd);
|
||||
ret = path_destination_init(pd, n);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
vlist_push(&n->destinations, pd);
|
||||
vlist_push(&p->destinations, pd);
|
||||
|
@ -468,17 +482,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
#endif /* WITH_HOOKS */
|
||||
|
||||
if (json_mask)
|
||||
path_parse_mask(p, json_mask);
|
||||
|
||||
/* Autodetect whether to use poll() for this path or not */
|
||||
if (p->poll == -1) {
|
||||
if (p->rate > 0)
|
||||
p->poll = 1;
|
||||
else if (vlist_length(&p->sources) > 1)
|
||||
p->poll = 1;
|
||||
else
|
||||
p->poll = 0;
|
||||
}
|
||||
path_parse_mask(p, json_mask, nodes);
|
||||
|
||||
ret = vlist_destroy(&destinations, nullptr, false);
|
||||
if (ret)
|
||||
|
@ -490,7 +494,7 @@ int path_parse(struct vpath *p, json_t *cfg, struct vlist *nodes)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void path_parse_mask(struct vpath *p, json_t *json_mask)
|
||||
void path_parse_mask(struct vpath *p, json_t *json_mask, struct vlist *nodes)
|
||||
{
|
||||
json_t *json_entry;
|
||||
size_t i;
|
||||
|
@ -501,7 +505,6 @@ void path_parse_mask(struct vpath *p, json_t *json_mask)
|
|||
json_array_foreach(json_mask, i, json_entry) {
|
||||
const char *name;
|
||||
struct vnode *node;
|
||||
struct vpath_source *ps = nullptr;
|
||||
|
||||
name = json_string_value(json_entry);
|
||||
if (!name)
|
||||
|
@ -511,31 +514,18 @@ void path_parse_mask(struct vpath *p, json_t *json_mask)
|
|||
if (!node)
|
||||
throw ConfigError(json_mask, "node-config-path-mask", "The 'mask' entry '{}' is not a valid node name", name);
|
||||
|
||||
/* Search correspondending path_source to node */
|
||||
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
|
||||
struct vpath_source *pt = (struct vpath_source *) vlist_at(&p->sources, i);
|
||||
|
||||
if (pt->node == node) {
|
||||
ps = pt;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ps)
|
||||
throw ConfigError(json_mask, "node-config-path-mask", "Node {} is not a source of the path {}", node_name(node), path_name(p));
|
||||
|
||||
ps->masked = true;
|
||||
p->mask_list.push_back(node);
|
||||
}
|
||||
}
|
||||
|
||||
int path_check(struct vpath *p)
|
||||
void path_check(struct vpath *p)
|
||||
{
|
||||
assert(p->state != State::DESTROYED);
|
||||
|
||||
if (p->rate < 0)
|
||||
throw RuntimeError("Setting 'rate' of path {} must be a positive number.", path_name(p));
|
||||
|
||||
if (p->poll) {
|
||||
if (p->poll > 0) {
|
||||
if (p->rate <= 0) {
|
||||
/* Check that all path sources provide a file descriptor for polling */
|
||||
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
|
||||
|
@ -556,34 +546,24 @@ int path_check(struct vpath *p)
|
|||
throw RuntimeError("Setting 'poll' must be activated when used together with setting 'rate'");
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
|
||||
struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i);
|
||||
|
||||
if (!node_is_enabled(ps->node))
|
||||
throw RuntimeError("Source {} of path {} is not enabled", node_name(ps->node), path_name(p));
|
||||
|
||||
if (!node_type(ps->node)->read)
|
||||
throw RuntimeError("Node {} is not supported as a source for path {}", node_name(ps->node), path_name(p));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
|
||||
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
|
||||
|
||||
if (!node_is_enabled(pd->node))
|
||||
throw RuntimeError("Destination {} of path {} is not enabled", node_name(pd->node), path_name(p));
|
||||
|
||||
if (!node_type(pd->node)->write)
|
||||
throw RuntimeError("Destiation node {} is not supported as a sink for path ", node_name(pd->node), path_name(p));
|
||||
}
|
||||
|
||||
if (!IS_POW2(p->queuelen)) {
|
||||
p->queuelen = LOG2_CEIL(p->queuelen);
|
||||
p->logger->warn("Queue length should always be a power of 2. Adjusting to {}", p->queuelen);
|
||||
}
|
||||
|
||||
p->state = State::CHECKED;
|
||||
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
|
||||
struct vpath_source *ps = (struct vpath_source *) vlist_at(&p->sources, i);
|
||||
|
||||
return 0;
|
||||
path_source_check(ps);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
|
||||
struct vpath_destination *ps = (struct vpath_destination *) vlist_at(&p->destinations, i);
|
||||
|
||||
path_destination_check(ps);
|
||||
}
|
||||
|
||||
p->state = State::CHECKED;
|
||||
}
|
||||
|
||||
int path_start(struct vpath *p)
|
||||
|
@ -609,7 +589,7 @@ int path_start(struct vpath *p)
|
|||
|
||||
p->logger->info("Starting path {}: #signals={}, #hooks={}, #sources={}, "
|
||||
"#destinations={}, mode={}, poll={}, mask={:b}, rate={}, "
|
||||
"enabled={}, reversed={}, queuelen={}, original_sequence_no={}",
|
||||
"enabled={}, reversed={}, queuelen={}, original_sequence_no={}",
|
||||
path_name(p),
|
||||
vlist_length(&p->signals),
|
||||
vlist_length(&p->hooks),
|
||||
|
@ -649,13 +629,6 @@ int path_start(struct vpath *p)
|
|||
p->last_sample->data[i] = sig->init;
|
||||
}
|
||||
|
||||
/* Prepare poll() */
|
||||
if (p->poll) {
|
||||
ret = path_prepare_poll(p);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
p->state = State::STARTED;
|
||||
|
||||
/* Start one thread per path for sending to destinations
|
||||
|
@ -730,7 +703,7 @@ int path_destroy(struct vpath *p)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = vlist_destroy(&p->mappings, nullptr, true);
|
||||
ret = vlist_destroy(&p->mappings, (dtor_cb_t) mapping_entry_destroy, true);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
|
|
|
@ -25,12 +25,18 @@
|
|||
#include <villas/sample.h>
|
||||
#include <villas/node.h>
|
||||
#include <villas/path.h>
|
||||
#include <villas/exceptions.hpp>
|
||||
#include <villas/path_destination.h>
|
||||
|
||||
int path_destination_init(struct vpath_destination *pd)
|
||||
using namespace villas;
|
||||
|
||||
int path_destination_init(struct vpath_destination *pd, struct vnode *n)
|
||||
{
|
||||
pd->node = n;
|
||||
pd->node->output_path = p;
|
||||
|
||||
vlist_push(&n->destinations, pd);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int path_destination_prepare(struct vpath_destination *pd, int queuelen)
|
||||
|
@ -116,3 +122,12 @@ void path_destination_write(struct vpath_destination *pd, struct vpath *p)
|
|||
p->logger->debug("Released {} samples back to memory pool", released);
|
||||
}
|
||||
}
|
||||
|
||||
void path_destination_check(struct vpath_destination *pd)
|
||||
{
|
||||
if (!node_is_enabled(pd->node))
|
||||
throw RuntimeError("Destination {} is not enabled", node_name(pd->node));
|
||||
|
||||
if (!node_type(pd->node)->write)
|
||||
throw RuntimeError("Destiation node {} is not supported as a sink for path ", node_name(pd->node));
|
||||
}
|
|
@ -20,27 +20,38 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*********************************************************************************/
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
#include <villas/utils.hpp>
|
||||
#include <villas/sample.h>
|
||||
#include <villas/node.h>
|
||||
#include <villas/path.h>
|
||||
#include <villas/exceptions.hpp>
|
||||
#include <villas/hook_list.hpp>
|
||||
|
||||
#include <villas/nodes/loopback_internal.hpp>
|
||||
|
||||
#include <villas/path_destination.h>
|
||||
#include <villas/path_source.h>
|
||||
|
||||
int path_source_init(struct vpath_source *ps)
|
||||
{
|
||||
ps->node = me->node;
|
||||
ps->masked = false;
|
||||
using namespace villas;
|
||||
|
||||
vlist_init(&ps->mappings);
|
||||
vlist_init(&ps->secondaries);
|
||||
}
|
||||
|
||||
int path_source_prepare(struct vpath_source *ps)
|
||||
int path_source_init_master(struct vpath_source *ps, struct vnode *n)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ps->node = n;
|
||||
ps->masked = false;
|
||||
ps->type = PathSourceType::MASTER;
|
||||
|
||||
ret = vlist_init(&ps->mappings);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = vlist_init(&ps->secondaries);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
|
||||
|
||||
if (ps->node->_vt->pool_size)
|
||||
|
@ -53,6 +64,38 @@ int path_source_prepare(struct vpath_source *ps)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int path_source_init_secondary(struct vpath_source *ps, struct vnode *n)
|
||||
{
|
||||
int ret;
|
||||
struct vpath_source *mps;
|
||||
|
||||
ret = path_source_init_master(ps, n);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ps->type = PathSourceType::SECONDARY;
|
||||
|
||||
ps->node = loopback_internal_create(n);
|
||||
if (!ps->node)
|
||||
return -1;
|
||||
|
||||
ret = node_check(ps->node);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
ret = node_prepare(ps->node);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
mps = (struct vpath_source *) vlist_at_safe(&n->sources, 0);
|
||||
if (!mps)
|
||||
return -1;
|
||||
|
||||
vlist_push(&mps->secondaries, ps);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int path_source_destroy(struct vpath_source *ps)
|
||||
{
|
||||
int ret;
|
||||
|
@ -113,15 +156,16 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i)
|
|||
|
||||
/* Forward samples to secondary path sources */
|
||||
for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) {
|
||||
struct vpath_source *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i);
|
||||
auto *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i);
|
||||
|
||||
sample_incref_many(read_smps, recv);
|
||||
|
||||
int sent, release = recv;
|
||||
int sent;
|
||||
unsigned release = recv;
|
||||
|
||||
sent = node_write(sps->node, read_smps, recv, &release);
|
||||
if (sent < recv)
|
||||
p->logger->warn("Partial write to secondary path source");
|
||||
p->logger->warn("Partial write to secondary path source {} of path {}", node_name(sps->node), path_name(p));
|
||||
|
||||
sample_incref_many(read_smps+release, recv-release);
|
||||
}
|
||||
|
||||
p->received.set(i);
|
||||
|
@ -194,3 +238,12 @@ out2: sample_decref_many(read_smps, release);
|
|||
|
||||
return enqueued;
|
||||
}
|
||||
|
||||
void path_source_check(struct vpath_source *ps)
|
||||
{
|
||||
if (!node_is_enabled(ps->node))
|
||||
throw RuntimeError("Source {} is not enabled", node_name(ps->node));
|
||||
|
||||
if (!node_type(ps->node)->read)
|
||||
throw RuntimeError("Node {} is not supported as a source for a path", node_name(ps->node));
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue