mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
mapping: fix restart
This commit is contained in:
parent
7def6349d5
commit
d9d85ce2e8
5 changed files with 38 additions and 4 deletions
|
@ -63,6 +63,13 @@ enum sample_flags {
|
|||
SAMPLE_HAS_VALUES = (1 << 6), /**< Include values in output. */
|
||||
SAMPLE_HAS_FORMAT = (1 << 7), /**< This sample has a valid sample::format field. */
|
||||
SAMPLE_HAS_ALL = (1 << 7) - 1, /**< Enable all output options. */
|
||||
|
||||
SAMPLE_IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */
|
||||
SAMPLE_IS_LAST = (1 << 17), /**< This sample is the last of a running simulation case */
|
||||
SAMPLE_IS_REORDERED = (1 << 18), /**< This sample is reordered. */
|
||||
|
||||
// SAMPLE_DO_DROP = (1 << 19), /**< This sample should be dropped. */
|
||||
// SAMPLE_DO_SKIP = (1 << 20) /**< This sample was skipped by a previous hook. */
|
||||
};
|
||||
|
||||
struct sample {
|
||||
|
|
|
@ -158,9 +158,13 @@ int hook_periodic(struct hook *h)
|
|||
|
||||
int hook_restart(struct hook *h)
|
||||
{
|
||||
debug(LOG_HOOK | 10, "Running hook %s: type=restart, priority=%d", plugin_name(h->_vt), h->priority);
|
||||
if (h->_vt->restart) {
|
||||
debug(LOG_HOOK | 10, "Running hook %s: type=restart, priority=%d", plugin_name(h->_vt), h->priority);
|
||||
|
||||
return h->_vt->restart ? h->_vt->restart(h) : 0;
|
||||
return h->_vt->restart(h);
|
||||
}
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
||||
|
|
|
@ -66,6 +66,8 @@ static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
if (prev) {
|
||||
dist = cur->sequence - (int32_t) prev->sequence;
|
||||
if (dist <= 0) {
|
||||
cur->flags |= SAMPLE_IS_REORDERED;
|
||||
|
||||
debug(10, "Reordered sample: sequence=%u, distance=%d", cur->sequence, dist);
|
||||
if (h->node && h->node->stats)
|
||||
stats_update(h->node->stats, STATS_REORDERED, dist);
|
||||
|
@ -100,6 +102,18 @@ ok: /* To discard the first X samples in 'smps[]' we must
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int drop_restart(struct hook *h)
|
||||
{
|
||||
struct drop *d = h->_vd;
|
||||
|
||||
if (d->prev) {
|
||||
sample_put(d->prev);
|
||||
d->prev = NULL;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
.name = "drop",
|
||||
.description = "Drop messages with reordered sequence numbers",
|
||||
|
@ -110,6 +124,7 @@ static struct plugin p = {
|
|||
.read = drop_read,
|
||||
.start = drop_start,
|
||||
.stop = drop_stop,
|
||||
.restart= drop_restart,
|
||||
.size = sizeof(struct drop)
|
||||
}
|
||||
};
|
||||
|
|
|
@ -64,10 +64,13 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
cur = smps[i];
|
||||
|
||||
if (prev) {
|
||||
if (cur->sequence == 0 && prev->sequence <= UINT32_MAX - 32) {
|
||||
/* A wrap around of the sequence no should not be treated as a simulation restart */
|
||||
if (cur->sequence == 0 && prev->sequence <= UINT32_MAX - 128) {
|
||||
warn("Simulation from node %s restarted (previous->seq=%u, current->seq=%u)",
|
||||
node_name(h->node), prev->sequence, cur->sequence);
|
||||
|
||||
cur->flags |= SAMPLE_IS_FIRST;
|
||||
|
||||
/* Run restart hooks */
|
||||
for (size_t i = 0; i < list_length(&h->node->hooks); i++) {
|
||||
struct hook *k = list_at(&h->node->hooks, i);
|
||||
|
|
|
@ -93,10 +93,15 @@ static void path_source_read(struct path *p, struct path_source *ps)
|
|||
warn("Pool underrun for path %s", path_name(p));
|
||||
|
||||
for (int i = 0; i < mux; i++) {
|
||||
mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL);
|
||||
/** @todo: This is ugly. We should try to make use of the 'restart' hook */
|
||||
if (read_smps[i]->flags & SAMPLE_IS_FIRST) {
|
||||
p->sequence = 0;
|
||||
p->last_sample->length = 0;
|
||||
}
|
||||
|
||||
p->last_sample->sequence = p->sequence++;
|
||||
|
||||
mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL);
|
||||
sample_copy(muxed_smps[i], p->last_sample);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue