mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
wip: restructure path_source
This commit is contained in:
parent
af30d9a5f9
commit
99100f526a
2 changed files with 88 additions and 74 deletions
|
@ -49,6 +49,7 @@ struct vpath_source {
|
|||
struct vpath *path;
|
||||
|
||||
bool masked;
|
||||
int index;
|
||||
|
||||
enum PathSourceType type;
|
||||
|
||||
|
|
|
@ -121,26 +121,28 @@ int path_source_destroy(struct vpath_source *ps)
|
|||
|
||||
int path_source_read(struct vpath_source *ps, int i)
|
||||
{
|
||||
int ret, recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
|
||||
int ret;
|
||||
int alloc_cnt
|
||||
int read_cnt;
|
||||
int mux_cnt;
|
||||
int allocated_cnt;
|
||||
int toenqueue_cnt;
|
||||
int enqueued_cnt = 0;
|
||||
|
||||
cnt = ps->node->in.vectorize;
|
||||
|
||||
struct sample *read_smps[cnt];
|
||||
struct sample *smps[cnt];
|
||||
struct sample *muxed_smps[cnt];
|
||||
struct sample **tomux_smps;
|
||||
struct sample **mux_smps;
|
||||
|
||||
/* Fill smps[] free sample blocks from the pool */
|
||||
allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
|
||||
if (allocated != cnt)
|
||||
p->logger->warn("Pool underrun for path source {}", node_name(ps->node));
|
||||
alloc_cnt = sample_alloc_many(&ps->pool, smps, ps->node->in.vectorize);
|
||||
if (smp_cnt != ps->node->in.vectorize)
|
||||
ps->logger->warn("Pool underrun for path source {}", node_name(ps->node));
|
||||
|
||||
/* Read ready samples and store them to blocks pointed by smps[] */
|
||||
recv = node_read(ps->node, read_smps, allocated);
|
||||
if (recv == 0) {
|
||||
enqueued = 0;
|
||||
read_cnt = node_read(ps->node, smps, alloc_cnt);
|
||||
if (read_cnt == 0)
|
||||
goto out2;
|
||||
}
|
||||
else if (recv < 0) {
|
||||
else if (read_cnt < 0) {
|
||||
if (ps->node->state == State::STOPPING) {
|
||||
p->state = State::STOPPING;
|
||||
|
||||
|
@ -148,92 +150,51 @@ int path_source_read(struct vpath_source *ps, int i)
|
|||
goto out2;
|
||||
}
|
||||
else {
|
||||
p->logger->error("Failed to read samples from node {}", node_name(ps->node));
|
||||
ps->logger->error("Failed to read samples from node {}", node_name(ps->node));
|
||||
goto out2;
|
||||
}
|
||||
}
|
||||
else if (recv < allocated)
|
||||
p->logger->warn("Partial read for path {}: read={}, expected={}", path_name(p), recv, allocated);
|
||||
else if (read_cnt < allocated)
|
||||
ps->logger->warn("Partial read for path {}: read={}, expected={}", path_name(ps->path), recv, allocated);
|
||||
|
||||
/* Forward samples to secondary path sources */
|
||||
for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) {
|
||||
auto *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i);
|
||||
|
||||
int sent;
|
||||
|
||||
sent = node_write(sps->node, read_smps, recv);
|
||||
if (sent < recv)
|
||||
p->logger->warn("Partial write to secondary path source {} of path {}", node_name(sps->node), path_name(p));
|
||||
|
||||
sample_incref_many(read_smps, recv);
|
||||
}
|
||||
path_source_forward(ps, read_smps, read_cnt);
|
||||
|
||||
p->received.set(i);
|
||||
|
||||
if (p->mode == PathMode::ANY) { /* Mux all samples */
|
||||
tomux_smps = read_smps;
|
||||
tomux = recv;
|
||||
mux_smps = read_smps;
|
||||
mux_cnt = read_cnt;
|
||||
}
|
||||
else { /* Mux only last sample and discard others */
|
||||
tomux_smps = read_smps + recv - 1;
|
||||
tomux = 1;
|
||||
mux_smps = read_smps + read_cnt - 1;
|
||||
mux_cnt = 1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < tomux; i++) {
|
||||
muxed_smps[i] = i == 0
|
||||
? sample_clone(p->last_sample)
|
||||
: sample_clone(muxed_smps[i-1]);
|
||||
path_source_mux(ps, muxed_smps, mux_smps, mux_cnt);
|
||||
|
||||
if (p->original_sequence_no) {
|
||||
muxed_smps[i]->sequence = tomux_smps[i]->sequence;
|
||||
muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_SEQUENCE;
|
||||
}
|
||||
else {
|
||||
muxed_smps[i]->sequence = p->last_sequence++;
|
||||
muxed_smps[i]->flags |= (int) SampleFlags::HAS_SEQUENCE;
|
||||
}
|
||||
|
||||
/* We reset the sample length after each restart of the simulation.
|
||||
* This is necessary for the test_rtt node to work properly.
|
||||
*/
|
||||
if (tomux_smps[i]->flags & (int) SampleFlags::IS_FIRST)
|
||||
muxed_smps[i]->length = 0;
|
||||
|
||||
muxed_smps[i]->ts = tomux_smps[i]->ts;
|
||||
muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_TS;
|
||||
|
||||
ret = mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (muxed_smps[i]->length > 0)
|
||||
muxed_smps[i]->flags |= (int) SampleFlags::HAS_DATA;
|
||||
}
|
||||
|
||||
sample_copy(p->last_sample, muxed_smps[tomux-1]);
|
||||
|
||||
p->logger->debug("Path {} received = {}", path_name(p), p->received.to_ullong());
|
||||
ps->logger->debug("Path {} received = {}", path_name(ps->path), p->received.to_ullong());
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux);
|
||||
if (toenqueue == -1) {
|
||||
p->logger->error("An error occured during hook processing. Skipping sample");
|
||||
enqueue_cnt = hook_list_process(&p->hooks, muxed_smps, mux_cnt);
|
||||
if (enqueue_cnt == -1) {
|
||||
ps->logger->error("An error occured during hook processing. Skipping sample");
|
||||
|
||||
}
|
||||
else if (toenqueue != tomux) {
|
||||
int skipped = tomux - toenqueue;
|
||||
else if (enqueue_cnt != mux_cnt) {
|
||||
int skipped = mux_cnt - enqueue_cnt;
|
||||
|
||||
p->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, path_name(p));
|
||||
ps->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, mux_cnt, path_name(ps->path));
|
||||
}
|
||||
#else
|
||||
toenqueue = tomux;
|
||||
enqueue_cnt = mux_cnt;
|
||||
#endif
|
||||
|
||||
if (p->mask.test(i)) {
|
||||
/* Check if we received an update from all nodes */
|
||||
if ((p->mode == PathMode::ANY) ||
|
||||
(p->mode == PathMode::ALL && p->mask == p->received)) {
|
||||
path_destination_enqueue(p, muxed_smps, toenqueue);
|
||||
path_destination_enqueue_all(p, muxed_smps, toenqueue);
|
||||
|
||||
/* Reset mask of updated nodes */
|
||||
p->received.reset();
|
||||
|
@ -242,8 +203,8 @@ int path_source_read(struct vpath_source *ps, int i)
|
|||
}
|
||||
}
|
||||
|
||||
sample_decref_many(muxed_smps, tomux);
|
||||
out2: sample_decref_many(read_smps, recv);
|
||||
sample_decref_many(muxed_smps, mux_cnt);
|
||||
out2: sample_decref_many(smps, alloc_cont);
|
||||
|
||||
return enqueued;
|
||||
}
|
||||
|
@ -256,3 +217,55 @@ void path_source_check(struct vpath_source *ps)
|
|||
if (!node_type(ps->node)->read)
|
||||
throw RuntimeError("Node {} is not supported as a source for a path", node_name(ps->node));
|
||||
}
|
||||
|
||||
void path_source_forward(struct vpath_source *ps, const struct *smps[], unsigned cnt)
|
||||
{
|
||||
for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) {
|
||||
auto *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i);
|
||||
|
||||
int sent;
|
||||
|
||||
sent = node_write(sps->node, smps, cnt);
|
||||
if (sent < cnt)
|
||||
ps->logger->warn("Partial write to secondary path source {} of path {}", node_name(sps->node), path_name(ps->path));
|
||||
|
||||
sample_incref_many(smps, cnt);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void path_source_mux(struct vpath_source *ps, struct *smps_out[], struct sample *smps_in[], unsigned cnt)
|
||||
{
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
smps_out[i] = i == 0
|
||||
? sample_clone(p->last_sample)
|
||||
: sample_clone(muxed_smps[i-1]);
|
||||
|
||||
if (p->original_sequence_no) {
|
||||
smps_out[i]->sequence = smps_in[i]->sequence;
|
||||
smps_out[i]->flags |= smps_in[i]->flags & (int) SampleFlags::HAS_SEQUENCE;
|
||||
}
|
||||
else {
|
||||
smps_out[i]->sequence = p->last_sequence++;
|
||||
smps_out[i]->flags |= (int) SampleFlags::HAS_SEQUENCE;
|
||||
}
|
||||
|
||||
/* We reset the sample length after each restart of the simulation.
|
||||
* This is necessary for the test_rtt node to work properly.
|
||||
*/
|
||||
if (smps_in[i]->flags & (int) SampleFlags::IS_FIRST)
|
||||
smps_out[i]->length = 0;
|
||||
|
||||
smps_out[i]->ts = smps_in[i]->ts;
|
||||
smps_out[i]->flags |= smps_in[i]->flags & (int) SampleFlags::HAS_TS;
|
||||
|
||||
ret = mapping_list_remap(&ps->mappings, smps_out[i], smps_in[i]);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (smps_out[i]->length > 0)
|
||||
smps_out[i]->flags |= (int) SampleFlags::HAS_DATA;
|
||||
}
|
||||
|
||||
sample_copy(p->last_sample, smps_out[cnt - 1]);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue