diff --git a/include/villas/sample.h b/include/villas/sample.h index 124220860..9f80d2e22 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -63,6 +63,13 @@ enum sample_flags { SAMPLE_HAS_VALUES = (1 << 6), /**< Include values in output. */ SAMPLE_HAS_FORMAT = (1 << 7), /**< This sample has a valid sample::format field. */ SAMPLE_HAS_ALL = (1 << 7) - 1, /**< Enable all output options. */ + + SAMPLE_IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */ + SAMPLE_IS_LAST = (1 << 17), /**< This sample is the last of a running simulation case */ + SAMPLE_IS_REORDERED = (1 << 18), /**< This sample is reordered. */ + +// SAMPLE_DO_DROP = (1 << 19), /**< This sample should be dropped. */ +// SAMPLE_DO_SKIP = (1 << 20) /**< This sample was skipped by a previous hook. */ }; struct sample { diff --git a/lib/hook.c b/lib/hook.c index 74fda3b47..702efa7c4 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -158,9 +158,13 @@ int hook_periodic(struct hook *h) int hook_restart(struct hook *h) { - debug(LOG_HOOK | 10, "Running hook %s: type=restart, priority=%d", plugin_name(h->_vt), h->priority); + if (h->_vt->restart) { + debug(LOG_HOOK | 10, "Running hook %s: type=restart, priority=%d", plugin_name(h->_vt), h->priority); - return h->_vt->restart ? h->_vt->restart(h) : 0; + return h->_vt->restart(h); + } + else + return 0; } int hook_read(struct hook *h, struct sample *smps[], unsigned *cnt) diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index 983182e6d..44e236fc9 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -66,6 +66,8 @@ static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt) if (prev) { dist = cur->sequence - (int32_t) prev->sequence; if (dist <= 0) { + cur->flags |= SAMPLE_IS_REORDERED; + debug(10, "Reordered sample: sequence=%u, distance=%d", cur->sequence, dist); if (h->node && h->node->stats) stats_update(h->node->stats, STATS_REORDERED, dist); @@ -100,6 +102,18 @@ ok: /* To discard the first X samples in 'smps[]' we must return 0; } +static int drop_restart(struct hook *h) +{ + struct drop *d = h->_vd; + + if (d->prev) { + sample_put(d->prev); + d->prev = NULL; + } + + return 0; +} + static struct plugin p = { .name = "drop", .description = "Drop messages with reordered sequence numbers", @@ -110,6 +124,7 @@ static struct plugin p = { .read = drop_read, .start = drop_start, .stop = drop_stop, + .restart= drop_restart, .size = sizeof(struct drop) } }; diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index fc46b7a1f..d48ff4a9d 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -64,10 +64,13 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) cur = smps[i]; if (prev) { - if (cur->sequence == 0 && prev->sequence <= UINT32_MAX - 32) { + /* A wrap around of the sequence no should not be treated as a simulation restart */ + if (cur->sequence == 0 && prev->sequence <= UINT32_MAX - 128) { warn("Simulation from node %s restarted (previous->seq=%u, current->seq=%u)", node_name(h->node), prev->sequence, cur->sequence); + cur->flags |= SAMPLE_IS_FIRST; + /* Run restart hooks */ for (size_t i = 0; i < list_length(&h->node->hooks); i++) { struct hook *k = list_at(&h->node->hooks, i); diff --git a/lib/path.c b/lib/path.c index 5d4c2f07c..0d13cee0a 100644 --- a/lib/path.c +++ b/lib/path.c @@ -93,10 +93,15 @@ static void path_source_read(struct path *p, struct path_source *ps) warn("Pool underrun for path %s", path_name(p)); for (int i = 0; i < mux; i++) { - mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL); + /** @todo: This is ugly. We should try to make use of the 'restart' hook */ + if (read_smps[i]->flags & SAMPLE_IS_FIRST) { + p->sequence = 0; + p->last_sample->length = 0; + } p->last_sample->sequence = p->sequence++; + mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL); sample_copy(muxed_smps[i], p->last_sample); }