1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

hooks: simplifying hooks system by consolidating read, write, and processing into a single process callback

This commit is contained in:
Steffen Vogel 2018-08-17 11:15:51 +02:00
parent 4a6c7db9ee
commit 99ee9a08b7
16 changed files with 30 additions and 73 deletions

View file

@ -73,13 +73,8 @@ int hook_stop(struct hook *h);
int hook_periodic(struct hook *h);
int hook_restart(struct hook *h);
int hook_read(struct hook *h, struct sample *smps[], unsigned *cnt);
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt);
int hook_write(struct hook *h, struct sample *smps[], unsigned *cnt);
int hook_read_list(struct list *hs, struct sample *smps[], unsigned cnt);
int hook_process_list(struct list *hs, struct sample *smps[], unsigned cnt);
int hook_write_list(struct list *hs, struct sample *smps[], unsigned cnt);
/** Compare two hook functions with their priority. Used by list_sort() */
int hook_cmp_priority(const void *a, const void *b);

View file

@ -61,7 +61,7 @@ struct hook_type {
int (*parse)(struct hook *h, json_t *cfg);
int (*init)(struct hook *h); /**< Called before path is started to parseHOOK_DESTROYs. */
int (*init)(struct hook *h); /**< Called before path is started to parsed. */
int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
int (*start)(struct hook *h); /**< Called whenever a path is started; before threads are created. */
@ -70,9 +70,7 @@ struct hook_type {
int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */
int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
int (*read)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever samples have been read from a node. */
int (*process)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever muxed samples are processed. */
int (*write)(struct hook *h, struct sample *smps[], unsigned *cnt); /**< Called whenever samples are written to a node. */
};
struct hook_type * hook_type_lookup(const char *name);

View file

@ -57,7 +57,7 @@ static int decimate_parse(struct hook *h, json_t *cfg)
return 0;
}
static int decimate_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int decimate_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct decimate *p = (struct decimate *) h->_vd;
@ -86,8 +86,7 @@ static struct plugin p = {
.priority = 99,
.init = decimate_init,
.parse = decimate_parse,
.read = decimate_read,
.process = decimate_read,
.process = decimate_process,
.size = sizeof(struct decimate)
}
};

View file

@ -55,7 +55,7 @@ static int drop_stop(struct hook *h)
return 0;
}
static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int drop_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
int i, ok, dist;
@ -121,7 +121,7 @@ static struct plugin p = {
.hook = {
.flags = HOOK_BUILTIN | HOOK_NODE,
.priority = 3,
.read = drop_read,
.process = drop_process,
.start = drop_start,
.stop = drop_stop,
.restart = drop_restart,

View file

@ -33,7 +33,7 @@
#include <villas/sample.h>
#include <villas/timing.h>
static int fix_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int fix_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct timespec now = time_now();
@ -66,7 +66,7 @@ static struct plugin p = {
.hook = {
.flags = HOOK_BUILTIN | HOOK_NODE,
.priority = 1,
.read = fix_read
.process = fix_process
}
};

View file

@ -87,7 +87,7 @@ int jitter_calc_deinit(struct hook *h)
* is high (i.e. several mins depending on GPS_NTP_DELAY_WIN_SIZE),
* the variance value will overrun the 64bit value.
*/
int jitter_calc_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int jitter_calc_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct jitter_calc *j = (struct jitter_calc *) h->_vd;
@ -134,7 +134,7 @@ static struct plugin p = {
.priority = 0,
.init = jitter_calc_init,
.destroy = jitter_calc_deinit,
.read = jitter_calc_read,
.process = jitter_calc_process,
.size = sizeof(struct jitter_calc)
}
};

View file

@ -87,7 +87,7 @@ static int limit_rate_parse(struct hook *h, json_t *cfg)
return 0;
}
static int limit_rate_write(struct hook *h, struct sample *smps[], unsigned *cnt)
static int limit_rate_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct limit_rate *p = (struct limit_rate *) h->_vd;
@ -130,9 +130,7 @@ static struct plugin p = {
.priority = 99,
.init = limit_rate_init,
.parse = limit_rate_parse,
.read = limit_rate_write,
.write = limit_rate_write,
.process = limit_rate_write,
.process = limit_rate_process,
.size = sizeof(struct limit_rate)
}
};

View file

@ -109,42 +109,16 @@ static int print_parse(struct hook *h, json_t *cfg)
return 0;
}
static int print_read(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct print *p = (struct print *) h->_vd;
if (p->prefix)
printf("%s", p->prefix);
else if (h->node)
printf("Node %s read: ", node_name(h->node));
io_print(&p->io, smps, *cnt);
return 0;
}
static int print_write(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct print *p = (struct print *) h->_vd;
if (p->prefix)
printf("%s", p->prefix);
else if (h->node)
printf("Node %s write: ", node_name(h->node));
io_print(&p->io, smps, *cnt);
return 0;
}
static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct print *p = (struct print *) h->_vd;
if (p->prefix)
printf("%s", p->prefix);
else if (h->node)
printf("Node %s: ", node_name(h->node));
else if (h->path)
printf("Path %s process: ", path_name(h->path));
printf("Path %s: ", path_name(h->path));
io_print(&p->io, smps, *cnt);
@ -181,8 +155,6 @@ static struct plugin p = {
.destroy = print_destroy,
.start = print_start,
.stop = print_stop,
.read = print_read,
.write = print_write,
.process = print_process,
.size = sizeof(struct print)
}

View file

@ -54,7 +54,7 @@ static int restart_stop(struct hook *h)
return 0;
}
static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int restart_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
unsigned i;
struct restart *r = (struct restart *) h->_vd;
@ -67,7 +67,7 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
if (prev) {
/* A wrap around of the sequence no should not be treated as a simulation restart */
if (cur->sequence == 0 && prev->sequence <= (int) (UINT32_MAX - 128)) {
if (cur->sequence == 0 && prev->sequence != 0 && prev->sequence > UINT64_MAX - 16) {
warn("Simulation from node %s restarted (previous->sequence=%" PRIu64 ", current->sequence=%" PRIu64 ")",
node_name(h->node), prev->sequence, cur->sequence);
@ -106,7 +106,7 @@ static struct plugin p = {
.hook = {
.flags = HOOK_NODE | HOOK_BUILTIN,
.priority = 1,
.read = restart_read,
.process = restart_process,
.start = restart_start,
.stop = restart_stop,
.size = sizeof(struct restart)

View file

@ -48,7 +48,7 @@ static int shift_seq_parse(struct hook *h, json_t *cfg)
return 0;
}
static int shift_seq_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int shift_seq_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct shift *p = (struct shift *) h->_vd;
@ -66,8 +66,7 @@ static struct plugin p = {
.flags = HOOK_NODE | HOOK_PATH,
.priority = 99,
.parse = shift_seq_parse,
.read = shift_seq_read,
.process = shift_seq_read,
.process = shift_seq_process,
.size = sizeof(struct shift),
}
};

View file

@ -77,7 +77,7 @@ static int shift_ts_parse(struct hook *h, json_t *cfg)
return 0;
}
static int shift_ts_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int shift_ts_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct shift_ts *p = (struct shift_ts *) h->_vd;
@ -106,8 +106,7 @@ static struct plugin p = {
.priority = 99,
.init = shift_ts_init,
.parse = shift_ts_parse,
.read = shift_ts_read,
.process = shift_ts_read,
.process = shift_ts_process,
.size = sizeof(struct shift_ts)
}
};

View file

@ -89,7 +89,7 @@ static int skip_first_restart(struct hook *h)
return 0;
}
static int skip_first_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int skip_first_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct skip_first *p = (struct skip_first *) h->_vd;
@ -154,8 +154,7 @@ static struct plugin p = {
.parse = skip_first_parse,
.start = skip_first_restart,
.restart = skip_first_restart,
.read = skip_first_read,
.process = skip_first_read,
.process = skip_first_process,
.size = sizeof(struct skip_first)
}
};

View file

@ -155,7 +155,7 @@ static int stats_collect_parse(struct hook *h, json_t *cfg)
return 0;
}
static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct stats_collect *p = (struct stats_collect *) h->_vd;
struct stats *s = &p->stats;
@ -190,6 +190,7 @@ static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *c
return 0;
}
__attribute__((unused))
static int stats_collect_write(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct stats_collect *p = (struct stats_collect *) h->_vd;
@ -215,8 +216,7 @@ static struct plugin p = {
.destroy = stats_collect_destroy,
.start = stats_collect_start,
.stop = stats_collect_stop,
.read = stats_collect_read,
.write = stats_collect_write,
.process = stats_collect_process,
.restart = stats_collect_restart,
.periodic = stats_collect_periodic,
.parse = stats_collect_parse,

View file

@ -29,7 +29,7 @@
#include <villas/timing.h>
#include <villas/sample.h>
static int ts_read(struct hook *h, struct sample *smps[], unsigned *cnt)
static int ts_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
for (int i = 0; i < *cnt; i++)
smps[i]->ts.origin = smps[i]->ts.received;
@ -44,7 +44,7 @@ static struct plugin p = {
.hook = {
.flags = HOOK_NODE,
.priority = 99,
.read = ts_read
.process = ts_process
}
};

View file

@ -411,7 +411,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
#ifdef WITH_HOOKS
/* Run read hooks */
int rread = hook_read_list(&n->in.hooks, smps, nread);
int rread = hook_process_list(&n->in.hooks, smps, nread);
int skipped = nread - rread;
if (skipped > 0 && n->stats != NULL) {
@ -437,7 +437,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
#ifdef WITH_HOOKS
/* Run write hooks */
cnt = hook_write_list(&n->out.hooks, smps, cnt);
cnt = hook_process_list(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
#endif /* WITH_HOOKS */

View file

@ -239,9 +239,7 @@ check: if (optarg == endptr)
unsigned send = recv;
hook_read(&h, smps, (unsigned *) &send);
hook_process(&h, smps, (unsigned *) &send);
hook_write(&h, smps, (unsigned *) &send);
sent = io_print(&io, smps, send);
if (sent < 0)