diff --git a/include/villas/path_source.h b/include/villas/path_source.h index 225972bb8..81ef6eea5 100644 --- a/include/villas/path_source.h +++ b/include/villas/path_source.h @@ -49,6 +49,7 @@ struct vpath_source { struct vpath *path; bool masked; + int index; enum PathSourceType type; diff --git a/lib/path_source.cpp b/lib/path_source.cpp index 5ab8dc0e2..c7e5b6f30 100644 --- a/lib/path_source.cpp +++ b/lib/path_source.cpp @@ -121,26 +121,28 @@ int path_source_destroy(struct vpath_source *ps) int path_source_read(struct vpath_source *ps, int i) { - int ret, recv, tomux, allocated, cnt, toenqueue, enqueued = 0; + int ret; + int alloc_cnt + int read_cnt; + int mux_cnt; + int allocated_cnt; + int toenqueue_cnt; + int enqueued_cnt = 0; - cnt = ps->node->in.vectorize; - - struct sample *read_smps[cnt]; + struct sample *smps[cnt]; struct sample *muxed_smps[cnt]; - struct sample **tomux_smps; + struct sample **mux_smps; /* Fill smps[] free sample blocks from the pool */ - allocated = sample_alloc_many(&ps->pool, read_smps, cnt); - if (allocated != cnt) - p->logger->warn("Pool underrun for path source {}", node_name(ps->node)); + alloc_cnt = sample_alloc_many(&ps->pool, smps, ps->node->in.vectorize); + if (smp_cnt != ps->node->in.vectorize) + ps->logger->warn("Pool underrun for path source {}", node_name(ps->node)); /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, read_smps, allocated); - if (recv == 0) { - enqueued = 0; + read_cnt = node_read(ps->node, smps, alloc_cnt); + if (read_cnt == 0) goto out2; - } - else if (recv < 0) { + else if (read_cnt < 0) { if (ps->node->state == State::STOPPING) { p->state = State::STOPPING; @@ -148,92 +150,51 @@ int path_source_read(struct vpath_source *ps, int i) goto out2; } else { - p->logger->error("Failed to read samples from node {}", node_name(ps->node)); + ps->logger->error("Failed to read samples from node {}", node_name(ps->node)); goto out2; } } - else if (recv < allocated) - p->logger->warn("Partial read for path {}: read={}, expected={}", path_name(p), recv, allocated); + else if (read_cnt < allocated) + ps->logger->warn("Partial read for path {}: read={}, expected={}", path_name(ps->path), recv, allocated); /* Forward samples to secondary path sources */ - for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) { - auto *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i); - - int sent; - - sent = node_write(sps->node, read_smps, recv); - if (sent < recv) - p->logger->warn("Partial write to secondary path source {} of path {}", node_name(sps->node), path_name(p)); - - sample_incref_many(read_smps, recv); - } + path_source_forward(ps, read_smps, read_cnt); p->received.set(i); if (p->mode == PathMode::ANY) { /* Mux all samples */ - tomux_smps = read_smps; - tomux = recv; + mux_smps = read_smps; + mux_cnt = read_cnt; } else { /* Mux only last sample and discard others */ - tomux_smps = read_smps + recv - 1; - tomux = 1; + mux_smps = read_smps + read_cnt - 1; + mux_cnt = 1; } - for (int i = 0; i < tomux; i++) { - muxed_smps[i] = i == 0 - ? sample_clone(p->last_sample) - : sample_clone(muxed_smps[i-1]); + path_source_mux(ps, muxed_smps, mux_smps, mux_cnt); - if (p->original_sequence_no) { - muxed_smps[i]->sequence = tomux_smps[i]->sequence; - muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_SEQUENCE; - } - else { - muxed_smps[i]->sequence = p->last_sequence++; - muxed_smps[i]->flags |= (int) SampleFlags::HAS_SEQUENCE; - } - - /* We reset the sample length after each restart of the simulation. - * This is necessary for the test_rtt node to work properly. - */ - if (tomux_smps[i]->flags & (int) SampleFlags::IS_FIRST) - muxed_smps[i]->length = 0; - - muxed_smps[i]->ts = tomux_smps[i]->ts; - muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_TS; - - ret = mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]); - if (ret) - return ret; - - if (muxed_smps[i]->length > 0) - muxed_smps[i]->flags |= (int) SampleFlags::HAS_DATA; - } - - sample_copy(p->last_sample, muxed_smps[tomux-1]); - - p->logger->debug("Path {} received = {}", path_name(p), p->received.to_ullong()); + ps->logger->debug("Path {} received = {}", path_name(ps->path), p->received.to_ullong()); #ifdef WITH_HOOKS - toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux); - if (toenqueue == -1) { - p->logger->error("An error occured during hook processing. Skipping sample"); + enqueue_cnt = hook_list_process(&p->hooks, muxed_smps, mux_cnt); + if (enqueue_cnt == -1) { + ps->logger->error("An error occured during hook processing. Skipping sample"); } - else if (toenqueue != tomux) { - int skipped = tomux - toenqueue; + else if (enqueue_cnt != mux_cnt) { + int skipped = mux_cnt - enqueue_cnt; - p->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, path_name(p)); + ps->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, mux_cnt, path_name(ps->path)); } #else - toenqueue = tomux; + enqueue_cnt = mux_cnt; #endif if (p->mask.test(i)) { /* Check if we received an update from all nodes */ if ((p->mode == PathMode::ANY) || (p->mode == PathMode::ALL && p->mask == p->received)) { - path_destination_enqueue(p, muxed_smps, toenqueue); + path_destination_enqueue_all(p, muxed_smps, toenqueue); /* Reset mask of updated nodes */ p->received.reset(); @@ -242,8 +203,8 @@ int path_source_read(struct vpath_source *ps, int i) } } - sample_decref_many(muxed_smps, tomux); -out2: sample_decref_many(read_smps, recv); + sample_decref_many(muxed_smps, mux_cnt); +out2: sample_decref_many(smps, alloc_cont); return enqueued; } @@ -256,3 +217,55 @@ void path_source_check(struct vpath_source *ps) if (!node_type(ps->node)->read) throw RuntimeError("Node {} is not supported as a source for a path", node_name(ps->node)); } + +void path_source_forward(struct vpath_source *ps, const struct *smps[], unsigned cnt) +{ + for (size_t i = 0; i < vlist_length(&ps->secondaries); i++) { + auto *sps = (struct vpath_source *) vlist_at(&ps->secondaries, i); + + int sent; + + sent = node_write(sps->node, smps, cnt); + if (sent < cnt) + ps->logger->warn("Partial write to secondary path source {} of path {}", node_name(sps->node), path_name(ps->path)); + + sample_incref_many(smps, cnt); + } +} + + +void path_source_mux(struct vpath_source *ps, struct *smps_out[], struct sample *smps_in[], unsigned cnt) +{ + for (int i = 0; i < cnt; i++) { + smps_out[i] = i == 0 + ? sample_clone(p->last_sample) + : sample_clone(muxed_smps[i-1]); + + if (p->original_sequence_no) { + smps_out[i]->sequence = smps_in[i]->sequence; + smps_out[i]->flags |= smps_in[i]->flags & (int) SampleFlags::HAS_SEQUENCE; + } + else { + smps_out[i]->sequence = p->last_sequence++; + smps_out[i]->flags |= (int) SampleFlags::HAS_SEQUENCE; + } + + /* We reset the sample length after each restart of the simulation. + * This is necessary for the test_rtt node to work properly. + */ + if (smps_in[i]->flags & (int) SampleFlags::IS_FIRST) + smps_out[i]->length = 0; + + smps_out[i]->ts = smps_in[i]->ts; + smps_out[i]->flags |= smps_in[i]->flags & (int) SampleFlags::HAS_TS; + + ret = mapping_list_remap(&ps->mappings, smps_out[i], smps_in[i]); + if (ret) + return ret; + + if (smps_out[i]->length > 0) + smps_out[i]->flags |= (int) SampleFlags::HAS_DATA; + } + + sample_copy(p->last_sample, smps_out[cnt - 1]); +}