1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

miscelaneous fixes

This commit is contained in:
Steffen Vogel 2021-02-22 23:16:53 +01:00
parent 3a7f9c05de
commit 42133b2882
4 changed files with 25 additions and 9 deletions

View file

@ -52,7 +52,7 @@ public:
};
enum class Reason {
OK,
OK = 0,
ERROR,
SKIP_SAMPLE,
STOP_PROCESSING
@ -135,7 +135,7 @@ public:
return flags;
}
struct vlist *getSignals()
virtual struct vlist *getSignals()
{
return &signals;
}

View file

@ -54,6 +54,8 @@ enum class SampleFlags {
HAS_OFFSET = (1 << 2), /**< Include offset (received - origin timestamp) in output. */
HAS_SEQUENCE = (1 << 3), /**< Include sequence number in output. */
HAS_DATA = (1 << 4), /**< Include values in output. */
HAS_TS = HAS_TS_ORIGIN | HAS_TS_RECEIVED, /**< Include origin timestamp in output. */
HAS_ALL = (1 << 5) - 1, /**< Enable all output options. */
IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */

View file

@ -467,13 +467,18 @@ int node_read(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *re
#ifdef WITH_HOOKS
/* Run read hooks */
int rread = hook_list_process(&n->in.hooks, smps, nread);
if (rread < 0)
return rread;
int skipped = nread - rread;
if (skipped > 0) {
if (n->stats != nullptr)
n->stats->update(Stats::Metric::SMPS_SKIPPED, skipped);
if (skipped > 0 && n->stats != nullptr) {
n->stats->update(Stats::Metric::SMPS_SKIPPED, skipped);
debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped);
}
debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped);
else
debug(LOG_NODE | 5, "Received %u samples from node %s", nread, node_name(n));
return rread;
#else

View file

@ -184,8 +184,10 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i)
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
if (p->original_sequence_no)
if (p->original_sequence_no) {
muxed_smps[i]->sequence = tomux_smps[i]->sequence;
muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_SEQUENCE;
}
else {
muxed_smps[i]->sequence = p->last_sequence++;
muxed_smps[i]->flags |= (int) SampleFlags::HAS_SEQUENCE;
@ -198,11 +200,14 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i)
muxed_smps[i]->length = 0;
muxed_smps[i]->ts = tomux_smps[i]->ts;
muxed_smps[i]->flags |= tomux_smps[i]->flags & ((int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_TS_RECEIVED);
muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_TS;
ret = mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]);
if (ret)
return ret;
if (muxed_smps[i]->length > 0)
muxed_smps[i]->flags |= (int) SampleFlags::HAS_DATA;
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
@ -211,7 +216,11 @@ int path_source_read(struct vpath_source *ps, struct vpath *p, int i)
#ifdef WITH_HOOKS
toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux);
if (toenqueue != tomux) {
if (toenqueue == -1) {
p->logger->error("An error occured during hook processing. Skipping sample");
}
else if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
p->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, path_name(p));