diff --git a/include/hooks.h b/include/hooks.h index a5fb69ca9..71cdbd403 100644 --- a/include/hooks.h +++ b/include/hooks.h @@ -23,10 +23,20 @@ #include -#define REGISTER_HOOK(name, prio, fnc, type) \ -__attribute__((constructor)) void __register_ ## fnc () { \ - static struct hook h = { name, NULL, prio, type, NULL, fnc }; \ - list_push(&hooks, &h); \ +#include "queue.h" +#include "list.h" + +#define REGISTER_HOOK(nam, desc, prio, hist, fnc, typ) \ +__attribute__((constructor)) void __register_ ## fnc () { \ + static struct hook h = { \ + .name = nam, \ + .description = desc, \ + .priority = prio, \ + .history = hist, \ + .type = typ, \ + .cb = fnc \ + }; \ + list_push(&hooks, &h); \ } /* The configuration of hook parameters is done in "config.h" */ @@ -34,6 +44,8 @@ __attribute__((constructor)) void __register_ ## fnc () { \ /* Forward declarations */ struct path; struct hook; +struct sample; +struct settings; /** This is a list of hooks which can be used in the configuration file. */ extern struct list hooks; @@ -42,34 +54,40 @@ extern struct list hooks; * * @param p The path which is processing this message. * @param h The hook datastructure which contains parameter, name and private context for the hook. + * @param m A pointer to the first message which should be processed by the hook. + * @param cnt The number of messages which should be processed by the hook. * @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values. * @retval 0 Success. Continue processing and forwarding the message. * @retval <0 Error. Drop the message. */ -typedef int (*hook_cb_t)(struct path *p, struct hook *h, int when); +typedef int (*hook_cb_t)(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); /** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */ enum hook_type { - HOOK_PATH_START = 1 << 0, /**< Called whenever a path is started; before threads are created. */ - HOOK_PATH_STOP = 1 << 1, /**< Called whenever a path is stopped; after threads are destoyed. */ - HOOK_PATH_RESTART = 1 << 2, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ + HOOK_PATH_START = 1 << 0, /**< Called whenever a path is started; before threads are created. */ + HOOK_PATH_STOP = 1 << 1, /**< Called whenever a path is stopped; after threads are destoyed. */ + HOOK_PATH_RESTART = 1 << 2, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ + + HOOK_READ = 1 << 3, /**< Called for every single received samples. */ + HOOK_WRITE = 1 << 4, /**< Called for every single sample which will be sent. */ + + HOOK_ASYNC = 1 << 7, /**< Called asynchronously with fixed rate (see path::rate). */ + HOOK_PERIODIC = 1 << 8, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ + + HOOK_INIT = 1 << 9, /**< Called before path is started to parse parameters. */ + HOOK_DEINIT = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ + + HOOK_INTERNAL = 1 << 11, /**< Internal hooks are added to every path implicitely. */ + HOOK_PARSE = 1 << 12, /**< Called for parsing hook arguments. */ - HOOK_PRE = 1 << 3, /**< Called when a new packet of messages (samples) was received. */ - HOOK_POST = 1 << 4, /**< Called after each message (sample) of a packet was processed. */ - HOOK_MSG = 1 << 5, /**< Called for each message (sample) in a packet. */ - HOOK_ASYNC = 1 << 6, /**< Called asynchronously with fixed rate (see path::rate). */ - - HOOK_PERIODIC = 1 << 7, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ - /** @{ Classes of hooks */ - /** Internal hooks are mandatory. */ - HOOK_INTERNAL = 1 << 16, /** Hooks which are using private data must allocate and free them propery. */ - HOOK_PRIVATE = HOOK_PATH_START | HOOK_PATH_STOP, + HOOK_STORAGE = HOOK_INIT | HOOK_DEINIT, /** All path related actions */ HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART, /** Hooks which are used to collect statistics. */ - HOOK_STATS = HOOK_INTERNAL | HOOK_PRIVATE | HOOK_PATH | HOOK_MSG | HOOK_PRE | HOOK_PERIODIC, + HOOK_STATS = HOOK_INTERNAL | HOOK_STORAGE | HOOK_PATH | HOOK_READ | HOOK_PERIODIC, + /** All hooks */ HOOK_ALL = HOOK_INTERNAL - 1 /** @} */ @@ -79,9 +97,19 @@ enum hook_type { struct hook { const char *name; /**< The unique name of this hook. This must be the first member! */ const char *parameter; /**< A parameter string for this hook. Can be used to configure the hook behaviour. */ + const char *description;/**< A short description of this hook function. */ + int priority; /**< A priority to change the order of execution within one type of hook */ + int history; /**< How many samples of history this hook requires. */ enum hook_type type; /**< The type of the hook as a bitfield */ - void *private; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */ + + void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */ + + struct sample *last; + struct sample *prev; + + qptr_t head; + hook_cb_t cb; /**< The hook callback function as a function pointer. */ }; @@ -94,60 +122,37 @@ int hooks_sort_priority(const void *a, const void *b); /** Conditionally execute the hooks * * @param p A pointer to the path structure. - * @param t Which type of hooks should be executed? + * @param when Which type of hooks should be executed? + * @param m An array to of (cnt) pointers to msgs. + * @param cnt The size of the message array. * @retval 0 All registred hooks for the specified type have been executed successfully. * @retval <0 On of the hook functions signalized, that the processing should be aborted; message should be skipped. */ -int hook_run(struct path *p, enum hook_type t); +int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when); -/** The following prototypes are example hooks +/** Allocate & deallocate private memory per hook. * - * @addtogroup hooks_examples Examples for hook functions - * @{ + * Hooks which use this function must be flagged with HOOL_STORAGE. + * + * @param h A pointer to the hook structure. + * @param when Which event cause the hook to be executed? + * @param len The size of hook prvate memory allocation. + * @return A pointer to the allocated memory region or NULL after it was released. */ +void * hook_storage(struct hook *h, int when, size_t len); -/** Example hook: Print the message. */ -int hook_print(struct path *p, struct hook *h, int when); +int hook_print(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -/** Example hook: Drop messages. */ -int hook_decimate(struct path *p, struct hook *h, int when); - -/** Example hook: Convert the values of a message between fixed (integer) and floating point representation. */ -int hook_convert(struct path *p, struct hook *h, int when); - -/** Example hook: overwrite timestamp of message. */ -int hook_ts(struct path *p, struct hook *h, int when); - -/** Internal hook: add missing timestamps for node types which do not include a valid TS (ex. GTFPGA) */ -int hook_fix_ts(struct path *p, struct hook *h, int when); - -/** Example hook: Finite-Impulse-Response (FIR) filter. */ -int hook_fir(struct path *p, struct hook *h, int when); - -/** Example hook: drop first samples after simulation restart */ -int hook_skip_first(struct path *p, struct hook *h, int when); - -/** Example hook: Skip messages whose values are similiar to the previous ones */ -int hook_skip_unchanged(struct path *p, struct hook *h, int when); - -/* The following hooks are used to implement core functionality */ - -/** Core hook: verify message headers. Invalid messages will be dropped. */ -int hook_verify(struct path *p, struct hook *h, int when); - -/** Core hook: reset the path in case a new simulation was started. */ -int hook_restart(struct path *p, struct hook *h, int when); - -/** Core hook: check if sequence number is correct. Otherwise message will be dropped */ -int hook_drop(struct path *p, struct hook *h, int when); - -/** Core hook: collect statistics */ -int hook_stats(struct path *p, struct hook *h, int when); - -/** Core hook: send path statistics to another node */ -int hook_stats_send(struct path *p, struct hook *h, int when); - -/** Not a hook: just prints header for periodic statistics */ +int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_stats(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); void hook_stats_header(); -#endif /** _HOOKS_H_ @} @} */ +int hook_fix_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_restart(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); + +#endif /** _HOOKS_H_ @} */ diff --git a/lib/hooks-internal.c b/lib/hooks-internal.c new file mode 100644 index 000000000..0d3d2bb7a --- /dev/null +++ b/lib/hooks-internal.c @@ -0,0 +1,105 @@ +/** Internal hook functions. + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + *********************************************************************************/ + +#include "hooks.h" +#include "timing.h" +#include "sample.h" +#include "path.h" +#include "utils.h" + +REGISTER_HOOK("fix_ts", "Update timestamps of sample if not set", 0, 0, hook_fix_ts, HOOK_INTERNAL | HOOK_READ) +int hook_fix_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + struct timespec now = time_now(); + + for (int i = 0; i < cnt; i++) { + /* Check for missing receive timestamp + * Usually node_type::read() should update the receive timestamp. + * An example would be to use hardware timestamp capabilities of + * modern NICs. + */ + if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) || + (smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1)) + smps[i]->ts.received = now; + + /* Check for missing origin timestamp */ + if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) || + (smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1)) + smps[i]->ts.origin = now; + } + + return cnt; +} + +REGISTER_HOOK("restart", "Call restart hooks for current path", 1, 1, hook_restart, HOOK_INTERNAL | HOOK_READ) +int hook_restart(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + for (int i = 0; i < cnt; i++) { + h->last = smps[i]; + + if (h->prev) { + if (h->last->sequence == 0 && + h->prev->sequence <= UINT32_MAX - 32) { + warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)", + path_name(p), h->prev->sequence, h->last->sequence); + + p->invalid = + p->skipped = + p->dropped = 0; + + hook_run(p, &smps[i], cnt - i, HOOK_PATH_RESTART); + } + } + + h->prev = h->last; + } + + return cnt; +} + +REGISTER_HOOK("drop", "Drop messages with reordered sequence numbers", 3, 1, hook_drop, HOOK_INTERNAL | HOOK_READ) +int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + int i, ok, dist; + + for (i = 0, ok = 0; i < cnt; i++) { + h->last = smps[i]; + + if (h->prev) { + dist = h->last->sequence - (int32_t) h->prev->sequence; + if (dist <= 0) { + p->dropped++; + warn("Dropped sample: dist = %d, i = %d", dist, i); + } + else { + struct sample *tmp; + + tmp = smps[i]; + smps[i] = smps[ok]; + smps[ok++] = tmp; + } + + /* To discard the first X samples in 'smps[]' we must + * shift them to the end of the 'smps[]' array. + * In case the hook returns a number 'ok' which is smaller than 'cnt', + * only the first 'ok' samples in 'smps[]' are accepted and further processed. + */ + } + else { + struct sample *tmp; + + tmp = smps[i]; + smps[i] = smps[ok]; + smps[ok++] = tmp; + } + + h->prev = h->last; + } + + return ok; +} diff --git a/lib/hooks-other.c b/lib/hooks-other.c new file mode 100644 index 000000000..1d69542b0 --- /dev/null +++ b/lib/hooks-other.c @@ -0,0 +1,165 @@ +/** Other hook funktions. + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + *********************************************************************************/ + +#include +#include +#include +#include + +#include "hooks.h" +#include "timing.h" +#include "utils.h" +#include "sample.h" + +REGISTER_HOOK("print", "Print the message to stdout", 99, 0, hook_print, HOOK_READ) +int hook_print(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + for (int i = 0; i < cnt; i++) + sample_fprint(stdout, smps[i], SAMPLE_ALL); + + return cnt; +} + +REGISTER_HOOK("ts", "Update timestamp of message with current time", 99, 0, hook_ts, HOOK_READ) +int hook_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + for (int i = 0; i < cnt; i++) + smps[i]->ts.origin = smps[i]->ts.received; + + return cnt; +} + +REGISTER_HOOK("convert", "Convert message from / to floating-point / integer", 99, 0, hook_convert, HOOK_STORAGE | HOOK_PARSE | HOOK_READ) +int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + struct { + enum { + TO_FIXED, + TO_FLOAT + } mode; + } *private = hook_storage(h, when, sizeof(*private)); + + switch (when) { + case HOOK_PARSE: + if (!h->parameter) + error("Missing parameter for hook: '%s'", h->name); + + if (!strcmp(h->parameter, "fixed")) + private->mode = TO_FIXED; + else if (!strcmp(h->parameter, "float")) + private->mode = TO_FLOAT; + else + error("Invalid parameter '%s' for hook 'convert'", h->parameter); + break; + + case HOOK_READ: + for (int i = 0; i < cnt; i++) { + for (int j = 0; j < smps[0]->length; j++) { + switch (private->mode) { + case TO_FIXED: smps[i]->values[j].i = smps[i]->values[j].f * 1e3; break; + case TO_FLOAT: smps[i]->values[j].f = smps[i]->values[j].i; break; + } + } + } + break; + } + + return 0; +} + +REGISTER_HOOK("decimate", "Downsamping by integer factor", 99, 0, hook_decimate, HOOK_STORAGE | HOOK_PARSE | HOOK_READ) +int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + struct { + unsigned ratio; + unsigned counter; + } *private = hook_storage(h, when, sizeof(*private)); + + int ok; + + switch (when) { + case HOOK_PARSE: + if (!h->parameter) + error("Missing parameter for hook: '%s'", h->name); + + private->ratio = strtol(h->parameter, NULL, 10); + if (!private->ratio) + error("Invalid parameter '%s' for hook 'decimate'", h->parameter); + + private->counter = 0; + break; + + case HOOK_READ: + ok = 0; + for (int i = 0; i < cnt; i++) { + if (private->counter++ % private->ratio == 0) { + struct sample *tmp; + + tmp = smps[ok]; + smps[ok++] = smps[i]; + smps[i] = tmp; + } + } + return ok; + } + + return cnt; +} + +REGISTER_HOOK("skip_first", "Skip the first samples", 99, 0, hook_skip_first, HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH) +int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + struct { + struct timespec skip; /**< Time to wait until first message is not skipped */ + struct timespec until; /**< Absolute point in time from where we accept samples. */ + } *private = hook_storage(h, when, sizeof(*private)); + + char *endptr; + double wait; + int i, ok; + + switch (when) { + case HOOK_PARSE: + if (!h->parameter) + error("Missing parameter for hook: '%s'", h->name); + + wait = strtof(h->parameter, &endptr); + if (h->parameter == endptr) + error("Invalid parameter '%s' for hook 'skip_first'", h->parameter); + + private->skip = time_from_double(wait); + break; + + case HOOK_PATH_RESTART: + case HOOK_PATH_STOP: + private->until = time_add(&smps[0]->ts.received, &private->skip); + break; + + case HOOK_READ: + for (i = 0, ok = 0; i < cnt; i++) { + if (time_delta(&private->until, &smps[i]->ts.received) > 0) { + struct sample *tmp; + + tmp = smps[i]; + smps[i] = smps[ok]; + smps[ok++] = tmp; + + } + + /* To discard the first X samples in 'smps[]' we must + * shift them to the end of the 'smps[]' array. + * In case the hook returns a number 'ok' which is smaller than 'cnt', + * only the first 'ok' samples in 'smps[]' are accepted and further processed. + */ + } + + return ok; + } + + return 0; +} \ No newline at end of file diff --git a/lib/hooks-stats.c b/lib/hooks-stats.c new file mode 100644 index 000000000..f0af6273c --- /dev/null +++ b/lib/hooks-stats.c @@ -0,0 +1,147 @@ +/** Statistic-related hook functions. + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + *********************************************************************************/ + +#include "hooks.h" +#include "sample.h" +#include "path.h" +#include "utils.h" +#include "timing.h" + +extern struct list *hook_nodes; + +void hook_stats_header() +{ + #define UNIT(u) "(" YEL(u) ")" + + stats("%-40s|%19s|%19s|%19s|%19s|%19s|%19s|%10s|", "Source " MAG("=>") " Destination", + "OWD" UNIT("S") " ", + "Rate" UNIT("p/S") " ", + "Recv" UNIT("p") " ", + "Drop" UNIT("p") " ", + "Skip" UNIT("p") " ", + "Inval" UNIT("p") " ", + "Overuns " + ); + line(); +} + +REGISTER_HOOK("stats", "Collect statistics for the current path", 2, 1, hook_stats, HOOK_STATS) +int hook_stats(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + switch (when) { + case HOOK_INIT: + /** @todo Allow configurable bounds for histograms */ + hist_create(&p->hist.owd, 0, 1, 100e-3); + hist_create(&p->hist.gap_msg, 90e-3, 110e-3, 1e-3); + hist_create(&p->hist.gap_recv, 90e-3, 110e-3, 1e-3); + hist_create(&p->hist.gap_seq, -HIST_SEQ, +HIST_SEQ, 1); + break; + + case HOOK_READ: + for (int i = 0; i < cnt; i++) { + h->last = smps[i]; + + if (h->prev) { + int gap_seq = h->last->sequence - (int32_t) h->prev->sequence; + double owd = time_delta(&h->last->ts.origin, &h->last->ts.received); + double gap = time_delta(&h->prev->ts.origin, &h->last->ts.origin); + double gap_recv = time_delta(&h->prev->ts.received, &h->last->ts.received); + + hist_put(&p->hist.gap_msg, gap); + hist_put(&p->hist.gap_seq, gap_seq); + hist_put(&p->hist.owd, owd); + hist_put(&p->hist.gap_recv, gap_recv); + } + + h->prev = h->last; + } + break; + + case HOOK_PATH_STOP: + if (p->hist.owd.total) { info("One-way delay:"); hist_print(&p->hist.owd); } + if (p->hist.gap_recv.total){ info("Inter-message arrival time:"); hist_print(&p->hist.gap_recv); } + if (p->hist.gap_msg.total) { info("Inter-message ts gap:"); hist_print(&p->hist.gap_msg); } + if (p->hist.gap_seq.total) { info("Inter-message sequence number gaps:"); hist_print(&p->hist.gap_seq); } + break; + + case HOOK_DEINIT: + hist_destroy(&p->hist.owd); + hist_destroy(&p->hist.gap_msg); + hist_destroy(&p->hist.gap_recv); + hist_destroy(&p->hist.gap_seq); + break; + + case HOOK_PATH_RESTART: + hist_reset(&p->hist.owd); + hist_reset(&p->hist.gap_seq); + hist_reset(&p->hist.gap_msg); + hist_reset(&p->hist.gap_recv); + break; + + case HOOK_PERIODIC: + stats("%-40.40s|%10s|%10s|%10ju|%10ju|%10ju|%10ju|%10ju|", path_name(p), "", "", + p->in->received, p->dropped, p->skipped, p->invalid, p->overrun); + break; + } + + return cnt; +} + +REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_PARSE | HOOK_PERIODIC | HOOK_PATH) +int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +{ + struct private { + struct node *dest; + int ratio; + } *private = hook_storage(h, when, sizeof(*private)); + + switch (when) { + case HOOK_PARSE: + if (!h->parameter) + error("Missing parameter for hook '%s'", h->name); + + if (!hook_nodes) + error("Missing reference to node list for hook '%s", h->name); + + private->dest = list_lookup(hook_nodes, h->parameter); + if (!private->dest) + error("Invalid destination node '%s' for hook '%s'", h->parameter, h->name); + + node_start(private->dest); + + break; + + case HOOK_PERIODIC: { + int ret, length; + char buf[SAMPLE_LEN(9)]; + struct sample *last, *smp = (struct sample *) buf; + + ret = queue_get(&p->queue, (void **) &last, p->in->received - 1); + if (ret == 1) + length = last->length; + else + length = -1; + + smp->values[0].f = p->in->received; + smp->values[1].f = length; + smp->values[2].f = p->invalid; + smp->values[3].f = p->skipped; + smp->values[4].f = p->dropped; + smp->values[5].f = p->overrun; + smp->values[6].f = p->hist.owd.last, + smp->values[7].f = 1.0 / p->hist.gap_msg.last; + smp->values[8].f = 1.0 / p->hist.gap_recv.last; + smp->length = 9; + + node_write(private->dest, &smp, 1); /* Send single message with statistics to destination node */ + break; + } + } + + return 0; +} \ No newline at end of file diff --git a/lib/hooks.c b/lib/hooks.c index 743490bca..2c1a7c6a2 100644 --- a/lib/hooks.c +++ b/lib/hooks.c @@ -1,10 +1,4 @@ -/** Hook funktions - * - * Every path can register hook functions which are called at specific events. - * A list of supported events is described by enum hook_flags. - * Please note that there are several hook callbacks which are hard coded into path_create(). - * - * This file includes some examples. +/** Hook-releated functions. * * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC @@ -26,9 +20,9 @@ struct list hooks; /* Those references can be used inside the hook callbacks after initializing them with hook_init() */ -static struct list *hook_nodes = NULL; -static struct list *hook_paths = NULL; -static struct settings *hook_settings = NULL; +struct list *hook_nodes = NULL; +struct list *hook_paths = NULL; +struct settings *hook_settings = NULL; void hook_init(struct list *nodes, struct list *paths, struct settings *set) { @@ -44,476 +38,33 @@ int hooks_sort_priority(const void *a, const void *b) { return ha->priority - hb->priority; } -int hook_run(struct path *p, enum hook_type t) +int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when) { - int ret = 0; - list_foreach(struct hook *h, &p->hooks) { - if (h->type & t) { - debug(22, "Running hook when=%u '%s' prio=%u", t, h->name, h->priority); + if (h->type & when) { + debug(DBG_HOOK | 22, "Running hook when=%u '%s' prio=%u, cnt=%zu", when, h->name, h->priority, cnt); - ret = ((hook_cb_t) h->cb)(p, h, t); - if (ret) - return ret; + cnt = h->cb(p, h, when, smps, cnt); + if (cnt == 0) + break; } } - return ret; + return cnt; } -REGISTER_HOOK("print", 99, hook_print, HOOK_MSG) -int hook_print(struct path *p, struct hook *h, int when) -{ - struct msg *m = pool_current(&p->pool); - double offset = time_delta(&MSG_TS(m), &p->ts.recv); - int flags = MSG_PRINT_ALL; - - /* We dont show the offset if its to large */ - if (offset > 1e9) - flags &= ~MSG_PRINT_OFFSET; - - msg_fprint(stdout, m, flags, offset); - - return 0; -} - -REGISTER_HOOK("ts", 99, hook_ts, HOOK_MSG) -int hook_ts(struct path *p, struct hook *h, int when) -{ - struct msg *m = pool_current(&p->pool); - - m->ts.sec = p->ts.recv.tv_sec; - m->ts.nsec = p->ts.recv.tv_nsec; - - return 0; -} - -REGISTER_HOOK("fix_ts", 0, hook_fix_ts, HOOK_INTERNAL | HOOK_MSG) -int hook_fix_ts(struct path *p, struct hook *h, int when) -{ - struct msg *m = pool_current(&p->pool); - - if ((m->ts.sec == 0 && m->ts.nsec == 0) || - (m->ts.sec == -1 && m->ts.nsec == -1)) - hook_ts(p, h, when); - - return 0; -} - -REGISTER_HOOK("skip_unchanged", 99, hook_skip_unchanged, HOOK_PRIVATE | HOOK_ASYNC) -int hook_skip_unchanged(struct path *p, struct hook *h, int when) -{ - struct private { - double threshold; - struct msg previous; - } *x = h->private; - - switch (when) { - case HOOK_PATH_START: - x = h->private = alloc(sizeof(struct private)); - - if (!h->parameter) - error("Missing parameter for hook 'deduplication'"); - - x->threshold = strtof(h->parameter, NULL); - if (!x->threshold) - error("Failed to parse parameter '%s' for hook 'deduplication'", h->parameter); - break; - - case HOOK_PATH_STOP: - free(x); - break; - - case HOOK_ASYNC: { - int ret = 0; - - struct msg *prev = &x->previous; - struct msg *cur = pool_current(&p->pool); - - for (int i = 0; i < MIN(cur->values, prev->values); i++) { - if (fabs(cur->data[i].f - prev->data[i].f) > x->threshold) - goto out; - } - - ret = -1; /* no appreciable change in values, we will drop the packet */ - - out: memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */ - - return ret; - } - } - - return 0; -} - -REGISTER_HOOK("convert", 99, hook_convert, HOOK_PRIVATE | HOOK_MSG) -int hook_convert(struct path *p, struct hook *h, int when) -{ - struct private { - enum { TO_FIXED, TO_FLOAT } mode; - } *x = h->private; - - switch (when) { - case HOOK_PATH_START: - x = h->private = alloc(sizeof(struct private)); - - if (!h->parameter) - error("Missing parameter for hook 'deduplication'"); - - if (!strcmp(h->parameter, "fixed")) - x->mode = TO_FIXED; - else if (!strcmp(h->parameter, "float")) - x->mode = TO_FLOAT; - else - error("Invalid parameter '%s' for hook 'convert'", h->parameter); - break; - - case HOOK_PATH_STOP: - free(x); - break; - - case HOOK_MSG: { - struct msg *m = pool_current(&p->pool); - - for (int i = 0; i < m->values; i++) { - switch (x->mode) { - /** @todo allow precission to be configured via parameter */ - case TO_FIXED: m->data[i].i = m->data[i].f * 1e3; break; - case TO_FLOAT: m->data[i].f = m->data[i].i; break; - } - } - break; - } - } - - return 0; -} - -REGISTER_HOOK("fir", 99, hook_fir, HOOK_PRIVATE | HOOK_MSG) -int hook_fir(struct path *p, struct hook *h, int when) -{ - /** @todo make this configurable via hook parameters */ - const static double coeffs[] = HOOK_FIR_COEFFS; - char *end; - - struct private { - struct pool coeffs; - struct pool history; - int index; - } *x = h->private; - - switch (when) { - case HOOK_PATH_START: - if (!h->parameter) - error("Missing parameter for hook 'fir'"); - - x = h->private = alloc(sizeof(struct private)); - - pool_create(&x->coeffs, ARRAY_LEN(coeffs), sizeof(double)); - pool_create(&x->history, ARRAY_LEN(coeffs), sizeof(double)); - - /** Fill with static coefficients */ - memcpy(x->coeffs.buffer, coeffs, sizeof(coeffs)); - - x->index = strtol(h->parameter, &end, 10); - if (h->parameter == end) - error("Invalid parameter '%s' for hook 'fir'", h->parameter); - break; - - case HOOK_PATH_STOP: - pool_destroy(&x->coeffs); - pool_destroy(&x->history); - - free(x); - break; - - case HOOK_MSG: { - /* Current value of interest */ - struct msg *m = pool_current(&p->pool); - float *value = &m->data[x->index].f; - double *history = pool_current(&x->history); - - /* Save last sample, unfiltered */ - *history = *value; - - /* Reset accumulator */ - *value = 0; - - /* FIR loop */ - for (int i = 0; i < pool_length(&x->coeffs); i++) { - double *coeff = pool_get(&x->coeffs, i); - double *hist = pool_getrel(&x->history, -i); - - *value += *coeff * *hist; - } - - break; - } - } - - return 0; -} - -REGISTER_HOOK("decimate", 99, hook_decimate, HOOK_PRIVATE | HOOK_POST) -int hook_decimate(struct path *p, struct hook *h, int when) -{ - struct private { - long ratio; - } *x = h->private; - - switch (when) { - case HOOK_PATH_START: - if (!h->parameter) - error("Missing parameter for hook 'decimate'"); - - x = h->private = alloc(sizeof(struct private)); - - x->ratio = strtol(h->parameter, NULL, 10); - if (!x->ratio) - error("Invalid parameter '%s' for hook 'decimate'", h->parameter); - break; - - case HOOK_PATH_STOP: - free(x); - break; - - case HOOK_POST: - return p->received % x->ratio; - } - - return 0; -} - -REGISTER_HOOK("skip_first", 99, hook_skip_first, HOOK_PRIVATE | HOOK_POST | HOOK_PATH ) -int hook_skip_first(struct path *p, struct hook *h, int when) -{ - struct private { - double wait; /**< Number of seconds to wait until first message is not skipped */ - struct timespec started; /**< Timestamp of last simulation restart */ - } *x = h->private; - - switch (when) { - case HOOK_PATH_START: - if (!h->parameter) - error("Missing parameter for hook 'skip_first'"); - - x = h->private = alloc(sizeof(struct private)); - - x->started = time_now(); - x->wait = strtof(h->parameter, NULL); - if (!x->wait) - error("Invalid parameter '%s' for hook 'skip_first'", h->parameter); - break; - - case HOOK_PATH_STOP: - free(x); - break; - - case HOOK_PATH_RESTART: - x->started = p->ts.recv; - break; - - case HOOK_POST: { - double delta = time_delta(&x->started, &p->ts.recv); - return delta < x->wait - ? -1 /* skip */ - : 0; /* send */ - } - } - - return 0; -} - -REGISTER_HOOK("restart", 1, hook_restart, HOOK_INTERNAL | HOOK_MSG) -int hook_restart(struct path *p, struct hook *h, int when) -{ - struct msg *cur = pool_current(&p->pool); - struct msg *prev = pool_previous(&p->pool); - - if (cur->sequence == 0 && - prev->sequence <= UINT32_MAX - 32) { - warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)", - path_name(p), prev->sequence, cur->sequence); - - p->sent = - p->invalid = - p->skipped = - p->dropped = 0; - p->received = 1; - - if (hook_run(p, HOOK_PATH_RESTART)) - return -1; - } - - return 0; -} - -REGISTER_HOOK("verify", 2, hook_verify, HOOK_INTERNAL | HOOK_MSG) -int hook_verify(struct path *p, struct hook *h, int when) -{ - struct msg *cur = pool_current(&p->pool); - - int reason = msg_verify(cur); - if (reason) { - p->invalid++; - warn("Received invalid message (reason = %d)", reason); - return -1; - } - - return 0; -} - -REGISTER_HOOK("drop", 3, hook_drop, HOOK_INTERNAL | HOOK_MSG) -int hook_drop(struct path *p, struct hook *h, int when) -{ - struct msg *cur = pool_current(&p->pool); - struct msg *prev = pool_previous(&p->pool); - - int dist = cur->sequence - (int32_t) prev->sequence; - if (dist <= 0 && p->received > 1) { - p->dropped++; - return -1; - } - else - return 0; -} - -REGISTER_HOOK("stats", 2, hook_stats, HOOK_STATS) -int hook_stats(struct path *p, struct hook *h, int when) +void * hook_storage(struct hook *h, int when, size_t len) { switch (when) { - case HOOK_PATH_START: - /** @todo Allow configurable bounds for histograms */ - hist_create(&p->hist.owd, 0, 1, 100e-3); - hist_create(&p->hist.gap_msg, 90e-3, 110e-3, 1e-3); - hist_create(&p->hist.gap_recv, 90e-3, 110e-3, 1e-3); - hist_create(&p->hist.gap_seq, -HIST_SEQ, +HIST_SEQ, 1); - break; - - case HOOK_PRE: - /* Exclude first message from statistics */ - if (p->received > 0) - hist_put(&p->hist.gap_recv, time_delta(&p->ts.last, &p->ts.recv)); - break; - - case HOOK_MSG: { - struct msg *cur = pool_current(&p->pool); - struct msg *prev = pool_previous(&p->pool); - - /* Exclude first message from statistics */ - if (p->received > 0) { - int dist = cur->sequence - (int32_t) prev->sequence; - double delay = time_delta(&MSG_TS(cur), &p->ts.recv); - double gap = time_delta(&MSG_TS(prev), &MSG_TS(cur)); - - hist_put(&p->hist.gap_msg, gap); - hist_put(&p->hist.gap_seq, dist); - hist_put(&p->hist.owd, delay); - } - break; - } - - case HOOK_PATH_STOP: - if (p->hist.owd.total) { info("One-way delay:"); hist_print(&p->hist.owd); } - if (p->hist.gap_recv.total){ info("Inter-message arrival time:"); hist_print(&p->hist.gap_recv); } - if (p->hist.gap_msg.total) { info("Inter-message ts gap:"); hist_print(&p->hist.gap_msg); } - if (p->hist.gap_seq.total) { info("Inter-message sequence number gaps:"); hist_print(&p->hist.gap_seq); } - - hist_destroy(&p->hist.owd); - hist_destroy(&p->hist.gap_msg); - hist_destroy(&p->hist.gap_recv); - hist_destroy(&p->hist.gap_seq); - break; - - case HOOK_PATH_RESTART: - hist_reset(&p->hist.owd); - hist_reset(&p->hist.gap_seq); - hist_reset(&p->hist.gap_msg); - hist_reset(&p->hist.gap_recv); + case HOOK_INIT: + h->_vd = alloc(len); break; - case HOOK_PERIODIC: { - if (p->received > 1) { - struct msg *cur = pool_current(&p->pool); - - stats("%-40.40s|%10.2g|%10.2f|%10u|%10u|%10u|%10u|%10u|%10u|%10u|", path_name(p), - p->hist.owd.last, 1 / p->hist.gap_msg.last, - p->sent, p->received, p->dropped, p->skipped, p->invalid, p->overrun, cur->values - ); - } - else - stats("%-40.40s|%10s|%10s|%10u|%10u|%10u|%10u|%10u|%10u|%10s|", path_name(p), "", "", - p->sent, p->received, p->dropped, p->skipped, p->invalid, p->overrun, "" - ); - break; - } - } - - return 0; -} - -void hook_stats_header() -{ - #define UNIT(u) "(" YEL(u) ")" - - stats("%-40s|%19s|%19s|%19s|%19s|%19s|%19s|%19s|%10s|%10s|", "Source " MAG("=>") " Destination", - "OWD" UNIT("S") " ", - "Rate" UNIT("p/S") " ", - "Sent" UNIT("p") " ", - "Recv" UNIT("p") " ", - "Drop" UNIT("p") " ", - "Skip" UNIT("p") " ", - "Inval" UNIT("p") " ", - "Overuns ", - "Values " - ); - line(); -} - -REGISTER_HOOK("stats_send", 99, hook_stats_send, HOOK_PRIVATE | HOOK_PERIODIC) -int hook_stats_send(struct path *p, struct hook *h, int when) -{ - struct private { - struct node *dest; - struct msg *msg; - int ratio; - } *x = h->private; - - switch (when) { - case HOOK_PATH_START: - if (!h->parameter) - error("Missing parameter for hook 'stats_send'"); - - if (!hook_nodes) - error("stats_send() hook has no reference to node list"); - - x = h->private = alloc(sizeof(struct private)); - - x->msg = msg_create(9); - x->dest = list_lookup(hook_nodes, h->parameter); - if (!x->dest) - error("Invalid destination node '%s' for hook 'stats_send'", h->parameter); - break; - - case HOOK_PATH_STOP: - free(x->msg); - free(x); - break; - - case HOOK_PERIODIC: - x->msg->data[0].f = p->sent; - x->msg->data[1].f = p->received; - x->msg->data[2].f = p->invalid; - x->msg->data[3].f = p->skipped; - x->msg->data[4].f = p->dropped; - x->msg->data[5].f = p->overrun; - x->msg->data[6].f = p->hist.owd.last, - x->msg->data[7].f = 1.0 / p->hist.gap_msg.last; - x->msg->data[8].f = 1.0 / p->hist.gap_recv.last; - - node_write_single(x->dest, x->msg); /* Send single message with statistics to destination node */ + case HOOK_DEINIT: + free(h->_vd); + h->_vd = NULL; break; } - return 0; + return h->_vd; } \ No newline at end of file