mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
rewrite of hook system (much more powerful now :-)
This commit is contained in:
parent
2597fff110
commit
1b9bfc155a
6 changed files with 190 additions and 125 deletions
|
@ -23,27 +23,37 @@
|
|||
|
||||
/* The configuration of hook parameters is done in "config.h" */
|
||||
|
||||
struct msg;
|
||||
/* Forward declarations */
|
||||
struct path;
|
||||
|
||||
/** Callback type of hook function
|
||||
*
|
||||
* @param m The last message which has been received
|
||||
* @param p The path which is processing this message.
|
||||
* @param ts The timestamp when the message(s) were received.
|
||||
* @retval 0 Success. Continue processing and forwarding the message.
|
||||
* @retval <0 Error. Drop the message.
|
||||
*/
|
||||
typedef int (*hook_cb_t)(struct msg *m, struct path *p, struct timespec *ts);
|
||||
typedef int (*hook_cb_t)(struct path *p);
|
||||
|
||||
/** This is a static list of available hooks.
|
||||
*
|
||||
* It's used by hook_lookup to parse hook identfiers from the configuration file.
|
||||
* The list must be terminated by NULL pointers!
|
||||
*/
|
||||
struct hook_id {
|
||||
hook_cb_t cb;
|
||||
enum hook_type {
|
||||
HOOK_PATH_START, /**< Called whenever a path is started; before threads are created. */
|
||||
HOOK_PATH_STOP, /**< Called whenever a path is stopped; after threads are destoyed. */
|
||||
HOOK_PATH_RESTART, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
|
||||
|
||||
HOOK_PRE, /**< Called when a new packet of messages (samples) was received. */
|
||||
HOOK_POST, /**< Called after each message (sample) of a packet was processed. */
|
||||
HOOK_MSG, /**< Called for each message (sample) in a packet. */
|
||||
|
||||
HOOK_PERIODIC, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */
|
||||
|
||||
HOOK_MAX
|
||||
};
|
||||
|
||||
/** Descriptor for user defined hooks. See hook_list[]. */
|
||||
struct hook {
|
||||
int priority;
|
||||
hook_cb_t callback;
|
||||
const char *name;
|
||||
enum hook_type type;
|
||||
};
|
||||
|
||||
/** Get a function pointer of a hook function by its name
|
||||
|
@ -52,24 +62,47 @@ struct hook_id {
|
|||
* @retval NULL There is no hook registred with name.
|
||||
* @retval >0 A function pointer to the requested hook_cb_t hook.
|
||||
*/
|
||||
hook_cb_t hook_lookup(const char *name);
|
||||
const struct hook * hook_lookup(const char *name);
|
||||
|
||||
/** Conditionally execute the hooks
|
||||
*
|
||||
* @param p A pointer to the path structure.
|
||||
* @param t Which type of hooks should be executed?
|
||||
* @retval 0 All registred hooks for the specified type have been executed successfully.
|
||||
* @retval <0 On of the hook functions signalized, that the processing should be aborted; message should be skipped.
|
||||
*/
|
||||
int hook_run(struct path *p, enum hook_type t);
|
||||
|
||||
|
||||
/* The following prototypes are example hooks */
|
||||
|
||||
/** Example hook: Print the message. */
|
||||
int hook_print(struct msg *m, struct path *p, struct timespec *ts);
|
||||
int hook_print(struct path *p);
|
||||
|
||||
/** Example hook: Drop messages. */
|
||||
int hook_decimate(struct msg *m, struct path *p, struct timespec *ts);
|
||||
int hook_decimate(struct path *p);
|
||||
|
||||
/** Example hook: Convert the message values to fixed precision. */
|
||||
int hook_tofixed(struct msg *m, struct path *p, struct timespec *ts);
|
||||
int hook_tofixed(struct path *p);
|
||||
|
||||
/** Example hook: overwrite timestamp of message. */
|
||||
int hook_ts(struct msg *m, struct path *p, struct timespec *ts);
|
||||
int hook_ts(struct path *p);
|
||||
|
||||
/** Example hook: Finite-Impulse-Response (FIR) filter. */
|
||||
int hook_fir(struct msg *m, struct path *p, struct timespec *ts);
|
||||
int hook_fir(struct path *p);
|
||||
|
||||
/** Example hook: Discrete Fourier Transform */
|
||||
int hook_dft(struct msg *m, struct path *p, struct timespec *ts);
|
||||
int hook_dft(struct path *p);
|
||||
|
||||
/* The following prototypes are core hook functions */
|
||||
|
||||
/** Core hook: verify message headers. Invalid messages will be dropped. */
|
||||
int hook_verify(struct path *p);
|
||||
|
||||
/** Core hook: reset the path in case a new simulation was started. */
|
||||
int hook_restart(struct path *p);
|
||||
|
||||
/** Core hook: check if sequence number is correct. Otherwise message will be dropped */
|
||||
int hook_drop(struct path *p);
|
||||
|
||||
#endif /** _HOOKS_H_ @} */
|
||||
|
|
|
@ -35,7 +35,7 @@ struct path
|
|||
/** List of all outgoing nodes */
|
||||
struct list destinations;
|
||||
/** List of function pointers to hooks */
|
||||
struct list hooks;
|
||||
struct list hooks[HOOK_MAX];
|
||||
|
||||
/** Timer file descriptor for fixed rate sending */
|
||||
int tfd;
|
||||
|
@ -59,6 +59,8 @@ struct path
|
|||
/** A pointer to the libconfig object which instantiated this path */
|
||||
config_setting_t *cfg;
|
||||
|
||||
/* The following fields are mostly managed by hook_ functions */
|
||||
|
||||
/** Histogram of sequence number displacement of received messages */
|
||||
struct hist hist_seq;
|
||||
/** Histogram for delay of received messages */
|
||||
|
|
|
@ -211,16 +211,18 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct list
|
|||
|
||||
int config_parse_hooklist(config_setting_t *cfg, struct list *hooks) {
|
||||
const char *str;
|
||||
hook_cb_t hook;
|
||||
|
||||
const struct hook *hook;
|
||||
|
||||
switch (config_setting_type(cfg)) {
|
||||
case CONFIG_TYPE_STRING:
|
||||
str = config_setting_get_string(cfg);
|
||||
hook = hook_lookup(str);
|
||||
if (!hook)
|
||||
cerror(cfg, "Invalid hook function '%s'", str);
|
||||
|
||||
list_push(hooks, hook);
|
||||
|
||||
debug(10, "Adding hook %s to chain %u with prio %u", hook->name, hook->type, hook->priority);
|
||||
|
||||
list_insert(&hooks[hook->type], hook->priority, hook->callback);
|
||||
break;
|
||||
|
||||
case CONFIG_TYPE_ARRAY:
|
||||
|
@ -230,7 +232,7 @@ int config_parse_hooklist(config_setting_t *cfg, struct list *hooks) {
|
|||
if (!hook)
|
||||
cerror(config_setting_get_elem(cfg, i), "Invalid hook function '%s'", str);
|
||||
|
||||
list_push(hooks, hook);
|
||||
list_insert(&hooks[hook->type], hook->priority, hook->callback);
|
||||
}
|
||||
break;
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
/** Hook funktions
|
||||
*
|
||||
* Every path can register a hook function which is called for every received
|
||||
* message. This can be used to debug the data flow, get statistics
|
||||
* or alter the message.
|
||||
* Every path can register hook functions which are called at specific events.
|
||||
* A list of supported events is described by enum hook_flags.
|
||||
* Please note that there are several hook callbacks which are hard coded into path_create().
|
||||
*
|
||||
* This file includes some examples.
|
||||
*
|
||||
|
@ -21,34 +21,44 @@
|
|||
#include "path.h"
|
||||
#include "utils.h"
|
||||
|
||||
/* The configuration of hook parameters is done in "config.h" */
|
||||
/* Some hooks can be configured by constants in te file "config.h" */
|
||||
|
||||
/* Plausability checks */
|
||||
#if HOOK_MULTIPLEX_RATIO > POOL_SIZE
|
||||
#error "POOL_SIZE is too small for given HOOK_MULTIPLEX_RATIO"
|
||||
#endif
|
||||
|
||||
/** @todo Make const */
|
||||
static struct hook_id hook_list[] = {
|
||||
{ hook_print, "print" },
|
||||
{ hook_decimate, "decimate" },
|
||||
{ hook_tofixed, "tofixed" },
|
||||
{ hook_ts, "ts" },
|
||||
{ hook_fir, "fir" },
|
||||
{ hook_dft, "dft"}
|
||||
/** This is a static list of available hooks.
|
||||
*
|
||||
* It's used by hook_lookup to parse hook identfiers from the configuration file.
|
||||
* The list must be terminated by NULL pointers!
|
||||
*/
|
||||
static const struct hook hook_list[] = {
|
||||
/* Priority, Callback, Name, Type */
|
||||
{ 99, hook_print, "print", HOOK_MSG },
|
||||
{ 99, hook_decimate, "decimate", HOOK_POST },
|
||||
{ 99, hook_tofixed, "tofixed", HOOK_MSG },
|
||||
{ 99, hook_ts, "ts", HOOK_MSG },
|
||||
{ 99, hook_fir, "fir", HOOK_POST },
|
||||
{ 99, hook_dft, "dft", HOOK_POST }
|
||||
};
|
||||
|
||||
hook_cb_t hook_lookup(const char *name)
|
||||
const struct hook* hook_lookup(const char *name)
|
||||
{
|
||||
for (int i=0; i<ARRAY_LEN(hook_list); i++) {
|
||||
if (!strcmp(name, hook_list[i].name))
|
||||
return hook_list[i].cb;
|
||||
return &hook_list[i];
|
||||
}
|
||||
|
||||
return NULL; /* No matching hook was found */
|
||||
}
|
||||
|
||||
int hook_run(struct path *p, enum hook_type t)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
FOREACH(&p->hooks[t], it)
|
||||
ret += ((hook_cb_t) it->ptr)(p);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int hook_print(struct msg *m, struct path *p, struct timespec *ts)
|
||||
int hook_print(struct path *p)
|
||||
{
|
||||
struct msg *m = p->current;
|
||||
struct timespec ts = MSG_TS(m);
|
||||
|
@ -59,16 +69,17 @@ int hook_print(struct msg *m, struct path *p, struct timespec *ts)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int hook_tofixed(struct msg *m, struct path *p, struct timespec *ts)
|
||||
int hook_tofixed(struct path *p)
|
||||
{
|
||||
for (int i=0; i<m->length; i++) {
|
||||
struct msg *m = p->current;
|
||||
|
||||
for (int i=0; i<m->length; i++)
|
||||
m->data[i].i = m->data[i].f * 1e3;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_ts(struct msg *m, struct path *p, struct timespec *ts)
|
||||
int hook_ts(struct path *p)
|
||||
{
|
||||
struct msg *m = p->current;
|
||||
|
||||
|
@ -78,7 +89,7 @@ int hook_ts(struct msg *m, struct path *p, struct timespec *ts)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int hook_fir(struct msg *m, struct path *p, struct timespec *ts)
|
||||
int hook_fir(struct path *p)
|
||||
{
|
||||
/** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300
|
||||
* Tip: Use MATLAB's filter design tool and export coefficients
|
||||
|
@ -101,19 +112,60 @@ int hook_fir(struct msg *m, struct path *p, struct timespec *ts)
|
|||
sum += coeffs[i] * old->data[HOOK_FIR_INDEX].f;
|
||||
}
|
||||
|
||||
m->data[HOOK_FIR_INDEX].f = sum;
|
||||
p->current->data[HOOK_FIR_INDEX].f = sum;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_decimate(struct msg *m, struct path *p, struct timespec *ts)
|
||||
int hook_decimate(struct path *p)
|
||||
{
|
||||
/* Only sent every HOOK_DECIMATE_RATIO'th message */
|
||||
return m->sequence % HOOK_DECIMATE_RATIO;
|
||||
return p->received % HOOK_DECIMATE_RATIO;
|
||||
}
|
||||
|
||||
/** @todo Implement */
|
||||
int hook_dft(struct msg *m, struct path *p, struct timespec *ts)
|
||||
int hook_dft(struct path *p)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** System hooks */
|
||||
|
||||
int hook_restart(struct path *p)
|
||||
{
|
||||
if (p->current->sequence == 0 &&
|
||||
p->previous->sequence <= UINT32_MAX - 32) {
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)",
|
||||
buf, p->previous->sequence, p->current->sequence);
|
||||
|
||||
path_reset(p);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_verify(struct path *p)
|
||||
{
|
||||
int reason = msg_verify(p->current);
|
||||
if (reason) {
|
||||
p->invalid++;
|
||||
warn("Received invalid message (reason=%d)", reason);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_drop(struct path *p)
|
||||
{
|
||||
int dist = p->current->sequence - (int32_t) p->previous->sequence;
|
||||
if (dist <= 0 && p->received > 1) {
|
||||
p->dropped++;
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
return 0;
|
||||
}
|
|
@ -72,7 +72,7 @@ static void * path_run(void *arg)
|
|||
p->previous = p->current = p->pool;
|
||||
|
||||
/* Main thread loop */
|
||||
skip: for(;;) {
|
||||
for(;;) {
|
||||
/* Receive message */
|
||||
int recv = node_read(p->in, p->pool, p->poolsize, p->received, p->in->combine);
|
||||
|
||||
|
@ -81,8 +81,14 @@ skip: for(;;) {
|
|||
|
||||
debug(10, "Received %u messages from node '%s'", recv, p->in->name);
|
||||
|
||||
/* Run preprocessing hooks */
|
||||
if (hook_run(p, HOOK_PRE)) {
|
||||
p->skipped += recv;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* For each received message... */
|
||||
for (int i=0; i<recv; i++) {
|
||||
for (int i = 0; i < recv; i++) {
|
||||
p->previous = &p->pool[(p->received-1) % p->poolsize];
|
||||
p->current = &p->pool[ p->received % p->poolsize];
|
||||
|
||||
|
@ -91,42 +97,17 @@ skip: for(;;) {
|
|||
|
||||
p->received++;
|
||||
|
||||
/* Check header fields */
|
||||
if (msg_verify(p->current)) {
|
||||
p->invalid++;
|
||||
warn("Received invalid message!");
|
||||
goto skip; /* Drop message */
|
||||
}
|
||||
|
||||
/* Handle wrap-around of sequence number */
|
||||
int dist = (UINT32_MAX + p->current->sequence - p->previous->sequence) % UINT32_MAX;
|
||||
if (dist > UINT32_MAX / 2)
|
||||
dist -= UINT32_MAX;
|
||||
|
||||
/* Update sequence histogram */
|
||||
hist_put(&p->hist_seq, dist);
|
||||
|
||||
/* Handle simulation restart */
|
||||
if (p->current->sequence == 0 && abs(dist) >= 1) {
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)",
|
||||
buf, p->previous->sequence, p->current->sequence, dist);
|
||||
|
||||
path_reset(p);
|
||||
}
|
||||
else if (dist <= 0 && p->received > 1) {
|
||||
p->dropped++;
|
||||
goto skip;
|
||||
/* Run hooks for filtering, stats collection and manipulation */
|
||||
if (hook_run(p, HOOK_MSG)) {
|
||||
p->skipped++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/* Call hook callbacks */
|
||||
FOREACH(&p->hooks, it) {
|
||||
if (it->hook(p->current, p, &ts)) {
|
||||
p->skipped++;
|
||||
goto skip;
|
||||
}
|
||||
/* Run post processing hooks */
|
||||
if (hook_run(p, HOOK_POST)) {
|
||||
p->skipped += recv;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* At fixed rate mode, messages are send by another thread */
|
||||
|
@ -143,6 +124,9 @@ int path_start(struct path *p)
|
|||
path_print(p, buf, sizeof(buf));
|
||||
|
||||
info("Starting path: %s (poolsize = %u)", buf, p->poolsize);
|
||||
|
||||
if (hook_run(p, HOOK_PATH_START))
|
||||
return -1;
|
||||
|
||||
/* At fixed rate mode, we start another thread for sending */
|
||||
if (p->rate)
|
||||
|
@ -167,40 +151,13 @@ int path_stop(struct path *p)
|
|||
|
||||
close(p->tfd);
|
||||
}
|
||||
|
||||
if (p->received) {
|
||||
info("Delay distribution:");
|
||||
hist_print(&p->hist_delay);
|
||||
info("Sequence number displacement:");
|
||||
hist_print(&p->hist_seq);
|
||||
}
|
||||
|
||||
if (hook_run(p, HOOK_PATH_STOP))
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int path_reset(struct path *p)
|
||||
{
|
||||
p->sent = 0;
|
||||
p->received = 1;
|
||||
p->invalid = 0;
|
||||
p->skipped = 0;
|
||||
p->dropped = 0;
|
||||
|
||||
hist_reset(&p->hist_seq);
|
||||
hist_reset(&p->hist_delay);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void path_print_stats(struct path *p)
|
||||
{
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
|
||||
info("%-32s : %-8u %-8u %-8u %-8u %-8u", buf,
|
||||
p->sent, p->received, p->dropped, p->skipped, p->invalid);
|
||||
}
|
||||
|
||||
int path_print(struct path *p, char *buf, int len)
|
||||
{
|
||||
*buf = 0;
|
||||
|
@ -219,15 +176,34 @@ int path_print(struct path *p, char *buf, int len)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int path_reset(struct path *p)
|
||||
{
|
||||
if (hook_run(p, HOOK_PATH_RESTART))
|
||||
return -1;
|
||||
|
||||
p->sent =
|
||||
p->received =
|
||||
p->invalid =
|
||||
p->skipped =
|
||||
p->dropped = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct path * path_create()
|
||||
{
|
||||
struct path *p = alloc(sizeof(struct path));
|
||||
|
||||
list_init(&p->destinations, NULL);
|
||||
list_init(&p->hooks, NULL);
|
||||
|
||||
for (int i = 0; i < HOOK_MAX; i++)
|
||||
list_init(&p->hooks[i], NULL);
|
||||
|
||||
hist_create(&p->hist_seq, -HIST_SEQ, +HIST_SEQ, 1);
|
||||
hist_create(&p->hist_delay, 0, 2, 100e-3);
|
||||
#define hook_add(type, priority, cb) list_insert(&p->hooks[type], priority, cb)
|
||||
|
||||
hook_add(HOOK_MSG, 1, hook_verify);
|
||||
hook_add(HOOK_MSG, 2, hook_restart);
|
||||
hook_add(HOOK_MSG, 3, hook_drop);
|
||||
|
||||
return p;
|
||||
}
|
||||
|
@ -235,10 +211,10 @@ struct path * path_create()
|
|||
void path_destroy(struct path *p)
|
||||
{
|
||||
list_destroy(&p->destinations);
|
||||
list_destroy(&p->hooks);
|
||||
hist_destroy(&p->hist_seq);
|
||||
hist_destroy(&p->hist_delay);
|
||||
|
||||
for (int i = 0; i < HOOK_MAX; i++)
|
||||
list_destroy(&p->hooks[i]);
|
||||
|
||||
free(p->pool);
|
||||
free(p);
|
||||
}
|
||||
|
|
|
@ -164,7 +164,7 @@ int main(int argc, char *argv[])
|
|||
|
||||
for (;;) FOREACH(&paths, it) {
|
||||
usleep(settings.stats * 1e6);
|
||||
path_print_stats(it->path);
|
||||
hook_run(it->path, HOOK_PERIODIC);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue