From 42133b288258e17554ef727ca4b50a5f6c576767 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 22 Feb 2021 23:16:53 +0100 Subject: [PATCH] miscelaneous fixes --- include/villas/hook.hpp | 4 ++-- include/villas/sample.h | 2 ++ lib/node.cpp | 13 +++++++++---- lib/path_source.cpp | 15 ++++++++++++--- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/include/villas/hook.hpp b/include/villas/hook.hpp index 22617dea2..4ccbac94a 100644 --- a/include/villas/hook.hpp +++ b/include/villas/hook.hpp @@ -52,7 +52,7 @@ public: }; enum class Reason { - OK, + OK = 0, ERROR, SKIP_SAMPLE, STOP_PROCESSING @@ -135,7 +135,7 @@ public: return flags; } - struct vlist *getSignals() + virtual struct vlist *getSignals() { return &signals; } diff --git a/include/villas/sample.h b/include/villas/sample.h index 7e6e85f92..a3b610260 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -54,6 +54,8 @@ enum class SampleFlags { HAS_OFFSET = (1 << 2), /**< Include offset (received - origin timestamp) in output. */ HAS_SEQUENCE = (1 << 3), /**< Include sequence number in output. */ HAS_DATA = (1 << 4), /**< Include values in output. */ + + HAS_TS = HAS_TS_ORIGIN | HAS_TS_RECEIVED, /**< Include origin timestamp in output. */ HAS_ALL = (1 << 5) - 1, /**< Enable all output options. */ IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */ diff --git a/lib/node.cpp b/lib/node.cpp index 79baa2ecd..4836a5630 100644 --- a/lib/node.cpp +++ b/lib/node.cpp @@ -467,13 +467,18 @@ int node_read(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *re #ifdef WITH_HOOKS /* Run read hooks */ int rread = hook_list_process(&n->in.hooks, smps, nread); + if (rread < 0) + return rread; + int skipped = nread - rread; + if (skipped > 0) { + if (n->stats != nullptr) + n->stats->update(Stats::Metric::SMPS_SKIPPED, skipped); - if (skipped > 0 && n->stats != nullptr) { - n->stats->update(Stats::Metric::SMPS_SKIPPED, skipped); + debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped); } - - debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped); + else + debug(LOG_NODE | 5, "Received %u samples from node %s", nread, node_name(n)); return rread; #else diff --git a/lib/path_source.cpp b/lib/path_source.cpp index dc7c535f1..9753c6ca5 100644 --- a/lib/path_source.cpp +++ b/lib/path_source.cpp @@ -184,8 +184,10 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i) ? sample_clone(p->last_sample) : sample_clone(muxed_smps[i-1]); - if (p->original_sequence_no) + if (p->original_sequence_no) { muxed_smps[i]->sequence = tomux_smps[i]->sequence; + muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_SEQUENCE; + } else { muxed_smps[i]->sequence = p->last_sequence++; muxed_smps[i]->flags |= (int) SampleFlags::HAS_SEQUENCE; @@ -198,11 +200,14 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i) muxed_smps[i]->length = 0; muxed_smps[i]->ts = tomux_smps[i]->ts; - muxed_smps[i]->flags |= tomux_smps[i]->flags & ((int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_TS_RECEIVED); + muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_TS; ret = mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]); if (ret) return ret; + + if (muxed_smps[i]->length > 0) + muxed_smps[i]->flags |= (int) SampleFlags::HAS_DATA; } sample_copy(p->last_sample, muxed_smps[tomux-1]); @@ -211,7 +216,11 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i) #ifdef WITH_HOOKS toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux); - if (toenqueue != tomux) { + if (toenqueue == -1) { + p->logger->error("An error occured during hook processing. Skipping sample"); + + } + else if (toenqueue != tomux) { int skipped = tomux - toenqueue; p->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, path_name(p));