1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

hooks: simplify hook processing system so that ever hook only processes a single sample at once

This commit is contained in:
Steffen Vogel 2019-03-16 09:05:34 +01:00
parent 6c01a32ee3
commit 49fd6d89f5
22 changed files with 254 additions and 299 deletions

View file

@ -47,6 +47,13 @@ extern "C" {
struct path;
struct sample;
enum hook_reason {
HOOK_OK,
HOOK_ERROR,
HOOK_SKIP_SAMPLE,
HOOK_STOP_PROCESSING
};
/** Descriptor for user defined hooks. See hooks[]. */
struct hook {
enum state state;
@ -83,7 +90,7 @@ int hook_periodic(struct hook *h);
int hook_restart(struct hook *h);
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt);
int hook_process(struct hook *h, struct sample *smp);
/** Compare two hook functions with their priority. Used by vlist_sort() */
int hook_cmp_priority(const void *a, const void *b);

View file

@ -73,7 +73,7 @@ struct hook_type {
int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */
int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
int (*process)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever muxed samples are processed. */
int (*process)(struct hook *h, struct sample *smp); /**< Called whenever a sample is processed. */
};
struct hook_type * hook_type_lookup(const char *name);

View file

@ -60,8 +60,7 @@ enum sample_flags {
SAMPLE_HAS_ALL = (1 << 5) - 1, /**< Enable all output options. */
SAMPLE_IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */
SAMPLE_IS_LAST = (1 << 17), /**< This sample is the last of a running simulation case */
SAMPLE_IS_REORDERED = (1 << 18), /**< This sample is reordered. */
SAMPLE_IS_LAST = (1 << 17) /**< This sample is the last of a running simulation case */
};
struct sample {

View file

@ -31,6 +31,10 @@
#include <villas/node.h>
#include <villas/plugin.h>
const char *hook_reasons[] = {
"ok", "error", "skip-sample", "stop-processing"
};
int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n)
{
int ret;
@ -200,7 +204,7 @@ int hook_restart(struct hook *h)
return 0;
}
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt)
int hook_process(struct hook *h, struct sample *smp)
{
int ret;
assert(h->state == STATE_STARTED);
@ -208,12 +212,12 @@ int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt)
if (!h->enabled)
return 0;
debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt);
ret = hook_type(h)->process ? hook_type(h)->process(h, smps, cnt) : 0;
ret = hook_type(h)->process ? hook_type(h)->process(h, smp) : 0;
if (ret)
return ret;
debug(LOG_HOOK | 10, "Hook %s processed: priority=%d, return=%s", hook_type_name(hook_type(h)), h->priority, hook_reasons[ret]);
return 0;
}
@ -350,17 +354,32 @@ const char * hook_type_name(struct hook_type *vt)
int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt)
{
unsigned ret;
unsigned ret, curent, processed = 0;
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
for (curent = 0; curent < cnt; curent++) {
struct sample *smp = smps[curent];
ret = hook_process(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_process(h, smp);
switch (ret) {
case HOOK_ERROR:
return -1;
case HOOK_OK:
smps[processed++] = smp;
break;
case HOOK_SKIP_SAMPLE:
goto skip;
case HOOK_STOP_PROCESSING:
goto stop;
}
}
skip: {}
}
return cnt;
stop: return processed;
}

View file

@ -138,43 +138,40 @@ static int average_parse(struct hook *h, json_t *cfg)
return 0;
}
static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int average_process(struct hook *h, struct sample *smp)
{
struct average *a = (struct average *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
double avg, sum = 0;
int n = 0;
double avg, sum = 0;
int n = 0;
for (int k = 0; k < smp->length; k++) {
if (!bitset_test(&a->mask, k))
continue;
for (int k = 0; k < smp->length; k++) {
if (!bitset_test(&a->mask, k))
continue;
switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
sum += smp->data[k].i;
break;
switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
sum += smp->data[k].i;
break;
case SIGNAL_TYPE_FLOAT:
sum += smp->data[k].f;
break;
case SIGNAL_TYPE_FLOAT:
sum += smp->data[k].f;
break;
case SIGNAL_TYPE_INVALID:
case SIGNAL_TYPE_COMPLEX:
case SIGNAL_TYPE_BOOLEAN:
return -1; /* not supported */
}
n++;
case SIGNAL_TYPE_INVALID:
case SIGNAL_TYPE_COMPLEX:
case SIGNAL_TYPE_BOOLEAN:
return -1; /* not supported */
}
avg = n == 0 ? 0 : sum / n;
sample_data_insert(smp, (union signal_data *) &avg, a->offset, 1);
smp->signals = &h->signals;
n++;
}
return 0;
avg = n == 0 ? 0 : sum / n;
sample_data_insert(smp, (union signal_data *) &avg, a->offset, 1);
smp->signals = &h->signals;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -136,23 +136,19 @@ static int cast_parse(struct hook *h, json_t *cfg)
return 0;
}
static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int cast_process(struct hook *h, struct sample *smp)
{
struct cast *c = (struct cast *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
struct signal *orig_sig = vlist_at(smp->signals, c->signal_index);
struct signal *new_sig = vlist_at(&h->signals, c->signal_index);
struct signal *orig_sig = vlist_at(smp->signals, c->signal_index);
struct signal *new_sig = vlist_at(&h->signals, c->signal_index);
signal_data_cast(&smp->data[c->signal_index], orig_sig, new_sig);
signal_data_cast(&smp->data[c->signal_index], orig_sig, new_sig);
/* Replace signal descriptors of sample */
smp->signals = &h->signals;
/* Replace signal descriptors of sample */
smp->signals = &h->signals;
}
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -66,24 +66,14 @@ static int decimate_parse(struct hook *h, json_t *cfg)
return 0;
}
static int decimate_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int decimate_process(struct hook *h, struct sample *smp)
{
struct decimate *p = (struct decimate *) h->_vd;
int i, ok;
for (i = 0, ok = 0; i < *cnt; i++) {
if (p->ratio && p->counter++ % p->ratio == 0) {
struct sample *tmp;
if (p->ratio && p->counter++ % p->ratio != 0)
return HOOK_SKIP_SAMPLE;
tmp = smps[ok];
smps[ok++] = smps[i];
smps[i] = tmp;
}
}
*cnt = ok;
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -55,51 +55,29 @@ static int drop_stop(struct hook *h)
return 0;
}
static int drop_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int drop_process(struct hook *h, struct sample *smp)
{
int i, ok, dist;
int dist;
struct drop *d = (struct drop *) h->_vd;
struct sample *prev, *cur = NULL;
for (i = 0, ok = 0, prev = d->prev; i < *cnt; i++, prev = cur) {
cur = smps[i];
if (prev) {
dist = cur->sequence - (int64_t) prev->sequence;
if (dist <= 0) {
cur->flags |= SAMPLE_IS_REORDERED;
if (d->prev) {
dist = smp->sequence - (int64_t) d->prev->sequence;
if (dist <= 0) {
debug(10, "Dropping reordered sample: sequence=%" PRIu64 ", distance=%d", smp->sequence, dist);
debug(10, "Dropping reordered sample: sequence=%" PRIu64 ", distance=%d", cur->sequence, dist);
}
else
goto ok;
return HOOK_SKIP_SAMPLE;
}
else
goto ok;
continue;
ok: /* To discard the first X samples in 'smps[]' we must
* shift them to the end of the 'smps[]' array.
* In case the hook returns a number 'ok' which is smaller than 'cnt',
* only the first 'ok' samples in 'smps[]' are accepted and further processed.
*/
smps[i] = smps[ok];
smps[ok++] = cur;
}
if (cur)
sample_incref(cur);
sample_incref(smp);
if (d->prev)
sample_decref(d->prev);
d->prev = cur;
d->prev = smp;
*cnt = ok;
return 0;
return HOOK_OK;
}
static int drop_restart(struct hook *h)

View file

@ -30,12 +30,11 @@
#include <villas/node.h>
#include <villas/sample.h>
static int dump_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int dump_process(struct hook *h, struct sample *smp)
{
for (int i = 0; i < *cnt; i++)
sample_dump(smps[i]);
sample_dump(smp);
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -29,7 +29,10 @@
#include <villas/sample.h>
struct ebm {
char *signal_name;
int signal_index;
double total_energy;
};
static int ebm_init(struct hook *h)
@ -75,7 +78,7 @@ static int ebm_prepare(struct hook *h)
}
static int ebm_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int ebm_process(struct hook *h, struct sample *smp)
{
__attribute__((unused)) struct ebm *e = (struct ebm *) h->_vd;

View file

@ -33,30 +33,26 @@
#include <villas/sample.h>
#include <villas/timing.h>
static int fix_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int fix_process(struct hook *h, struct sample *smp)
{
struct timespec now = time_now();
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
if (!(smp->flags & SAMPLE_HAS_SEQUENCE) && h->node) {
smp->sequence = h->node->sequence++;
smp->flags |= SAMPLE_HAS_SEQUENCE;
}
if (!(smp->flags & SAMPLE_HAS_TS_RECEIVED)) {
smp->ts.received = now;
smp->flags |= SAMPLE_HAS_TS_RECEIVED;
}
if (!(smp->flags & SAMPLE_HAS_TS_ORIGIN)) {
smp->ts.origin = smp->ts.received;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
}
if (!(smp->flags & SAMPLE_HAS_SEQUENCE) && h->node) {
smp->sequence = h->node->sequence++;
smp->flags |= SAMPLE_HAS_SEQUENCE;
}
return 0;
if (!(smp->flags & SAMPLE_HAS_TS_RECEIVED)) {
smp->ts.received = now;
smp->flags |= SAMPLE_HAS_TS_RECEIVED;
}
if (!(smp->flags & SAMPLE_HAS_TS_ORIGIN)) {
smp->ts.origin = smp->ts.received;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
}
return HOOK_OK;
}
static struct plugin p = {

View file

@ -87,42 +87,40 @@ int jitter_calc_deinit(struct hook *h)
* is high (i.e. several mins depending on GPS_NTP_DELAY_WIN_SIZE),
* the variance value will overrun the 64bit value.
*/
static int jitter_calc_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int jitter_calc_process(struct hook *h, struct sample *smp)
{
struct jitter_calc *j = (struct jitter_calc *) h->_vd;
struct timespec now = time_now();
int64_t delay_sec, delay_nsec, curr_delay_us;
for(int i = 0; i < *cnt; i++) {
delay_sec = now.tv_sec - smps[i]->ts.origin.tv_sec;
delay_nsec = now.tv_nsec - smps[i]->ts.origin.tv_nsec;
delay_sec = now.tv_sec - smp->ts.origin.tv_sec;
delay_nsec = now.tv_nsec - smp->ts.origin.tv_nsec;
/* Calc on microsec instead of nenosec delay as variance formula overflows otherwise.*/
curr_delay_us = delay_sec * 1000000 + delay_nsec / 1000;
/* Calc on microsec instead of nenosec delay as variance formula overflows otherwise.*/
curr_delay_us = delay_sec * 1000000 + delay_nsec / 1000;
j->delay_mov_sum = j->delay_mov_sum + curr_delay_us - j->delay_series[j->curr_count];
j->moving_avg[j->curr_count] = j->delay_mov_sum / GPS_NTP_DELAY_WIN_SIZE; /* Will be valid after GPS_NTP_DELAY_WIN_SIZE initial values */
j->delay_mov_sum = j->delay_mov_sum + curr_delay_us - j->delay_series[j->curr_count];
j->moving_avg[j->curr_count] = j->delay_mov_sum / GPS_NTP_DELAY_WIN_SIZE; /* Will be valid after GPS_NTP_DELAY_WIN_SIZE initial values */
j->delay_mov_sum_sqrd = j->delay_mov_sum_sqrd + (curr_delay_us * curr_delay_us) - (j->delay_series[j->curr_count] * j->delay_series[j->curr_count]);
j->moving_var[j->curr_count] = (j->delay_mov_sum_sqrd - (j->delay_mov_sum * j->delay_mov_sum) / GPS_NTP_DELAY_WIN_SIZE) / (GPS_NTP_DELAY_WIN_SIZE - 1);
j->delay_mov_sum_sqrd = j->delay_mov_sum_sqrd + (curr_delay_us * curr_delay_us) - (j->delay_series[j->curr_count] * j->delay_series[j->curr_count]);
j->moving_var[j->curr_count] = (j->delay_mov_sum_sqrd - (j->delay_mov_sum * j->delay_mov_sum) / GPS_NTP_DELAY_WIN_SIZE) / (GPS_NTP_DELAY_WIN_SIZE - 1);
j->delay_series[j->curr_count] = curr_delay_us; /* Update the last delay value */
j->delay_series[j->curr_count] = curr_delay_us; /* Update the last delay value */
/* Jitter calc formula as used in Wireshark according to RFC3550 (RTP)
D(i,j) = (Rj-Ri)-(Sj-Si) = (Rj-Sj)-(Ri-Si)
J(i) = J(i-1)+(|D(i-1,i)|-J(i-1))/16
*/
j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE] = j->jitter_val[j->curr_count] + (labs(curr_delay_us) - j->jitter_val[j->curr_count]) / 16;
/* Jitter calc formula as used in Wireshark according to RFC3550 (RTP)
D(i,j) = (Rj-Ri)-(Sj-Si) = (Rj-Sj)-(Ri-Si)
J(i) = J(i-1)+(|D(i-1,i)|-J(i-1))/16
*/
j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE] = j->jitter_val[j->curr_count] + (labs(curr_delay_us) - j->jitter_val[j->curr_count]) / 16;
info("%s: jitter=%" PRId64 " usec, moving average=%" PRId64 " usec, moving variance=%" PRId64 " usec", __FUNCTION__, j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE], j->moving_avg[j->curr_count], j->moving_var[j->curr_count]);
info("%s: jitter=%" PRId64 " usec, moving average=%" PRId64 " usec, moving variance=%" PRId64 " usec", __FUNCTION__, j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE], j->moving_avg[j->curr_count], j->moving_var[j->curr_count]);
j->curr_count++;
if (j->curr_count >= GPS_NTP_DELAY_WIN_SIZE)
j->curr_count = 0;
}
j->curr_count++;
if (j->curr_count >= GPS_NTP_DELAY_WIN_SIZE)
j->curr_count = 0;
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -96,38 +96,31 @@ static int limit_rate_parse(struct hook *h, json_t *cfg)
return 0;
}
static int limit_rate_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int limit_rate_process(struct hook *h, struct sample *smp)
{
struct limit_rate *p = (struct limit_rate *) h->_vd;
struct timespec next;
unsigned ret = 0;
switch (p->mode) {
case LIMIT_RATE_LOCAL:
next = time_now();
break;
for (unsigned i = 0; i < *cnt; i++) {
switch (p->mode) {
case LIMIT_RATE_LOCAL:
next = time_now();
break;
case LIMIT_RATE_ORIGIN:
next = smp->ts.origin;
break;
case LIMIT_RATE_ORIGIN:
next = smps[i]->ts.origin;
break;
case LIMIT_RATE_RECEIVED:
next = smps[i]->ts.received;
break;
}
if (time_delta(&p->last, &next) < p->deadtime)
continue; /* Drop this sample */
p->last = next;
smps[ret++] = smps[i];
case LIMIT_RATE_RECEIVED:
next = smp->ts.received;
break;
}
*cnt = ret;
if (time_delta(&p->last, &next) < p->deadtime)
return HOOK_SKIP_SAMPLE;
return 0;
p->last = next;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -117,7 +117,7 @@ static int print_parse(struct hook *h, json_t *cfg)
return 0;
}
static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int print_process(struct hook *h, struct sample *smp)
{
struct print *p = (struct print *) h->_vd;
@ -128,7 +128,7 @@ static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt)
else if (h->path)
printf("Path %s: ", path_name(h->path));
io_print(&p->io, smps, *cnt);
io_print(&p->io, &smp, 1);
return 0;
}

View file

@ -54,49 +54,42 @@ static int restart_stop(struct hook *h)
return 0;
}
static int restart_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int restart_process(struct hook *h, struct sample *smp)
{
unsigned i;
struct restart *r = (struct restart *) h->_vd;
struct sample *prev, *cur = NULL;
assert(h->node);
for (i = 0, prev = r->prev; i < *cnt; i++, prev = cur) {
cur = smps[i];
if (r->prev) {
/* A wrap around of the sequence no should not be treated as a simulation restart */
if (smp->sequence == 0 && r->prev->sequence != 0 && r->prev->sequence > UINT64_MAX - 16) {
warning("Simulation from node %s restarted (previous->sequence=%" PRIu64 ", current->sequence=%" PRIu64 ")",
node_name(h->node), r->prev->sequence, smp->sequence);
if (prev) {
/* A wrap around of the sequence no should not be treated as a simulation restart */
if (cur->sequence == 0 && prev->sequence != 0 && prev->sequence > UINT64_MAX - 16) {
warning("Simulation from node %s restarted (previous->sequence=%" PRIu64 ", current->sequence=%" PRIu64 ")",
node_name(h->node), prev->sequence, cur->sequence);
smp->flags |= SAMPLE_IS_FIRST;
cur->flags |= SAMPLE_IS_FIRST;
/* Run restart hooks */
for (size_t i = 0; i < vlist_length(&h->node->in.hooks); i++) {
struct hook *k = (struct hook *) vlist_at(&h->node->in.hooks, i);
/* Run restart hooks */
for (size_t i = 0; i < vlist_length(&h->node->in.hooks); i++) {
struct hook *k = (struct hook *) vlist_at(&h->node->in.hooks, i);
hook_restart(k);
}
hook_restart(k);
}
for (size_t i = 0; i < vlist_length(&h->node->out.hooks); i++) {
struct hook *k = (struct hook *) vlist_at(&h->node->out.hooks, i);
for (size_t i = 0; i < vlist_length(&h->node->out.hooks); i++) {
struct hook *k = (struct hook *) vlist_at(&h->node->out.hooks, i);
hook_restart(k);
}
hook_restart(k);
}
}
}
if (cur)
sample_incref(cur);
sample_incref(smp);
if (r->prev)
sample_decref(r->prev);
r->prev = cur;
r->prev = smp;
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -104,36 +104,33 @@ static int scale_parse(struct hook *h, json_t *cfg)
return 0;
}
static int scale_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int scale_process(struct hook *h, struct sample *smp)
{
struct scale *s = (struct scale *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
int k = s->signal_index;
int k = s->signal_index;
switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
smp->data[k].i = smp->data[k].i * s->scale + s->offset;
break;
switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
smp->data[k].i = smp->data[k].i * s->scale + s->offset;
break;
case SIGNAL_TYPE_FLOAT:
smp->data[k].f = smp->data[k].f * s->scale + s->offset;
break;
case SIGNAL_TYPE_FLOAT:
smp->data[k].f = smp->data[k].f * s->scale + s->offset;
break;
case SIGNAL_TYPE_COMPLEX:
smp->data[k].z = smp->data[k].z * s->scale + s->offset;
break;
case SIGNAL_TYPE_COMPLEX:
smp->data[k].z = smp->data[k].z * s->scale + s->offset;
break;
case SIGNAL_TYPE_BOOLEAN:
smp->data[k].b = smp->data[k].b * s->scale + s->offset;
break;
case SIGNAL_TYPE_BOOLEAN:
smp->data[k].b = smp->data[k].b * s->scale + s->offset;
break;
default: { }
}
default: { }
}
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -48,14 +48,13 @@ static int shift_seq_parse(struct hook *h, json_t *cfg)
return 0;
}
static int shift_seq_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int shift_seq_process(struct hook *h, struct sample *smp)
{
struct shift *p = (struct shift *) h->_vd;
for (int i = 0; i < *cnt; i++)
smps[i]->sequence += p->offset;
smp->sequence += p->offset;
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -77,24 +77,28 @@ static int shift_ts_parse(struct hook *h, json_t *cfg)
return 0;
}
static int shift_ts_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int shift_ts_process(struct hook *h, struct sample *smp)
{
struct shift_ts *p = (struct shift_ts *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *s = smps[i];
struct timespec *ts;
struct timespec *ts;
switch (p->mode) {
case SHIFT_ORIGIN: ts = &s->ts.origin; break;
case SHIFT_RECEIVED: ts = &s->ts.received; break;
default: return -1;
}
switch (p->mode) {
case SHIFT_ORIGIN:
ts = &smp->ts.origin;
break;
*ts = time_add(ts, &p->offset); break;
case SHIFT_RECEIVED:
ts = &smp->ts.received;
break;
default:
return HOOK_ERROR;
}
return 0;
*ts = time_add(ts, &p->offset);;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -89,7 +89,7 @@ static int skip_first_restart(struct hook *h)
return 0;
}
static int skip_first_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int skip_first_process(struct hook *h, struct sample *smp)
{
struct skip_first *p = (struct skip_first *) h->_vd;
@ -97,51 +97,33 @@ static int skip_first_process(struct hook *h, struct sample *smps[], unsigned *c
if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) {
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
p->samples.until = smps[0]->sequence + p->samples.wait;
p->samples.until = smp->sequence + p->samples.wait;
break;
case HOOK_SKIP_MODE_SECONDS:
p->seconds.until = time_add(&smps[0]->ts.origin, &p->seconds.wait);
p->seconds.until = time_add(&smp->ts.origin, &p->seconds.wait);
break;
}
p->state = HOOK_SKIP_FIRST_STATE_SKIPPING;
}
int i, ok;
for (i = 0, ok = 0; i < *cnt; i++) {
bool skip;
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
skip = p->samples.until > smps[i]->sequence;
break;
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
if (p->samples.until > smp->sequence)
return HOOK_SKIP_SAMPLE;
break;
case HOOK_SKIP_MODE_SECONDS:
skip = time_delta(&p->seconds.until, &smps[i]->ts.origin) < 0;
break;
default:
skip = false;
break;
}
case HOOK_SKIP_MODE_SECONDS:
if (time_delta(&p->seconds.until, &smp->ts.origin) < 0)
return HOOK_SKIP_SAMPLE;
break;
if (!skip) {
struct sample *tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
}
/* To discard the first X samples in 'smps[]' we must
* shift them to the end of the 'smps[]' array.
* In case the hook returns a number 'ok' which is smaller than 'cnt',
* only the first 'ok' samples in 'smps[]' are accepted and further processed.
*/
default:
break;
}
*cnt = ok;
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -155,48 +155,38 @@ static int stats_collect_parse(struct hook *h, json_t *cfg)
return 0;
}
static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int stats_collect_process(struct hook *h, struct sample *smp)
{
struct stats_collect *p = (struct stats_collect *) h->_vd;
struct stats *s = &p->stats;
int dist;
struct sample *previous = p->last;
if (p->last) {
if (smp->flags & p->last->flags & SAMPLE_HAS_TS_RECEIVED)
stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&p->last->ts.received, &smp->ts.received));
for (int i = 0; i < *cnt; i++) {
struct sample *current = smps[i];
if (smp->flags & p->last->flags & SAMPLE_HAS_TS_ORIGIN)
stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&p->last->ts.origin, &smp->ts.origin));
if (previous) {
if (current->flags & previous->flags & SAMPLE_HAS_TS_RECEIVED)
stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&previous->ts.received, &current->ts.received));
if ((smp->flags & SAMPLE_HAS_TS_ORIGIN) && (smp->flags & SAMPLE_HAS_TS_RECEIVED))
stats_update(s, STATS_METRIC_OWD, time_delta(&smp->ts.origin, &smp->ts.received));
if (current->flags & previous->flags & SAMPLE_HAS_TS_ORIGIN)
stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&previous->ts.origin, &current->ts.origin));
if ((current->flags & SAMPLE_HAS_TS_ORIGIN) && (current->flags & SAMPLE_HAS_TS_RECEIVED))
stats_update(s, STATS_METRIC_OWD, time_delta(&current->ts.origin, &current->ts.received));
if (current->flags & previous->flags & SAMPLE_HAS_SEQUENCE) {
dist = current->sequence - (int32_t) previous->sequence;
if (dist != 1)
stats_update(s, STATS_METRIC_REORDERED, dist);
}
if (smp->flags & p->last->flags & SAMPLE_HAS_SEQUENCE) {
int dist = smp->sequence - (int32_t) p->last->sequence;
if (dist != 1)
stats_update(s, STATS_METRIC_REORDERED, dist);
}
previous = current;
}
sample_incref(smp);
if (p->last)
sample_decref(p->last);
if (previous)
sample_incref(previous);
p->last = previous;
p->last = smp;
stats_commit(&p->stats);
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -29,12 +29,11 @@
#include <villas/timing.h>
#include <villas/sample.h>
static int ts_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static int ts_process(struct hook *h, struct sample *smp)
{
for (int i = 0; i < *cnt; i++)
smps[i]->ts.origin = smps[i]->ts.received;
smp->ts.origin = smp->ts.received;
return 0;
return HOOK_OK;
}
static struct plugin p = {

View file

@ -217,13 +217,29 @@ check: if (optarg == endptr)
logger->debug("Read {} smps from stdin", recv);
unsigned send = recv;
unsigned send = 0;
for (int processed = 0; processed < recv; processed++) {
struct sample *smp = smps[processed];
ret = hook_process(&h, smps, (unsigned *) &send);
if (ret < 0)
throw RuntimeError("Failed to process samples");
ret = hook_process(&h, smp);
switch (ret) {
case HOOK_ERROR:
throw RuntimeError("Failed to process samples");
sent = io_print(&io, smps, send);
case HOOK_OK:
smps[send++] = smp;
break;
case HOOK_SKIP_SAMPLE:
goto skip;
case HOOK_STOP_PROCESSING:
goto stop;
}
skip: logger->info("Skip: seq={}", smp->sequence);
}
stop: sent = io_print(&io, smps, send);
if (sent < 0)
throw RuntimeError("Failed to write to stdout");