diff --git a/server/include/hooks.h b/server/include/hooks.h index 55ece2ced..89ad082d8 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -23,27 +23,37 @@ /* The configuration of hook parameters is done in "config.h" */ -struct msg; +/* Forward declarations */ struct path; /** Callback type of hook function * - * @param m The last message which has been received * @param p The path which is processing this message. - * @param ts The timestamp when the message(s) were received. * @retval 0 Success. Continue processing and forwarding the message. * @retval <0 Error. Drop the message. */ -typedef int (*hook_cb_t)(struct msg *m, struct path *p, struct timespec *ts); +typedef int (*hook_cb_t)(struct path *p); -/** This is a static list of available hooks. - * - * It's used by hook_lookup to parse hook identfiers from the configuration file. - * The list must be terminated by NULL pointers! - */ -struct hook_id { - hook_cb_t cb; +enum hook_type { + HOOK_PATH_START, /**< Called whenever a path is started; before threads are created. */ + HOOK_PATH_STOP, /**< Called whenever a path is stopped; after threads are destoyed. */ + HOOK_PATH_RESTART, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ + + HOOK_PRE, /**< Called when a new packet of messages (samples) was received. */ + HOOK_POST, /**< Called after each message (sample) of a packet was processed. */ + HOOK_MSG, /**< Called for each message (sample) in a packet. */ + + HOOK_PERIODIC, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ + + HOOK_MAX +}; + +/** Descriptor for user defined hooks. See hook_list[]. */ +struct hook { + int priority; + hook_cb_t callback; const char *name; + enum hook_type type; }; /** Get a function pointer of a hook function by its name @@ -52,24 +62,47 @@ struct hook_id { * @retval NULL There is no hook registred with name. * @retval >0 A function pointer to the requested hook_cb_t hook. */ -hook_cb_t hook_lookup(const char *name); +const struct hook * hook_lookup(const char *name); + +/** Conditionally execute the hooks + * + * @param p A pointer to the path structure. + * @param t Which type of hooks should be executed? + * @retval 0 All registred hooks for the specified type have been executed successfully. + * @retval <0 On of the hook functions signalized, that the processing should be aborted; message should be skipped. + */ +int hook_run(struct path *p, enum hook_type t); + + +/* The following prototypes are example hooks */ /** Example hook: Print the message. */ -int hook_print(struct msg *m, struct path *p, struct timespec *ts); +int hook_print(struct path *p); /** Example hook: Drop messages. */ -int hook_decimate(struct msg *m, struct path *p, struct timespec *ts); +int hook_decimate(struct path *p); /** Example hook: Convert the message values to fixed precision. */ -int hook_tofixed(struct msg *m, struct path *p, struct timespec *ts); +int hook_tofixed(struct path *p); /** Example hook: overwrite timestamp of message. */ -int hook_ts(struct msg *m, struct path *p, struct timespec *ts); +int hook_ts(struct path *p); /** Example hook: Finite-Impulse-Response (FIR) filter. */ -int hook_fir(struct msg *m, struct path *p, struct timespec *ts); +int hook_fir(struct path *p); /** Example hook: Discrete Fourier Transform */ -int hook_dft(struct msg *m, struct path *p, struct timespec *ts); +int hook_dft(struct path *p); + +/* The following prototypes are core hook functions */ + +/** Core hook: verify message headers. Invalid messages will be dropped. */ +int hook_verify(struct path *p); + +/** Core hook: reset the path in case a new simulation was started. */ +int hook_restart(struct path *p); + +/** Core hook: check if sequence number is correct. Otherwise message will be dropped */ +int hook_drop(struct path *p); #endif /** _HOOKS_H_ @} */ diff --git a/server/include/path.h b/server/include/path.h index 387148d02..71b0d64e0 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -35,7 +35,7 @@ struct path /** List of all outgoing nodes */ struct list destinations; /** List of function pointers to hooks */ - struct list hooks; + struct list hooks[HOOK_MAX]; /** Timer file descriptor for fixed rate sending */ int tfd; @@ -59,6 +59,8 @@ struct path /** A pointer to the libconfig object which instantiated this path */ config_setting_t *cfg; + /* The following fields are mostly managed by hook_ functions */ + /** Histogram of sequence number displacement of received messages */ struct hist hist_seq; /** Histogram for delay of received messages */ diff --git a/server/src/cfg.c b/server/src/cfg.c index 666e2bc65..1e9359022 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -211,16 +211,18 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct list int config_parse_hooklist(config_setting_t *cfg, struct list *hooks) { const char *str; - hook_cb_t hook; - + const struct hook *hook; + switch (config_setting_type(cfg)) { case CONFIG_TYPE_STRING: str = config_setting_get_string(cfg); hook = hook_lookup(str); if (!hook) cerror(cfg, "Invalid hook function '%s'", str); - - list_push(hooks, hook); + + debug(10, "Adding hook %s to chain %u with prio %u", hook->name, hook->type, hook->priority); + + list_insert(&hooks[hook->type], hook->priority, hook->callback); break; case CONFIG_TYPE_ARRAY: @@ -230,7 +232,7 @@ int config_parse_hooklist(config_setting_t *cfg, struct list *hooks) { if (!hook) cerror(config_setting_get_elem(cfg, i), "Invalid hook function '%s'", str); - list_push(hooks, hook); + list_insert(&hooks[hook->type], hook->priority, hook->callback); } break; diff --git a/server/src/hooks.c b/server/src/hooks.c index 3d2c9097d..900e9bca7 100644 --- a/server/src/hooks.c +++ b/server/src/hooks.c @@ -1,8 +1,8 @@ /** Hook funktions * - * Every path can register a hook function which is called for every received - * message. This can be used to debug the data flow, get statistics - * or alter the message. + * Every path can register hook functions which are called at specific events. + * A list of supported events is described by enum hook_flags. + * Please note that there are several hook callbacks which are hard coded into path_create(). * * This file includes some examples. * @@ -21,34 +21,44 @@ #include "path.h" #include "utils.h" -/* The configuration of hook parameters is done in "config.h" */ +/* Some hooks can be configured by constants in te file "config.h" */ -/* Plausability checks */ -#if HOOK_MULTIPLEX_RATIO > POOL_SIZE - #error "POOL_SIZE is too small for given HOOK_MULTIPLEX_RATIO" -#endif - -/** @todo Make const */ -static struct hook_id hook_list[] = { - { hook_print, "print" }, - { hook_decimate, "decimate" }, - { hook_tofixed, "tofixed" }, - { hook_ts, "ts" }, - { hook_fir, "fir" }, - { hook_dft, "dft"} +/** This is a static list of available hooks. + * + * It's used by hook_lookup to parse hook identfiers from the configuration file. + * The list must be terminated by NULL pointers! + */ +static const struct hook hook_list[] = { +/* Priority, Callback, Name, Type */ + { 99, hook_print, "print", HOOK_MSG }, + { 99, hook_decimate, "decimate", HOOK_POST }, + { 99, hook_tofixed, "tofixed", HOOK_MSG }, + { 99, hook_ts, "ts", HOOK_MSG }, + { 99, hook_fir, "fir", HOOK_POST }, + { 99, hook_dft, "dft", HOOK_POST } }; -hook_cb_t hook_lookup(const char *name) +const struct hook* hook_lookup(const char *name) { for (int i=0; ihooks[t], it) + ret += ((hook_cb_t) it->ptr)(p); + + return ret; +} -int hook_print(struct msg *m, struct path *p, struct timespec *ts) +int hook_print(struct path *p) { struct msg *m = p->current; struct timespec ts = MSG_TS(m); @@ -59,16 +69,17 @@ int hook_print(struct msg *m, struct path *p, struct timespec *ts) return 0; } -int hook_tofixed(struct msg *m, struct path *p, struct timespec *ts) +int hook_tofixed(struct path *p) { - for (int i=0; ilength; i++) { + struct msg *m = p->current; + + for (int i=0; ilength; i++) m->data[i].i = m->data[i].f * 1e3; - } return 0; } -int hook_ts(struct msg *m, struct path *p, struct timespec *ts) +int hook_ts(struct path *p) { struct msg *m = p->current; @@ -78,7 +89,7 @@ int hook_ts(struct msg *m, struct path *p, struct timespec *ts) return 0; } -int hook_fir(struct msg *m, struct path *p, struct timespec *ts) +int hook_fir(struct path *p) { /** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300 * Tip: Use MATLAB's filter design tool and export coefficients @@ -101,19 +112,60 @@ int hook_fir(struct msg *m, struct path *p, struct timespec *ts) sum += coeffs[i] * old->data[HOOK_FIR_INDEX].f; } - m->data[HOOK_FIR_INDEX].f = sum; + p->current->data[HOOK_FIR_INDEX].f = sum; return 0; } -int hook_decimate(struct msg *m, struct path *p, struct timespec *ts) +int hook_decimate(struct path *p) { /* Only sent every HOOK_DECIMATE_RATIO'th message */ - return m->sequence % HOOK_DECIMATE_RATIO; + return p->received % HOOK_DECIMATE_RATIO; } /** @todo Implement */ -int hook_dft(struct msg *m, struct path *p, struct timespec *ts) +int hook_dft(struct path *p) { return 0; +} + +/** System hooks */ + +int hook_restart(struct path *p) +{ + if (p->current->sequence == 0 && + p->previous->sequence <= UINT32_MAX - 32) { + char buf[33]; + path_print(p, buf, sizeof(buf)); + warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)", + buf, p->previous->sequence, p->current->sequence); + + path_reset(p); + } + + return 0; +} + +int hook_verify(struct path *p) +{ + int reason = msg_verify(p->current); + if (reason) { + p->invalid++; + warn("Received invalid message (reason=%d)", reason); + + return -1; + } + + return 0; +} + +int hook_drop(struct path *p) +{ + int dist = p->current->sequence - (int32_t) p->previous->sequence; + if (dist <= 0 && p->received > 1) { + p->dropped++; + return -1; + } + else + return 0; } \ No newline at end of file diff --git a/server/src/path.c b/server/src/path.c index bcd6b2eb9..39c49b8df 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -72,7 +72,7 @@ static void * path_run(void *arg) p->previous = p->current = p->pool; /* Main thread loop */ -skip: for(;;) { + for(;;) { /* Receive message */ int recv = node_read(p->in, p->pool, p->poolsize, p->received, p->in->combine); @@ -81,8 +81,14 @@ skip: for(;;) { debug(10, "Received %u messages from node '%s'", recv, p->in->name); + /* Run preprocessing hooks */ + if (hook_run(p, HOOK_PRE)) { + p->skipped += recv; + continue; + } + /* For each received message... */ - for (int i=0; iprevious = &p->pool[(p->received-1) % p->poolsize]; p->current = &p->pool[ p->received % p->poolsize]; @@ -91,42 +97,17 @@ skip: for(;;) { p->received++; - /* Check header fields */ - if (msg_verify(p->current)) { - p->invalid++; - warn("Received invalid message!"); - goto skip; /* Drop message */ - } - - /* Handle wrap-around of sequence number */ - int dist = (UINT32_MAX + p->current->sequence - p->previous->sequence) % UINT32_MAX; - if (dist > UINT32_MAX / 2) - dist -= UINT32_MAX; - - /* Update sequence histogram */ - hist_put(&p->hist_seq, dist); - - /* Handle simulation restart */ - if (p->current->sequence == 0 && abs(dist) >= 1) { - char buf[33]; - path_print(p, buf, sizeof(buf)); - warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)", - buf, p->previous->sequence, p->current->sequence, dist); - - path_reset(p); - } - else if (dist <= 0 && p->received > 1) { - p->dropped++; - goto skip; + /* Run hooks for filtering, stats collection and manipulation */ + if (hook_run(p, HOOK_MSG)) { + p->skipped++; + continue; } } - /* Call hook callbacks */ - FOREACH(&p->hooks, it) { - if (it->hook(p->current, p, &ts)) { - p->skipped++; - goto skip; - } + /* Run post processing hooks */ + if (hook_run(p, HOOK_POST)) { + p->skipped += recv; + continue; } /* At fixed rate mode, messages are send by another thread */ @@ -143,6 +124,9 @@ int path_start(struct path *p) path_print(p, buf, sizeof(buf)); info("Starting path: %s (poolsize = %u)", buf, p->poolsize); + + if (hook_run(p, HOOK_PATH_START)) + return -1; /* At fixed rate mode, we start another thread for sending */ if (p->rate) @@ -167,40 +151,13 @@ int path_stop(struct path *p) close(p->tfd); } - - if (p->received) { - info("Delay distribution:"); - hist_print(&p->hist_delay); - info("Sequence number displacement:"); - hist_print(&p->hist_seq); - } + + if (hook_run(p, HOOK_PATH_STOP)) + return -1; return 0; } -int path_reset(struct path *p) -{ - p->sent = 0; - p->received = 1; - p->invalid = 0; - p->skipped = 0; - p->dropped = 0; - - hist_reset(&p->hist_seq); - hist_reset(&p->hist_delay); - - return 0; -} - -void path_print_stats(struct path *p) -{ - char buf[33]; - path_print(p, buf, sizeof(buf)); - - info("%-32s : %-8u %-8u %-8u %-8u %-8u", buf, - p->sent, p->received, p->dropped, p->skipped, p->invalid); -} - int path_print(struct path *p, char *buf, int len) { *buf = 0; @@ -219,15 +176,34 @@ int path_print(struct path *p, char *buf, int len) return 0; } +int path_reset(struct path *p) +{ + if (hook_run(p, HOOK_PATH_RESTART)) + return -1; + + p->sent = + p->received = + p->invalid = + p->skipped = + p->dropped = 0; + + return 0; +} + struct path * path_create() { struct path *p = alloc(sizeof(struct path)); list_init(&p->destinations, NULL); - list_init(&p->hooks, NULL); + + for (int i = 0; i < HOOK_MAX; i++) + list_init(&p->hooks[i], NULL); - hist_create(&p->hist_seq, -HIST_SEQ, +HIST_SEQ, 1); - hist_create(&p->hist_delay, 0, 2, 100e-3); +#define hook_add(type, priority, cb) list_insert(&p->hooks[type], priority, cb) + + hook_add(HOOK_MSG, 1, hook_verify); + hook_add(HOOK_MSG, 2, hook_restart); + hook_add(HOOK_MSG, 3, hook_drop); return p; } @@ -235,10 +211,10 @@ struct path * path_create() void path_destroy(struct path *p) { list_destroy(&p->destinations); - list_destroy(&p->hooks); - hist_destroy(&p->hist_seq); - hist_destroy(&p->hist_delay); + for (int i = 0; i < HOOK_MAX; i++) + list_destroy(&p->hooks[i]); + free(p->pool); free(p); } diff --git a/server/src/server.c b/server/src/server.c index e4d708e58..694d1677c 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -164,7 +164,7 @@ int main(int argc, char *argv[]) for (;;) FOREACH(&paths, it) { usleep(settings.stats * 1e6); - path_print_stats(it->path); + hook_run(it->path, HOOK_PERIODIC); } }