From 99ee9a08b7e542d77e9b476d63006dc1bfaf75f8 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 17 Aug 2018 11:15:51 +0200 Subject: [PATCH] hooks: simplifying hooks system by consolidating read, write, and processing into a single process callback --- include/villas/hook.h | 5 ----- include/villas/hook_type.h | 4 +--- lib/hooks/decimate.c | 5 ++--- lib/hooks/drop.c | 4 ++-- lib/hooks/fix.c | 4 ++-- lib/hooks/jitter_calc.c | 4 ++-- lib/hooks/limit_rate.c | 6 ++---- lib/hooks/print.c | 34 +++------------------------------- lib/hooks/restart.c | 6 +++--- lib/hooks/shift_seq.c | 5 ++--- lib/hooks/shift_ts.c | 5 ++--- lib/hooks/skip_first.c | 5 ++--- lib/hooks/stats.c | 6 +++--- lib/hooks/ts.c | 4 ++-- lib/node.c | 4 ++-- src/villas-hook.cpp | 2 -- 16 files changed, 30 insertions(+), 73 deletions(-) diff --git a/include/villas/hook.h b/include/villas/hook.h index 407dc7e96..05a183661 100644 --- a/include/villas/hook.h +++ b/include/villas/hook.h @@ -73,13 +73,8 @@ int hook_stop(struct hook *h); int hook_periodic(struct hook *h); int hook_restart(struct hook *h); -int hook_read(struct hook *h, struct sample *smps[], unsigned *cnt); int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt); -int hook_write(struct hook *h, struct sample *smps[], unsigned *cnt); - -int hook_read_list(struct list *hs, struct sample *smps[], unsigned cnt); int hook_process_list(struct list *hs, struct sample *smps[], unsigned cnt); -int hook_write_list(struct list *hs, struct sample *smps[], unsigned cnt); /** Compare two hook functions with their priority. Used by list_sort() */ int hook_cmp_priority(const void *a, const void *b); diff --git a/include/villas/hook_type.h b/include/villas/hook_type.h index 5e0471b18..0963d246a 100644 --- a/include/villas/hook_type.h +++ b/include/villas/hook_type.h @@ -61,7 +61,7 @@ struct hook_type { int (*parse)(struct hook *h, json_t *cfg); - int (*init)(struct hook *h); /**< Called before path is started to parseHOOK_DESTROYs. */ + int (*init)(struct hook *h); /**< Called before path is started to parsed. */ int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ int (*start)(struct hook *h); /**< Called whenever a path is started; before threads are created. */ @@ -70,9 +70,7 @@ struct hook_type { int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */ int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ - int (*read)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever samples have been read from a node. */ int (*process)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever muxed samples are processed. */ - int (*write)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever samples are written to a node. */ }; struct hook_type * hook_type_lookup(const char *name); diff --git a/lib/hooks/decimate.c b/lib/hooks/decimate.c index 2584d9652..eacbdf9cf 100644 --- a/lib/hooks/decimate.c +++ b/lib/hooks/decimate.c @@ -57,7 +57,7 @@ static int decimate_parse(struct hook *h, json_t *cfg) return 0; } -static int decimate_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int decimate_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct decimate *p = (struct decimate *) h->_vd; @@ -86,8 +86,7 @@ static struct plugin p = { .priority = 99, .init = decimate_init, .parse = decimate_parse, - .read = decimate_read, - .process = decimate_read, + .process = decimate_process, .size = sizeof(struct decimate) } }; diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index 14033150e..5b6d5098a 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -55,7 +55,7 @@ static int drop_stop(struct hook *h) return 0; } -static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int drop_process(struct hook *h, struct sample *smps[], unsigned *cnt) { int i, ok, dist; @@ -121,7 +121,7 @@ static struct plugin p = { .hook = { .flags = HOOK_BUILTIN | HOOK_NODE, .priority = 3, - .read = drop_read, + .process = drop_process, .start = drop_start, .stop = drop_stop, .restart = drop_restart, diff --git a/lib/hooks/fix.c b/lib/hooks/fix.c index 347ecf8c6..ee8099f43 100644 --- a/lib/hooks/fix.c +++ b/lib/hooks/fix.c @@ -33,7 +33,7 @@ #include #include -static int fix_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int fix_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct timespec now = time_now(); @@ -66,7 +66,7 @@ static struct plugin p = { .hook = { .flags = HOOK_BUILTIN | HOOK_NODE, .priority = 1, - .read = fix_read + .process = fix_process } }; diff --git a/lib/hooks/jitter_calc.c b/lib/hooks/jitter_calc.c index 771b036f6..f4c32a1f9 100644 --- a/lib/hooks/jitter_calc.c +++ b/lib/hooks/jitter_calc.c @@ -87,7 +87,7 @@ int jitter_calc_deinit(struct hook *h) * is high (i.e. several mins depending on GPS_NTP_DELAY_WIN_SIZE), * the variance value will overrun the 64bit value. */ -int jitter_calc_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int jitter_calc_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct jitter_calc *j = (struct jitter_calc *) h->_vd; @@ -134,7 +134,7 @@ static struct plugin p = { .priority = 0, .init = jitter_calc_init, .destroy = jitter_calc_deinit, - .read = jitter_calc_read, + .process = jitter_calc_process, .size = sizeof(struct jitter_calc) } }; diff --git a/lib/hooks/limit_rate.c b/lib/hooks/limit_rate.c index d40842e97..028b3da86 100644 --- a/lib/hooks/limit_rate.c +++ b/lib/hooks/limit_rate.c @@ -87,7 +87,7 @@ static int limit_rate_parse(struct hook *h, json_t *cfg) return 0; } -static int limit_rate_write(struct hook *h, struct sample *smps[], unsigned *cnt) +static int limit_rate_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct limit_rate *p = (struct limit_rate *) h->_vd; @@ -130,9 +130,7 @@ static struct plugin p = { .priority = 99, .init = limit_rate_init, .parse = limit_rate_parse, - .read = limit_rate_write, - .write = limit_rate_write, - .process = limit_rate_write, + .process = limit_rate_process, .size = sizeof(struct limit_rate) } }; diff --git a/lib/hooks/print.c b/lib/hooks/print.c index 53b8862bd..729df88c5 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -109,42 +109,16 @@ static int print_parse(struct hook *h, json_t *cfg) return 0; } -static int print_read(struct hook *h, struct sample *smps[], unsigned *cnt) -{ - struct print *p = (struct print *) h->_vd; - - if (p->prefix) - printf("%s", p->prefix); - else if (h->node) - printf("Node %s read: ", node_name(h->node)); - - io_print(&p->io, smps, *cnt); - - return 0; -} - -static int print_write(struct hook *h, struct sample *smps[], unsigned *cnt) -{ - struct print *p = (struct print *) h->_vd; - - if (p->prefix) - printf("%s", p->prefix); - else if (h->node) - printf("Node %s write: ", node_name(h->node)); - - io_print(&p->io, smps, *cnt); - - return 0; -} - static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct print *p = (struct print *) h->_vd; if (p->prefix) printf("%s", p->prefix); + else if (h->node) + printf("Node %s: ", node_name(h->node)); else if (h->path) - printf("Path %s process: ", path_name(h->path)); + printf("Path %s: ", path_name(h->path)); io_print(&p->io, smps, *cnt); @@ -181,8 +155,6 @@ static struct plugin p = { .destroy = print_destroy, .start = print_start, .stop = print_stop, - .read = print_read, - .write = print_write, .process = print_process, .size = sizeof(struct print) } diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index 00cf0ce5d..30522f910 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -54,7 +54,7 @@ static int restart_stop(struct hook *h) return 0; } -static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int restart_process(struct hook *h, struct sample *smps[], unsigned *cnt) { unsigned i; struct restart *r = (struct restart *) h->_vd; @@ -67,7 +67,7 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) if (prev) { /* A wrap around of the sequence no should not be treated as a simulation restart */ - if (cur->sequence == 0 && prev->sequence <= (int) (UINT32_MAX - 128)) { + if (cur->sequence == 0 && prev->sequence != 0 && prev->sequence > UINT64_MAX - 16) { warn("Simulation from node %s restarted (previous->sequence=%" PRIu64 ", current->sequence=%" PRIu64 ")", node_name(h->node), prev->sequence, cur->sequence); @@ -106,7 +106,7 @@ static struct plugin p = { .hook = { .flags = HOOK_NODE | HOOK_BUILTIN, .priority = 1, - .read = restart_read, + .process = restart_process, .start = restart_start, .stop = restart_stop, .size = sizeof(struct restart) diff --git a/lib/hooks/shift_seq.c b/lib/hooks/shift_seq.c index 6d82d5459..c79640885 100644 --- a/lib/hooks/shift_seq.c +++ b/lib/hooks/shift_seq.c @@ -48,7 +48,7 @@ static int shift_seq_parse(struct hook *h, json_t *cfg) return 0; } -static int shift_seq_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int shift_seq_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct shift *p = (struct shift *) h->_vd; @@ -66,8 +66,7 @@ static struct plugin p = { .flags = HOOK_NODE | HOOK_PATH, .priority = 99, .parse = shift_seq_parse, - .read = shift_seq_read, - .process = shift_seq_read, + .process = shift_seq_process, .size = sizeof(struct shift), } }; diff --git a/lib/hooks/shift_ts.c b/lib/hooks/shift_ts.c index a4683a7e1..2a094ed5c 100644 --- a/lib/hooks/shift_ts.c +++ b/lib/hooks/shift_ts.c @@ -77,7 +77,7 @@ static int shift_ts_parse(struct hook *h, json_t *cfg) return 0; } -static int shift_ts_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int shift_ts_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct shift_ts *p = (struct shift_ts *) h->_vd; @@ -106,8 +106,7 @@ static struct plugin p = { .priority = 99, .init = shift_ts_init, .parse = shift_ts_parse, - .read = shift_ts_read, - .process = shift_ts_read, + .process = shift_ts_process, .size = sizeof(struct shift_ts) } }; diff --git a/lib/hooks/skip_first.c b/lib/hooks/skip_first.c index a6205a6b8..19d1ca4a6 100644 --- a/lib/hooks/skip_first.c +++ b/lib/hooks/skip_first.c @@ -89,7 +89,7 @@ static int skip_first_restart(struct hook *h) return 0; } -static int skip_first_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int skip_first_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct skip_first *p = (struct skip_first *) h->_vd; @@ -154,8 +154,7 @@ static struct plugin p = { .parse = skip_first_parse, .start = skip_first_restart, .restart = skip_first_restart, - .read = skip_first_read, - .process = skip_first_read, + .process = skip_first_process, .size = sizeof(struct skip_first) } }; diff --git a/lib/hooks/stats.c b/lib/hooks/stats.c index 0a2748056..a879ed7ed 100644 --- a/lib/hooks/stats.c +++ b/lib/hooks/stats.c @@ -155,7 +155,7 @@ static int stats_collect_parse(struct hook *h, json_t *cfg) return 0; } -static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct stats_collect *p = (struct stats_collect *) h->_vd; struct stats *s = &p->stats; @@ -190,6 +190,7 @@ static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *c return 0; } +__attribute__((unused)) static int stats_collect_write(struct hook *h, struct sample *smps[], unsigned *cnt) { struct stats_collect *p = (struct stats_collect *) h->_vd; @@ -215,8 +216,7 @@ static struct plugin p = { .destroy = stats_collect_destroy, .start = stats_collect_start, .stop = stats_collect_stop, - .read = stats_collect_read, - .write = stats_collect_write, + .process = stats_collect_process, .restart = stats_collect_restart, .periodic = stats_collect_periodic, .parse = stats_collect_parse, diff --git a/lib/hooks/ts.c b/lib/hooks/ts.c index c52279bb2..07125fc59 100644 --- a/lib/hooks/ts.c +++ b/lib/hooks/ts.c @@ -29,7 +29,7 @@ #include #include -static int ts_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int ts_process(struct hook *h, struct sample *smps[], unsigned *cnt) { for (int i = 0; i < *cnt; i++) smps[i]->ts.origin = smps[i]->ts.received; @@ -44,7 +44,7 @@ static struct plugin p = { .hook = { .flags = HOOK_NODE, .priority = 99, - .read = ts_read + .process = ts_process } }; diff --git a/lib/node.c b/lib/node.c index 29b8dd4cb..575186db7 100644 --- a/lib/node.c +++ b/lib/node.c @@ -411,7 +411,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel #ifdef WITH_HOOKS /* Run read hooks */ - int rread = hook_read_list(&n->in.hooks, smps, nread); + int rread = hook_process_list(&n->in.hooks, smps, nread); int skipped = nread - rread; if (skipped > 0 && n->stats != NULL) { @@ -437,7 +437,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re #ifdef WITH_HOOKS /* Run write hooks */ - cnt = hook_write_list(&n->out.hooks, smps, cnt); + cnt = hook_process_list(&n->out.hooks, smps, cnt); if (cnt <= 0) return cnt; #endif /* WITH_HOOKS */ diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp index ad41ec4dd..a18e3961a 100644 --- a/src/villas-hook.cpp +++ b/src/villas-hook.cpp @@ -239,9 +239,7 @@ check: if (optarg == endptr) unsigned send = recv; - hook_read(&h, smps, (unsigned *) &send); hook_process(&h, smps, (unsigned *) &send); - hook_write(&h, smps, (unsigned *) &send); sent = io_print(&io, smps, send); if (sent < 0)