diff --git a/include/villas/hook.h b/include/villas/hook.h index 4451ad2c0..07e9e90fc 100644 --- a/include/villas/hook.h +++ b/include/villas/hook.h @@ -47,6 +47,13 @@ extern "C" { struct path; struct sample; +enum hook_reason { + HOOK_OK, + HOOK_ERROR, + HOOK_SKIP_SAMPLE, + HOOK_STOP_PROCESSING +}; + /** Descriptor for user defined hooks. See hooks[]. */ struct hook { enum state state; @@ -83,7 +90,7 @@ int hook_periodic(struct hook *h); int hook_restart(struct hook *h); -int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt); +int hook_process(struct hook *h, struct sample *smp); /** Compare two hook functions with their priority. Used by vlist_sort() */ int hook_cmp_priority(const void *a, const void *b); diff --git a/include/villas/hook_type.h b/include/villas/hook_type.h index 632b8705d..ec3be8373 100644 --- a/include/villas/hook_type.h +++ b/include/villas/hook_type.h @@ -73,7 +73,7 @@ struct hook_type { int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */ int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ - int (*process)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever muxed samples are processed. */ + int (*process)(struct hook *h, struct sample *smp); /**< Called whenever a sample is processed. */ }; struct hook_type * hook_type_lookup(const char *name); diff --git a/include/villas/sample.h b/include/villas/sample.h index 9659e306a..d50fa11a9 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -60,8 +60,7 @@ enum sample_flags { SAMPLE_HAS_ALL = (1 << 5) - 1, /**< Enable all output options. */ SAMPLE_IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */ - SAMPLE_IS_LAST = (1 << 17), /**< This sample is the last of a running simulation case */ - SAMPLE_IS_REORDERED = (1 << 18), /**< This sample is reordered. */ + SAMPLE_IS_LAST = (1 << 17) /**< This sample is the last of a running simulation case */ }; struct sample { diff --git a/lib/hook.c b/lib/hook.c index 751f32729..fe5e9a2a6 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -31,6 +31,10 @@ #include #include +const char *hook_reasons[] = { + "ok", "error", "skip-sample", "stop-processing" +}; + int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n) { int ret; @@ -200,7 +204,7 @@ int hook_restart(struct hook *h) return 0; } -int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt) +int hook_process(struct hook *h, struct sample *smp) { int ret; assert(h->state == STATE_STARTED); @@ -208,12 +212,12 @@ int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt) if (!h->enabled) return 0; - debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt); - - ret = hook_type(h)->process ? hook_type(h)->process(h, smps, cnt) : 0; + ret = hook_type(h)->process ? hook_type(h)->process(h, smp) : 0; if (ret) return ret; + debug(LOG_HOOK | 10, "Hook %s processed: priority=%d, return=%s", hook_type_name(hook_type(h)), h->priority, hook_reasons[ret]); + return 0; } @@ -350,17 +354,32 @@ const char * hook_type_name(struct hook_type *vt) int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt) { - unsigned ret; + unsigned ret, curent, processed = 0; - for (size_t i = 0; i < vlist_length(hs); i++) { - struct hook *h = (struct hook *) vlist_at(hs, i); + for (curent = 0; curent < cnt; curent++) { + struct sample *smp = smps[curent]; - ret = hook_process(h, smps, &cnt); - if (ret || !cnt) - /* Abort hook processing if earlier hooks removed all samples - * or they returned something non-zero */ - break; + for (size_t i = 0; i < vlist_length(hs); i++) { + struct hook *h = (struct hook *) vlist_at(hs, i); + + ret = hook_process(h, smp); + switch (ret) { + case HOOK_ERROR: + return -1; + + case HOOK_OK: + smps[processed++] = smp; + break; + + case HOOK_SKIP_SAMPLE: + goto skip; + + case HOOK_STOP_PROCESSING: + goto stop; + } + } +skip: {} } - return cnt; +stop: return processed; } diff --git a/lib/hooks/average.c b/lib/hooks/average.c index 6093b8620..a7cfe7f3b 100644 --- a/lib/hooks/average.c +++ b/lib/hooks/average.c @@ -138,43 +138,40 @@ static int average_parse(struct hook *h, json_t *cfg) return 0; } -static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int average_process(struct hook *h, struct sample *smp) { struct average *a = (struct average *) h->_vd; - for (int i = 0; i < *cnt; i++) { - struct sample *smp = smps[i]; - double avg, sum = 0; - int n = 0; + double avg, sum = 0; + int n = 0; - for (int k = 0; k < smp->length; k++) { - if (!bitset_test(&a->mask, k)) - continue; + for (int k = 0; k < smp->length; k++) { + if (!bitset_test(&a->mask, k)) + continue; - switch (sample_format(smp, k)) { - case SIGNAL_TYPE_INTEGER: - sum += smp->data[k].i; - break; + switch (sample_format(smp, k)) { + case SIGNAL_TYPE_INTEGER: + sum += smp->data[k].i; + break; - case SIGNAL_TYPE_FLOAT: - sum += smp->data[k].f; - break; + case SIGNAL_TYPE_FLOAT: + sum += smp->data[k].f; + break; - case SIGNAL_TYPE_INVALID: - case SIGNAL_TYPE_COMPLEX: - case SIGNAL_TYPE_BOOLEAN: - return -1; /* not supported */ - } - - n++; + case SIGNAL_TYPE_INVALID: + case SIGNAL_TYPE_COMPLEX: + case SIGNAL_TYPE_BOOLEAN: + return -1; /* not supported */ } - avg = n == 0 ? 0 : sum / n; - sample_data_insert(smp, (union signal_data *) &avg, a->offset, 1); - smp->signals = &h->signals; + n++; } - return 0; + avg = n == 0 ? 0 : sum / n; + sample_data_insert(smp, (union signal_data *) &avg, a->offset, 1); + smp->signals = &h->signals; + + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/cast.c b/lib/hooks/cast.c index 1be7c134e..1820163c4 100644 --- a/lib/hooks/cast.c +++ b/lib/hooks/cast.c @@ -136,23 +136,19 @@ static int cast_parse(struct hook *h, json_t *cfg) return 0; } -static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int cast_process(struct hook *h, struct sample *smp) { struct cast *c = (struct cast *) h->_vd; - for (int i = 0; i < *cnt; i++) { - struct sample *smp = smps[i]; + struct signal *orig_sig = vlist_at(smp->signals, c->signal_index); + struct signal *new_sig = vlist_at(&h->signals, c->signal_index); - struct signal *orig_sig = vlist_at(smp->signals, c->signal_index); - struct signal *new_sig = vlist_at(&h->signals, c->signal_index); + signal_data_cast(&smp->data[c->signal_index], orig_sig, new_sig); - signal_data_cast(&smp->data[c->signal_index], orig_sig, new_sig); + /* Replace signal descriptors of sample */ + smp->signals = &h->signals; - /* Replace signal descriptors of sample */ - smp->signals = &h->signals; - } - - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/decimate.c b/lib/hooks/decimate.c index ae80a7cb0..e76f66272 100644 --- a/lib/hooks/decimate.c +++ b/lib/hooks/decimate.c @@ -66,24 +66,14 @@ static int decimate_parse(struct hook *h, json_t *cfg) return 0; } -static int decimate_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int decimate_process(struct hook *h, struct sample *smp) { struct decimate *p = (struct decimate *) h->_vd; - int i, ok; - for (i = 0, ok = 0; i < *cnt; i++) { - if (p->ratio && p->counter++ % p->ratio == 0) { - struct sample *tmp; + if (p->ratio && p->counter++ % p->ratio != 0) + return HOOK_SKIP_SAMPLE; - tmp = smps[ok]; - smps[ok++] = smps[i]; - smps[i] = tmp; - } - } - - *cnt = ok; - - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index 23d055fd2..bdca0e3e8 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -55,51 +55,29 @@ static int drop_stop(struct hook *h) return 0; } -static int drop_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int drop_process(struct hook *h, struct sample *smp) { - int i, ok, dist; + int dist; struct drop *d = (struct drop *) h->_vd; - struct sample *prev, *cur = NULL; - for (i = 0, ok = 0, prev = d->prev; i < *cnt; i++, prev = cur) { - cur = smps[i]; - if (prev) { - dist = cur->sequence - (int64_t) prev->sequence; - if (dist <= 0) { - cur->flags |= SAMPLE_IS_REORDERED; + if (d->prev) { + dist = smp->sequence - (int64_t) d->prev->sequence; + if (dist <= 0) { + debug(10, "Dropping reordered sample: sequence=%" PRIu64 ", distance=%d", smp->sequence, dist); - debug(10, "Dropping reordered sample: sequence=%" PRIu64 ", distance=%d", cur->sequence, dist); - } - else - goto ok; + return HOOK_SKIP_SAMPLE; } - else - goto ok; - - continue; - -ok: /* To discard the first X samples in 'smps[]' we must - * shift them to the end of the 'smps[]' array. - * In case the hook returns a number 'ok' which is smaller than 'cnt', - * only the first 'ok' samples in 'smps[]' are accepted and further processed. - */ - - smps[i] = smps[ok]; - smps[ok++] = cur; } - if (cur) - sample_incref(cur); + sample_incref(smp); if (d->prev) sample_decref(d->prev); - d->prev = cur; + d->prev = smp; - *cnt = ok; - - return 0; + return HOOK_OK; } static int drop_restart(struct hook *h) diff --git a/lib/hooks/dump.c b/lib/hooks/dump.c index 32dd3c949..b285e93e0 100644 --- a/lib/hooks/dump.c +++ b/lib/hooks/dump.c @@ -30,12 +30,11 @@ #include #include -static int dump_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int dump_process(struct hook *h, struct sample *smp) { - for (int i = 0; i < *cnt; i++) - sample_dump(smps[i]); + sample_dump(smp); - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/ebm.c b/lib/hooks/ebm.c index e82671c17..1f276b8a6 100644 --- a/lib/hooks/ebm.c +++ b/lib/hooks/ebm.c @@ -29,7 +29,10 @@ #include struct ebm { + char *signal_name; + int signal_index; + double total_energy; }; static int ebm_init(struct hook *h) @@ -75,7 +78,7 @@ static int ebm_prepare(struct hook *h) } -static int ebm_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int ebm_process(struct hook *h, struct sample *smp) { __attribute__((unused)) struct ebm *e = (struct ebm *) h->_vd; diff --git a/lib/hooks/fix.c b/lib/hooks/fix.c index 1556cd83f..89708647f 100644 --- a/lib/hooks/fix.c +++ b/lib/hooks/fix.c @@ -33,30 +33,26 @@ #include #include -static int fix_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int fix_process(struct hook *h, struct sample *smp) { struct timespec now = time_now(); - for (int i = 0; i < *cnt; i++) { - struct sample *smp = smps[i]; - - if (!(smp->flags & SAMPLE_HAS_SEQUENCE) && h->node) { - smp->sequence = h->node->sequence++; - smp->flags |= SAMPLE_HAS_SEQUENCE; - } - - if (!(smp->flags & SAMPLE_HAS_TS_RECEIVED)) { - smp->ts.received = now; - smp->flags |= SAMPLE_HAS_TS_RECEIVED; - } - - if (!(smp->flags & SAMPLE_HAS_TS_ORIGIN)) { - smp->ts.origin = smp->ts.received; - smp->flags |= SAMPLE_HAS_TS_ORIGIN; - } + if (!(smp->flags & SAMPLE_HAS_SEQUENCE) && h->node) { + smp->sequence = h->node->sequence++; + smp->flags |= SAMPLE_HAS_SEQUENCE; } - return 0; + if (!(smp->flags & SAMPLE_HAS_TS_RECEIVED)) { + smp->ts.received = now; + smp->flags |= SAMPLE_HAS_TS_RECEIVED; + } + + if (!(smp->flags & SAMPLE_HAS_TS_ORIGIN)) { + smp->ts.origin = smp->ts.received; + smp->flags |= SAMPLE_HAS_TS_ORIGIN; + } + + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/jitter_calc.c b/lib/hooks/jitter_calc.c index a410dc9e3..be38cd334 100644 --- a/lib/hooks/jitter_calc.c +++ b/lib/hooks/jitter_calc.c @@ -87,42 +87,40 @@ int jitter_calc_deinit(struct hook *h) * is high (i.e. several mins depending on GPS_NTP_DELAY_WIN_SIZE), * the variance value will overrun the 64bit value. */ -static int jitter_calc_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int jitter_calc_process(struct hook *h, struct sample *smp) { struct jitter_calc *j = (struct jitter_calc *) h->_vd; struct timespec now = time_now(); int64_t delay_sec, delay_nsec, curr_delay_us; - for(int i = 0; i < *cnt; i++) { - delay_sec = now.tv_sec - smps[i]->ts.origin.tv_sec; - delay_nsec = now.tv_nsec - smps[i]->ts.origin.tv_nsec; + delay_sec = now.tv_sec - smp->ts.origin.tv_sec; + delay_nsec = now.tv_nsec - smp->ts.origin.tv_nsec; - /* Calc on microsec instead of nenosec delay as variance formula overflows otherwise.*/ - curr_delay_us = delay_sec * 1000000 + delay_nsec / 1000; + /* Calc on microsec instead of nenosec delay as variance formula overflows otherwise.*/ + curr_delay_us = delay_sec * 1000000 + delay_nsec / 1000; - j->delay_mov_sum = j->delay_mov_sum + curr_delay_us - j->delay_series[j->curr_count]; - j->moving_avg[j->curr_count] = j->delay_mov_sum / GPS_NTP_DELAY_WIN_SIZE; /* Will be valid after GPS_NTP_DELAY_WIN_SIZE initial values */ + j->delay_mov_sum = j->delay_mov_sum + curr_delay_us - j->delay_series[j->curr_count]; + j->moving_avg[j->curr_count] = j->delay_mov_sum / GPS_NTP_DELAY_WIN_SIZE; /* Will be valid after GPS_NTP_DELAY_WIN_SIZE initial values */ - j->delay_mov_sum_sqrd = j->delay_mov_sum_sqrd + (curr_delay_us * curr_delay_us) - (j->delay_series[j->curr_count] * j->delay_series[j->curr_count]); - j->moving_var[j->curr_count] = (j->delay_mov_sum_sqrd - (j->delay_mov_sum * j->delay_mov_sum) / GPS_NTP_DELAY_WIN_SIZE) / (GPS_NTP_DELAY_WIN_SIZE - 1); + j->delay_mov_sum_sqrd = j->delay_mov_sum_sqrd + (curr_delay_us * curr_delay_us) - (j->delay_series[j->curr_count] * j->delay_series[j->curr_count]); + j->moving_var[j->curr_count] = (j->delay_mov_sum_sqrd - (j->delay_mov_sum * j->delay_mov_sum) / GPS_NTP_DELAY_WIN_SIZE) / (GPS_NTP_DELAY_WIN_SIZE - 1); - j->delay_series[j->curr_count] = curr_delay_us; /* Update the last delay value */ + j->delay_series[j->curr_count] = curr_delay_us; /* Update the last delay value */ - /* Jitter calc formula as used in Wireshark according to RFC3550 (RTP) - D(i,j) = (Rj-Ri)-(Sj-Si) = (Rj-Sj)-(Ri-Si) - J(i) = J(i-1)+(|D(i-1,i)|-J(i-1))/16 - */ - j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE] = j->jitter_val[j->curr_count] + (labs(curr_delay_us) - j->jitter_val[j->curr_count]) / 16; + /* Jitter calc formula as used in Wireshark according to RFC3550 (RTP) + D(i,j) = (Rj-Ri)-(Sj-Si) = (Rj-Sj)-(Ri-Si) + J(i) = J(i-1)+(|D(i-1,i)|-J(i-1))/16 + */ + j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE] = j->jitter_val[j->curr_count] + (labs(curr_delay_us) - j->jitter_val[j->curr_count]) / 16; - info("%s: jitter=%" PRId64 " usec, moving average=%" PRId64 " usec, moving variance=%" PRId64 " usec", __FUNCTION__, j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE], j->moving_avg[j->curr_count], j->moving_var[j->curr_count]); + info("%s: jitter=%" PRId64 " usec, moving average=%" PRId64 " usec, moving variance=%" PRId64 " usec", __FUNCTION__, j->jitter_val[(j->curr_count + 1) % GPS_NTP_DELAY_WIN_SIZE], j->moving_avg[j->curr_count], j->moving_var[j->curr_count]); - j->curr_count++; - if (j->curr_count >= GPS_NTP_DELAY_WIN_SIZE) - j->curr_count = 0; - } + j->curr_count++; + if (j->curr_count >= GPS_NTP_DELAY_WIN_SIZE) + j->curr_count = 0; - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/limit_rate.c b/lib/hooks/limit_rate.c index fc27d2d8f..3f080a2cc 100644 --- a/lib/hooks/limit_rate.c +++ b/lib/hooks/limit_rate.c @@ -96,38 +96,31 @@ static int limit_rate_parse(struct hook *h, json_t *cfg) return 0; } -static int limit_rate_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int limit_rate_process(struct hook *h, struct sample *smp) { struct limit_rate *p = (struct limit_rate *) h->_vd; struct timespec next; - unsigned ret = 0; + switch (p->mode) { + case LIMIT_RATE_LOCAL: + next = time_now(); + break; - for (unsigned i = 0; i < *cnt; i++) { - switch (p->mode) { - case LIMIT_RATE_LOCAL: - next = time_now(); - break; + case LIMIT_RATE_ORIGIN: + next = smp->ts.origin; + break; - case LIMIT_RATE_ORIGIN: - next = smps[i]->ts.origin; - break; - - case LIMIT_RATE_RECEIVED: - next = smps[i]->ts.received; - break; - } - - if (time_delta(&p->last, &next) < p->deadtime) - continue; /* Drop this sample */ - - p->last = next; - smps[ret++] = smps[i]; + case LIMIT_RATE_RECEIVED: + next = smp->ts.received; + break; } - *cnt = ret; + if (time_delta(&p->last, &next) < p->deadtime) + return HOOK_SKIP_SAMPLE; - return 0; + p->last = next; + + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/print.c b/lib/hooks/print.c index a88a541db..953438edc 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -117,7 +117,7 @@ static int print_parse(struct hook *h, json_t *cfg) return 0; } -static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int print_process(struct hook *h, struct sample *smp) { struct print *p = (struct print *) h->_vd; @@ -128,7 +128,7 @@ static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt) else if (h->path) printf("Path %s: ", path_name(h->path)); - io_print(&p->io, smps, *cnt); + io_print(&p->io, &smp, 1); return 0; } diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index e4bbc8cb4..d422e8e93 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -54,49 +54,42 @@ static int restart_stop(struct hook *h) return 0; } -static int restart_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int restart_process(struct hook *h, struct sample *smp) { - unsigned i; struct restart *r = (struct restart *) h->_vd; - struct sample *prev, *cur = NULL; assert(h->node); - for (i = 0, prev = r->prev; i < *cnt; i++, prev = cur) { - cur = smps[i]; + if (r->prev) { + /* A wrap around of the sequence no should not be treated as a simulation restart */ + if (smp->sequence == 0 && r->prev->sequence != 0 && r->prev->sequence > UINT64_MAX - 16) { + warning("Simulation from node %s restarted (previous->sequence=%" PRIu64 ", current->sequence=%" PRIu64 ")", + node_name(h->node), r->prev->sequence, smp->sequence); - if (prev) { - /* A wrap around of the sequence no should not be treated as a simulation restart */ - if (cur->sequence == 0 && prev->sequence != 0 && prev->sequence > UINT64_MAX - 16) { - warning("Simulation from node %s restarted (previous->sequence=%" PRIu64 ", current->sequence=%" PRIu64 ")", - node_name(h->node), prev->sequence, cur->sequence); + smp->flags |= SAMPLE_IS_FIRST; - cur->flags |= SAMPLE_IS_FIRST; + /* Run restart hooks */ + for (size_t i = 0; i < vlist_length(&h->node->in.hooks); i++) { + struct hook *k = (struct hook *) vlist_at(&h->node->in.hooks, i); - /* Run restart hooks */ - for (size_t i = 0; i < vlist_length(&h->node->in.hooks); i++) { - struct hook *k = (struct hook *) vlist_at(&h->node->in.hooks, i); + hook_restart(k); + } - hook_restart(k); - } + for (size_t i = 0; i < vlist_length(&h->node->out.hooks); i++) { + struct hook *k = (struct hook *) vlist_at(&h->node->out.hooks, i); - for (size_t i = 0; i < vlist_length(&h->node->out.hooks); i++) { - struct hook *k = (struct hook *) vlist_at(&h->node->out.hooks, i); - - hook_restart(k); - } + hook_restart(k); } } } - if (cur) - sample_incref(cur); + sample_incref(smp); if (r->prev) sample_decref(r->prev); - r->prev = cur; + r->prev = smp; - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/scale.c b/lib/hooks/scale.c index 3b8b7e59f..8550286a2 100644 --- a/lib/hooks/scale.c +++ b/lib/hooks/scale.c @@ -104,36 +104,33 @@ static int scale_parse(struct hook *h, json_t *cfg) return 0; } -static int scale_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int scale_process(struct hook *h, struct sample *smp) { struct scale *s = (struct scale *) h->_vd; - for (int i = 0; i < *cnt; i++) { - struct sample *smp = smps[i]; - int k = s->signal_index; + int k = s->signal_index; - switch (sample_format(smp, k)) { - case SIGNAL_TYPE_INTEGER: - smp->data[k].i = smp->data[k].i * s->scale + s->offset; - break; + switch (sample_format(smp, k)) { + case SIGNAL_TYPE_INTEGER: + smp->data[k].i = smp->data[k].i * s->scale + s->offset; + break; - case SIGNAL_TYPE_FLOAT: - smp->data[k].f = smp->data[k].f * s->scale + s->offset; - break; + case SIGNAL_TYPE_FLOAT: + smp->data[k].f = smp->data[k].f * s->scale + s->offset; + break; - case SIGNAL_TYPE_COMPLEX: - smp->data[k].z = smp->data[k].z * s->scale + s->offset; - break; + case SIGNAL_TYPE_COMPLEX: + smp->data[k].z = smp->data[k].z * s->scale + s->offset; + break; - case SIGNAL_TYPE_BOOLEAN: - smp->data[k].b = smp->data[k].b * s->scale + s->offset; - break; + case SIGNAL_TYPE_BOOLEAN: + smp->data[k].b = smp->data[k].b * s->scale + s->offset; + break; - default: { } - } + default: { } } - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/shift_seq.c b/lib/hooks/shift_seq.c index ddd3bd484..da8595078 100644 --- a/lib/hooks/shift_seq.c +++ b/lib/hooks/shift_seq.c @@ -48,14 +48,13 @@ static int shift_seq_parse(struct hook *h, json_t *cfg) return 0; } -static int shift_seq_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int shift_seq_process(struct hook *h, struct sample *smp) { struct shift *p = (struct shift *) h->_vd; - for (int i = 0; i < *cnt; i++) - smps[i]->sequence += p->offset; + smp->sequence += p->offset; - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/shift_ts.c b/lib/hooks/shift_ts.c index 102940728..7d5afbd93 100644 --- a/lib/hooks/shift_ts.c +++ b/lib/hooks/shift_ts.c @@ -77,24 +77,28 @@ static int shift_ts_parse(struct hook *h, json_t *cfg) return 0; } -static int shift_ts_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int shift_ts_process(struct hook *h, struct sample *smp) { struct shift_ts *p = (struct shift_ts *) h->_vd; - for (int i = 0; i < *cnt; i++) { - struct sample *s = smps[i]; - struct timespec *ts; + struct timespec *ts; - switch (p->mode) { - case SHIFT_ORIGIN: ts = &s->ts.origin; break; - case SHIFT_RECEIVED: ts = &s->ts.received; break; - default: return -1; - } + switch (p->mode) { + case SHIFT_ORIGIN: + ts = &smp->ts.origin; + break; - *ts = time_add(ts, &p->offset); break; + case SHIFT_RECEIVED: + ts = &smp->ts.received; + break; + + default: + return HOOK_ERROR; } - return 0; + *ts = time_add(ts, &p->offset);; + + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/skip_first.c b/lib/hooks/skip_first.c index 4b1120ee6..87bf27c35 100644 --- a/lib/hooks/skip_first.c +++ b/lib/hooks/skip_first.c @@ -89,7 +89,7 @@ static int skip_first_restart(struct hook *h) return 0; } -static int skip_first_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int skip_first_process(struct hook *h, struct sample *smp) { struct skip_first *p = (struct skip_first *) h->_vd; @@ -97,51 +97,33 @@ static int skip_first_process(struct hook *h, struct sample *smps[], unsigned *c if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) { switch (p->mode) { case HOOK_SKIP_MODE_SAMPLES: - p->samples.until = smps[0]->sequence + p->samples.wait; + p->samples.until = smp->sequence + p->samples.wait; break; case HOOK_SKIP_MODE_SECONDS: - p->seconds.until = time_add(&smps[0]->ts.origin, &p->seconds.wait); + p->seconds.until = time_add(&smp->ts.origin, &p->seconds.wait); break; } p->state = HOOK_SKIP_FIRST_STATE_SKIPPING; } - int i, ok; - for (i = 0, ok = 0; i < *cnt; i++) { - bool skip; - switch (p->mode) { - case HOOK_SKIP_MODE_SAMPLES: - skip = p->samples.until > smps[i]->sequence; - break; + switch (p->mode) { + case HOOK_SKIP_MODE_SAMPLES: + if (p->samples.until > smp->sequence) + return HOOK_SKIP_SAMPLE; + break; - case HOOK_SKIP_MODE_SECONDS: - skip = time_delta(&p->seconds.until, &smps[i]->ts.origin) < 0; - break; - default: - skip = false; - break; - } + case HOOK_SKIP_MODE_SECONDS: + if (time_delta(&p->seconds.until, &smp->ts.origin) < 0) + return HOOK_SKIP_SAMPLE; + break; - if (!skip) { - struct sample *tmp; - - tmp = smps[i]; - smps[i] = smps[ok]; - smps[ok++] = tmp; - } - - /* To discard the first X samples in 'smps[]' we must - * shift them to the end of the 'smps[]' array. - * In case the hook returns a number 'ok' which is smaller than 'cnt', - * only the first 'ok' samples in 'smps[]' are accepted and further processed. - */ + default: + break; } - *cnt = ok; - - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/stats.c b/lib/hooks/stats.c index b5ed90675..95dfcb658 100644 --- a/lib/hooks/stats.c +++ b/lib/hooks/stats.c @@ -155,48 +155,38 @@ static int stats_collect_parse(struct hook *h, json_t *cfg) return 0; } -static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int stats_collect_process(struct hook *h, struct sample *smp) { struct stats_collect *p = (struct stats_collect *) h->_vd; struct stats *s = &p->stats; - int dist; - struct sample *previous = p->last; + if (p->last) { + if (smp->flags & p->last->flags & SAMPLE_HAS_TS_RECEIVED) + stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&p->last->ts.received, &smp->ts.received)); - for (int i = 0; i < *cnt; i++) { - struct sample *current = smps[i]; + if (smp->flags & p->last->flags & SAMPLE_HAS_TS_ORIGIN) + stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&p->last->ts.origin, &smp->ts.origin)); - if (previous) { - if (current->flags & previous->flags & SAMPLE_HAS_TS_RECEIVED) - stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received)); + if ((smp->flags & SAMPLE_HAS_TS_ORIGIN) && (smp->flags & SAMPLE_HAS_TS_RECEIVED)) + stats_update(s, STATS_METRIC_OWD, time_delta(&smp->ts.origin, &smp->ts.received)); - if (current->flags & previous->flags & SAMPLE_HAS_TS_ORIGIN) - stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin)); - - if ((current->flags & SAMPLE_HAS_TS_ORIGIN) && (current->flags & SAMPLE_HAS_TS_RECEIVED)) - stats_update(s, STATS_METRIC_OWD, time_delta(¤t->ts.origin, ¤t->ts.received)); - - if (current->flags & previous->flags & SAMPLE_HAS_SEQUENCE) { - dist = current->sequence - (int32_t) previous->sequence; - if (dist != 1) - stats_update(s, STATS_METRIC_REORDERED, dist); - } + if (smp->flags & p->last->flags & SAMPLE_HAS_SEQUENCE) { + int dist = smp->sequence - (int32_t) p->last->sequence; + if (dist != 1) + stats_update(s, STATS_METRIC_REORDERED, dist); } - - previous = current; } + sample_incref(smp); + if (p->last) sample_decref(p->last); - if (previous) - sample_incref(previous); - - p->last = previous; + p->last = smp; stats_commit(&p->stats); - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/lib/hooks/ts.c b/lib/hooks/ts.c index 6ad3566ac..4d3ef2250 100644 --- a/lib/hooks/ts.c +++ b/lib/hooks/ts.c @@ -29,12 +29,11 @@ #include #include -static int ts_process(struct hook *h, struct sample *smps[], unsigned *cnt) +static int ts_process(struct hook *h, struct sample *smp) { - for (int i = 0; i < *cnt; i++) - smps[i]->ts.origin = smps[i]->ts.received; + smp->ts.origin = smp->ts.received; - return 0; + return HOOK_OK; } static struct plugin p = { diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp index 2087ab384..9f1195ec0 100644 --- a/src/villas-hook.cpp +++ b/src/villas-hook.cpp @@ -217,13 +217,29 @@ check: if (optarg == endptr) logger->debug("Read {} smps from stdin", recv); - unsigned send = recv; + unsigned send = 0; + for (int processed = 0; processed < recv; processed++) { + struct sample *smp = smps[processed]; - ret = hook_process(&h, smps, (unsigned *) &send); - if (ret < 0) - throw RuntimeError("Failed to process samples"); + ret = hook_process(&h, smp); + switch (ret) { + case HOOK_ERROR: + throw RuntimeError("Failed to process samples"); - sent = io_print(&io, smps, send); + case HOOK_OK: + smps[send++] = smp; + break; + + case HOOK_SKIP_SAMPLE: + goto skip; + + case HOOK_STOP_PROCESSING: + goto stop; + } +skip: logger->info("Skip: seq={}", smp->sequence); + } + +stop: sent = io_print(&io, smps, send); if (sent < 0) throw RuntimeError("Failed to write to stdout");