diff --git a/server/Makefile b/server/Makefile index 88b65f1d0..dbe1007fe 100644 --- a/server/Makefile +++ b/server/Makefile @@ -1,7 +1,7 @@ TARGETS = server send random receive test # Common objs -OBJS = path.o node.o hooks.o msg.o cfg.o stats.o +OBJS = path.o node.o hooks.o msg.o cfg.o # Helper libs OBJS += utils.o list.o hist.o log.o timing.o checks.o diff --git a/server/include/cfg.h b/server/include/cfg.h index 0cbb5e85d..994503133 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -82,6 +82,15 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct list **/ int config_parse_hooklist(config_setting_t *cfg, struct list *hooks); +/** Parse a single hook and append it to the list. + * A hook definition is composed of the hook name and optional parameters + * seperated by a colon. + * + * Examples: + * "print:stdout" + */ +int config_parse_hook(config_setting_t *cfg, struct list *list); + /** Parse a single node and add it to the global configuration. * * @param cfg A libconfig object pointing to the node. diff --git a/server/include/config.h b/server/include/config.h index 1e10130b1..0afaa36ee 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -51,19 +51,18 @@ { "/sys/class/net/eth0/address" , "50:e5:49:eb:74:0c" }, \ { "/etc/machine-id", "0d8399d0216314f083b9ed2053a354a8" }, \ { "/dev/sda2", "\x53\xf6\xb5\xeb\x8b\x16\x46\xdc\x8d\x8f\x5b\x70\xb8\xc9\x1a\x2a", 0x468 } } - -/* Hard coded configuration of hook functions */ -#define HOOK_FIR_INDEX 0 /**< Which value inside a message should be filtered? */ + +/** Coefficients for simple FIR-LowPass: + * F_s = 1kHz, F_pass = 100 Hz, F_block = 300 + * + * Tip: Use MATLAB's filter design tool and export coefficients + * with the integrated C-Header export + */ #define HOOK_FIR_COEFFS { -0.003658148158728, -0.008882653268281, 0.008001024183003, \ 0.08090485991761, 0.2035239551043, 0.3040703593515, \ 0.3040703593515, 0.2035239551043, 0.08090485991761, \ 0.008001024183003, -0.008882653268281,-0.003658148158728 } -#define HOOK_TS_INDEX -1 /**< The last value of message should be overwritten by a timestamp. */ -#define HOOK_DECIMATE_RATIO 30 /**< Only forward every 30th message to the destination nodes. */ - -#define HOOK_DEDUP_TYPE HOOK_ASYNC -#define HOOK_DEDUP_TRESH 1e-3 /**< Do not send messages when difference of values to last message is smaller than this threshold */ /** Global configuration */ struct settings { /** Process priority (lower is better) */ diff --git a/server/include/hooks.h b/server/include/hooks.h index b54e11f82..17e3c503e 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -23,16 +23,17 @@ #include -#define REGISTER_HOOK(name, prio, fnc, type) \ -__attribute__((constructor)) void __register_ ## fnc () { \ - static struct hook h = { name, prio, fnc, type }; \ - list_push(&hooks, &h); \ +#define REGISTER_HOOK(name, prio, fnc, type) \ +__attribute__((constructor)) void __register_ ## fnc () { \ + static struct hook h = { name, NULL, prio, type, NULL, fnc }; \ + list_push(&hooks, &h); \ } /* The configuration of hook parameters is done in "config.h" */ /* Forward declarations */ struct path; +struct hook; /** This is a list of hooks which can be used in the configuration file. */ extern struct list hooks; @@ -40,34 +41,51 @@ extern struct list hooks; /** Callback type of hook function * * @param p The path which is processing this message. + * @param h The hook datastructure which contains parameter, name and private context for the hook. + * @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values. * @retval 0 Success. Continue processing and forwarding the message. * @retval <0 Error. Drop the message. */ -typedef int (*hook_cb_t)(struct path *p); +typedef int (*hook_cb_t)(struct path *p, struct hook *h, int when); -/** The type of a hook defines when a hook will be exectuted. */ +/** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */ enum hook_type { - HOOK_PATH_START, /**< Called whenever a path is started; before threads are created. */ - HOOK_PATH_STOP, /**< Called whenever a path is stopped; after threads are destoyed. */ - HOOK_PATH_RESTART, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ + HOOK_PATH_START = 1 << 0, /**< Called whenever a path is started; before threads are created. */ + HOOK_PATH_STOP = 1 << 1, /**< Called whenever a path is stopped; after threads are destoyed. */ + HOOK_PATH_RESTART = 1 << 2, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ - HOOK_PRE, /**< Called when a new packet of messages (samples) was received. */ - HOOK_POST, /**< Called after each message (sample) of a packet was processed. */ - HOOK_MSG, /**< Called for each message (sample) in a packet. */ - HOOK_ASYNC, /**< Called asynchronously with fixed rate (see path::rate). */ + HOOK_PRE = 1 << 3, /**< Called when a new packet of messages (samples) was received. */ + HOOK_POST = 1 << 4, /**< Called after each message (sample) of a packet was processed. */ + HOOK_MSG = 1 << 5, /**< Called for each message (sample) in a packet. */ + HOOK_ASYNC = 1 << 6, /**< Called asynchronously with fixed rate (see path::rate). */ - HOOK_PERIODIC, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ - - HOOK_MAX + HOOK_PERIODIC = 1 << 7, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ + + HOOK_INIT = 1 << 8, /**< Called to allocate and init hook-private data */ + HOOK_DEINIT = 1 << 9, /**< Called to free hook-private data */ + + /** @{ Classes of hooks */ + /** Internal hooks are mandatory. */ + HOOK_INTERNAL = 1 << 16, + /** Hooks which are using private data must allocate and free them propery. */ + HOOK_PRIVATE = HOOK_INIT | HOOK_DEINIT, + /** All path related actions */ + HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART, + /** Hooks which are used to collect statistics. */ + HOOK_STATS = HOOK_INTERNAL | HOOK_PRIVATE | HOOK_PATH | HOOK_MSG | HOOK_PERIODIC, + /** All hooks */ + HOOK_ALL = HOOK_INTERNAL - 1 + /** @} */ }; /** Descriptor for user defined hooks. See hooks[]. */ struct hook { - /** The unique name of this hook. This must be the first member! */ - const char *name; - int priority; - hook_cb_t callback; - enum hook_type type; + const char *name; /**< The unique name of this hook. This must be the first member! */ + const char *parameter; /**< A parameter string for this hook. Can be used to configure the hook behaviour. */ + int priority; /**< A priority to change the order of execution within one type of hook */ + enum hook_type type; /**< The type of the hook as a bitfield */ + void *private; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */ + hook_cb_t cb; /**< The hook callback function as a function pointer. */ }; /** The following prototypes are example hooks @@ -76,36 +94,42 @@ struct hook { * @{ */ -/** Example hook: Drop messages whose values are similiar to the previous ones */ -int hook_deduplicate(struct path *); - /** Example hook: Print the message. */ -int hook_print(struct path *p); +int hook_print(struct path *p, struct hook *h, int when); /** Example hook: Drop messages. */ -int hook_decimate(struct path *p); +int hook_decimate(struct path *p, struct hook *h, int when); -/** Example hook: Convert the message values to fixed precision. */ -int hook_tofixed(struct path *p); +/** Example hook: Convert the values of a message between fixed (integer) and floating point representation. */ +int hook_convert(struct path *p, struct hook *h, int when); /** Example hook: overwrite timestamp of message. */ -int hook_ts(struct path *p); +int hook_ts(struct path *p, struct hook *h, int when); /** Example hook: Finite-Impulse-Response (FIR) filter. */ -int hook_fir(struct path *p); +int hook_fir(struct path *p, struct hook *h, int when); -/** Example hook: Discrete Fourier Transform */ -int hook_dft(struct path *p); +/** Example hook: drop first samples after simulation restart */ +int hook_skip_first(struct path *p, struct hook *h, int when); -/* The following prototypes are core hook functions */ +/** Example hook: Skip messages whose values are similiar to the previous ones */ +int hook_skip_unchanged(struct path *p, struct hook *h, int when); + +/* The following hooks are used to implement core functionality */ /** Core hook: verify message headers. Invalid messages will be dropped. */ -int hook_verify(struct path *p); +int hook_verify(struct path *p, struct hook *h, int when); /** Core hook: reset the path in case a new simulation was started. */ -int hook_restart(struct path *p); +int hook_restart(struct path *p, struct hook *h, int when); /** Core hook: check if sequence number is correct. Otherwise message will be dropped */ -int hook_drop(struct path *p); +int hook_drop(struct path *p, struct hook *h, int when); + +/** Core hook: collect statistics */ +int hook_stats(struct path *p, struct hook *h, int when); + +/** Core hook: send path statistics to another node */ +int hook_stats_send(struct path *p, struct hook *h, int when); #endif /** _HOOKS_H_ @} @} */ diff --git a/server/include/path.h b/server/include/path.h index 6838fa7ad..6722e9374 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -40,7 +40,7 @@ struct path /** List of all outgoing nodes */ struct list destinations; /** List of function pointers to hooks */ - struct list hooks[HOOK_MAX]; + struct list hooks; /** Timer file descriptor for fixed rate sending */ int tfd; @@ -119,15 +119,6 @@ int path_start(struct path *p); */ int path_stop(struct path *p); - -/** Reset internal counters and histogram of a path. - * - * @param p A pointer to the path structure. - * @retval 0 Success. Everything went well. - * @retval <0 Error. Something went wrong. - */ -int path_reset(struct path *p); - /** Show some basic statistics for a path. * * @param p A pointer to the path structure. diff --git a/server/include/stats.h b/server/include/stats.h deleted file mode 100644 index c0476c4f1..000000000 --- a/server/include/stats.h +++ /dev/null @@ -1,36 +0,0 @@ -/** Hook functions to collect statistics - * - * @file - * @author Steffen Vogel - * @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC - * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. - *********************************************************************************/ - -#ifndef _STATS_H_ -#define _STATS_H_ - -/* Forward declarations */ -struct path; - -/** Print a table header for statistics printed by stats_line() */ -void stats_header(); - -/** Print a single line of stats including received, sent, invalid and dropped packet counters */ -int stats_line(struct path *p); - -int stats_show(struct path *p); - -/** Update histograms */ -int stats_collect(struct path *p); - -/** Create histograms */ -int stats_start(struct path *p); - -/** Destroy histograms */ -int stats_stop(struct path *p); - -/** Reset all statistic counters to zero */ -int stats_reset(struct path *p); - -#endif /* _STATS_H_ */ \ No newline at end of file diff --git a/server/src/cfg.c b/server/src/cfg.c index a0a59bfd1..24fcdda6e 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -124,7 +124,10 @@ int config_parse_path(config_setting_t *cfg, /* Optional settings */ config_setting_t *cfg_hook = config_setting_get_member(cfg, "hook"); if (cfg_hook) - config_parse_hooklist(cfg_hook, p->hooks); + config_parse_hooklist(cfg_hook, &p->hooks); + + /* Initialize hooks and their private data / parameters */ + path_run_hook(p, HOOK_INIT); if (!config_setting_lookup_bool(cfg, "enabled", &enabled)) enabled = 1; @@ -223,38 +226,14 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *list, struct list } int config_parse_hooklist(config_setting_t *cfg, struct list *list) { - const char *str; - const struct hook *hook; - switch (config_setting_type(cfg)) { case CONFIG_TYPE_STRING: - str = config_setting_get_string(cfg); - if (str) { - hook = list_lookup(&hooks, str); - if (hook) - list_insert(&list[hook->type], hook->priority, hook->callback); - else - cerror(cfg, "Unknown hook function '%s'", str); - } - else - cerror(cfg, "Invalid hook function"); + config_parse_hook(cfg, list); break; case CONFIG_TYPE_ARRAY: - for (int i = 0; itype], hook->priority, hook->callback); - else - cerror(elm, "Invalid hook function '%s'", str); - } - else - cerror(cfg, "Invalid hook function"); - } + for (int i = 0; i < config_setting_length(cfg); i++) + config_parse_hook(config_setting_get_elem(cfg, i), list); break; default: @@ -264,6 +243,31 @@ int config_parse_hooklist(config_setting_t *cfg, struct list *list) { return 0; } +int config_parse_hook(config_setting_t *cfg, struct list *list) +{ + struct hook *hook, *copy; + const char *name = config_setting_get_string(cfg); + if (!name) + cerror(cfg, "Invalid hook function"); + + char *param = strchr(name, ':'); + if (param) { /* Split hook line */ + *param = '\0'; + param++; + } + + hook = list_lookup(&hooks, name); + if (!hook) + cerror(cfg, "Unknown hook function '%s'", name); + + copy = memdup(hook, sizeof(struct hook)); + copy->parameter = param; + + list_push(list, copy); + + return 0; +} + int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings *set) { const char *type; diff --git a/server/src/hooks.c b/server/src/hooks.c index fa679d418..46bc992de 100644 --- a/server/src/hooks.c +++ b/server/src/hooks.c @@ -22,61 +22,28 @@ #include "path.h" #include "utils.h" -/* Some hooks can be configured by constants in te file "config.h" */ +extern struct list nodes; struct list hooks; -REGISTER_HOOK("deduplicate", 99, hook_deduplicate, HOOK_DEDUP_TYPE) -int hook_deduplicate(struct path *p) -{ - int ret = 0; -#if HOOK_DEDUP_TYPE == HOOK_ASYNC - /** Thread local storage (TLS) is used to maintain a copy of the last run of the hook */ - static __thread struct msg previous = MSG_INIT(0); - struct msg *prev = &previous; -#else - struct msg *prev = p->previous; -#endif - struct msg *cur = p->current; - - for (int i = 0; i < MIN(cur->length, prev->length); i++) { - if (fabs(cur->data[i].f - prev->data[i].f) > HOOK_DEDUP_TRESH) - goto out; - } - - ret = -1; /* no appreciable change in values, we will drop the packet */ - -out: -#if HOOK_DEDUP_TYPE == HOOK_ASYNC - memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */ -#endif - return ret; -} - REGISTER_HOOK("print", 99, hook_print, HOOK_MSG) -int hook_print(struct path *p) +int hook_print(struct path *p, struct hook *h, int when) { struct msg *m = p->current; - struct timespec ts = MSG_TS(m); + double offset = time_delta(&MSG_TS(m), &p->ts_recv); + int flags = MSG_PRINT_ALL; - msg_fprint(stdout, m, MSG_PRINT_ALL, time_delta(&ts, &p->ts_recv)); - - return 0; -} - -REGISTER_HOOK("tofixed", 99, hook_tofixed, HOOK_MSG) -int hook_tofixed(struct path *p) -{ - struct msg *m = p->current; - - for (int i = 0; i < m->length; i++) - m->data[i].i = m->data[i].f * 1e3; + /* We dont show the offset if its to large */ + if (offset > 1e9) + flags &= ~MSG_PRINT_OFFSET; + + msg_fprint(stdout, m, flags, offset); return 0; } REGISTER_HOOK("ts", 99, hook_ts, HOOK_MSG) -int hook_ts(struct path *p) +int hook_ts(struct path *p, struct hook *h, int when) { struct msg *m = p->current; @@ -86,59 +53,223 @@ int hook_ts(struct path *p) return 0; } -REGISTER_HOOK("fir", 99, hook_fir, HOOK_MSG) -int hook_fir(struct path *p) -{ - /** Coefficients for simple FIR-LowPass: - * F_s = 1kHz, F_pass = 100 Hz, F_block = 300 - * - * Tip: Use MATLAB's filter design tool and export coefficients - * with the integrated C-Header export - */ - static const double coeffs[] = HOOK_FIR_COEFFS; +REGISTER_HOOK("skip_unchanged", 99, hook_skip_unchanged, HOOK_PRIVATE | HOOK_ASYNC) +int hook_skip_unchanged(struct path *p, struct hook *h, int when) +{ + struct private { + double threshold; + struct msg previous; + } *private = h->private; - /** Per path thread local storage for unfiltered sample values. - * The message ringbuffer (p->pool & p->current) will contain filtered data! - */ - static __thread double *past = NULL; - - /** @todo Avoid dynamic allocation at runtime */ - if (!past) - alloc(p->poolsize * sizeof(double)); - - - /* Current value of interest */ - float *cur = &p->current->data[HOOK_FIR_INDEX].f; - - /* Save last sample, unfiltered */ - past[p->received % p->poolsize] = *cur; + switch (when) { + case HOOK_INIT: + private = h->private = alloc(sizeof(struct private)); + + if (!h->parameter) + error("Missing parameter for hook 'deduplication'"); - /* Reset accumulator */ - *cur = 0; + private->threshold = strtof(h->parameter, NULL); + if (!private->threshold) + error("Failed to parse parameter '%s' for hook 'deduplication'", h->parameter); + break; + + case HOOK_DEINIT: + free(private); + break; + + case HOOK_ASYNC: { + int ret = 0; + + struct msg *prev = &private->previous; + struct msg *cur = p->current; - /* FIR loop */ - for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++) - *cur += coeffs[i] * past[p->received+p->poolsize-i]; + for (int i = 0; i < MIN(cur->length, prev->length); i++) { + if (fabs(cur->data[i].f - prev->data[i].f) > private->threshold) + goto out; + } + + ret = -1; /* no appreciable change in values, we will drop the packet */ + + out: memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */ + + return ret; + } + } + + return 0; +} + +REGISTER_HOOK("convert", 99, hook_convert, HOOK_PRIVATE | HOOK_MSG) +int hook_convert(struct path *p, struct hook *h, int when) +{ + struct private { + enum { TO_FIXED, TO_FLOAT } mode; + } *private = h->private; + + switch (when) { + case HOOK_INIT: + private = h->private = alloc(sizeof(struct private)); + + if (!h->parameter) + error("Missing parameter for hook 'deduplication'"); + + if (!strcmp(h->parameter, "fixed")) + private->mode = TO_FIXED; + else if (!strcmp(h->parameter, "float")) + private->mode = TO_FLOAT; + else + error("Invalid parameter '%s' for hook 'convert'", h->parameter); + break; + + case HOOK_DEINIT: + free(private); + break; + + case HOOK_MSG: { + struct msg *m = p->current; + + for (int i = 0; i < m->length; i++) { + switch (private->mode) { + /** @todo allow precission to be configured via parameter */ + case TO_FIXED: m->data[i].i = m->data[i].f * 1e3; break; + case TO_FLOAT: m->data[i].f = m->data[i].i; break; + } + } + break; + } + } return 0; } -REGISTER_HOOK("decimate", 99, hook_decimate, HOOK_POST) -int hook_decimate(struct path *p) +REGISTER_HOOK("fir", 99, hook_fir, HOOK_PRIVATE | HOOK_MSG) +int hook_fir(struct path *p, struct hook *h, int when) { - /* Only sent every HOOK_DECIMATE_RATIO'th message */ - return p->received % HOOK_DECIMATE_RATIO; + /** @todo make this configurable via hook parameters */ + const static double coeffs[] = HOOK_FIR_COEFFS; + + struct private { + double *coeffs; + double *history; + int index; + } *private = h->private; + + switch (when) { + case HOOK_INIT: + if (!h->parameter) + error("Missing parameter for hook 'fir'"); + + private = h->private = alloc(sizeof(struct private)); + + private->coeffs = memdup(coeffs, sizeof(coeffs)); + private->history = alloc(sizeof(coeffs)); + + private->index = strtol(h->parameter, NULL, 10); + if (!private->index) + error("Invalid parameter '%s' for hook 'fir'", h->parameter); + break; + + case HOOK_DEINIT: + free(private->coeffs); + free(private->history); + free(private); + break; + + case HOOK_MSG: { + /* Current value of interest */ + float *cur = &p->current->data[private->index].f; + + /* Save last sample, unfiltered */ + private->history[p->received % p->poolsize] = *cur; + + /* Reset accumulator */ + *cur = 0; + + /* FIR loop */ + for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++) + *cur += private->coeffs[i] * private->history[p->received+p->poolsize-i]; + break; + } + } + + return 0; } -REGISTER_HOOK("dft", 99, hook_dft, HOOK_POST) -int hook_dft(struct path *p) +REGISTER_HOOK("decimate", 99, hook_decimate, HOOK_PRIVATE | HOOK_POST) +int hook_decimate(struct path *p, struct hook *h, int when) { - return 0; /** @todo Implement */ + struct private { + long ratio; + } *private = h->private; + + switch (when) { + case HOOK_INIT: + if (!h->parameter) + error("Missing parameter for hook 'decimate'"); + + private = h->private = alloc(sizeof(struct private)); + + private->ratio = strtol(h->parameter, NULL, 10); + if (!private->ratio) + error("Invalid parameter '%s' for hook 'decimate'", h->parameter); + break; + + case HOOK_DEINIT: + free(private); + break; + + case HOOK_POST: + return p->received % private->ratio; + } + + return 0; } -/** System hooks */ +REGISTER_HOOK("skip_first", 99, hook_skip_first, HOOK_PRIVATE | HOOK_POST | HOOK_PATH ) +int hook_skip_first(struct path *p, struct hook *h, int when) +{ + struct private { + double wait; /**< Number of seconds to wait until first message is not skipped */ + struct timespec started; /**< Timestamp of last simulation restart */ + } *private = h->private; + + switch (when) { + case HOOK_INIT: + if (!h->parameter) + error("Missing parameter for hook 'skip_first'"); -int hook_restart(struct path *p) + private = h->private = alloc(sizeof(struct private)); + + private->wait = strtof(h->parameter, NULL); + if (!private->wait) + error("Invalid parameter '%s' for hook 'skip_first'", h->parameter); + break; + + case HOOK_DEINIT: + free(private); + break; + + case HOOK_PATH_RESTART: + private->started = p->ts_recv; + break; + + case HOOK_PATH_START: + clock_gettime(CLOCK_REALTIME, &private->started); + break; + + case HOOK_POST: { + double delta = time_delta(&private->started, &p->ts_recv); + return delta < private->wait + ? -1 /* skip */ + : 0; /* send */ + } + } + + return 0; +} + +REGISTER_HOOK("restart", 1, hook_restart, HOOK_INTERNAL | HOOK_MSG) +int hook_restart(struct path *p, struct hook *h, int when) { if (p->current->sequence == 0 && p->previous->sequence <= UINT32_MAX - 32) { @@ -147,26 +278,34 @@ int hook_restart(struct path *p) buf, p->previous->sequence, p->current->sequence); free(buf); - path_reset(p); + p->sent = + p->invalid = + p->skipped = + p->dropped = 0; + p->received = 1; + + if (path_run_hook(p, HOOK_PATH_RESTART)) + return -1; } return 0; } -int hook_verify(struct path *p) +REGISTER_HOOK("verify", 2, hook_verify, HOOK_INTERNAL | HOOK_MSG) +int hook_verify(struct path *p, struct hook *h, int when) { int reason = msg_verify(p->current); if (reason) { p->invalid++; - warn("Received invalid message (reason=%d)", reason); - + warn("Received invalid message (reason = %d)", reason); return -1; } return 0; } -int hook_drop(struct path *p) +REGISTER_HOOK("drop", 3, hook_drop, HOOK_INTERNAL | HOOK_MSG) +int hook_drop(struct path *p, struct hook *h, int when) { int dist = p->current->sequence - (int32_t) p->previous->sequence; if (dist <= 0 && p->received > 1) { @@ -176,3 +315,101 @@ int hook_drop(struct path *p) else return 0; } + +REGISTER_HOOK("stats", 2, hook_stats, HOOK_STATS) +int hook_stats(struct path *p, struct hook *h, int when) +{ + switch (when) { + case HOOK_INIT: + /** @todo Allow configurable bounds for histograms */ + hist_create(&p->hist_sequence, -HIST_SEQ, +HIST_SEQ, 1); + hist_create(&p->hist_delay, 0, 2, 100e-3); + hist_create(&p->hist_gap, 0, 40e-3, 1e-3); + break; + + case HOOK_DEINIT: + hist_destroy(&p->hist_sequence); + hist_destroy(&p->hist_delay); + hist_destroy(&p->hist_gap); + break; + + case HOOK_MSG: { + struct msg *prev = p->previous, *cur = p->current; + + int dist = cur->sequence - (int32_t) prev->sequence; + double delay = time_delta(&p->ts_recv, &MSG_TS(cur)); + double gap = time_delta(&MSG_TS(prev), &MSG_TS(cur)); + + hist_put(&p->hist_sequence, dist); + hist_put(&p->hist_delay, delay); + hist_put(&p->hist_gap, gap); + break; + } + + case HOOK_PATH_STOP: + if (p->hist_delay.total) { info("One-way delay (received):"); hist_print(&p->hist_delay); } + if (p->hist_gap.total) { info("Message gap time:"); hist_print(&p->hist_gap); } + if (p->hist_sequence.total) { info("Sequence number gaps:"); hist_print(&p->hist_sequence); } + break; + + case HOOK_PATH_RESTART: + hist_reset(&p->hist_sequence); + hist_reset(&p->hist_delay); + hist_reset(&p->hist_gap); + break; + + case HOOK_PERIODIC: { + char *buf = path_print(p); + info("%-32s : %-8u %-8u %-8u %-8u %-8u", buf, p->sent, p->received, p->dropped, p->skipped, p->invalid); + free(buf); + break; + } + } + + return 0; +} + +REGISTER_HOOK("stats_send", 99, hook_stats_send, HOOK_PRIVATE | HOOK_MSG) +int hook_stats_send(struct path *p, struct hook *h, int when) +{ + struct private { + struct node *dest; + int ratio; + } *private = h->private; + + switch (when) { + case HOOK_INIT: + if (!h->parameter) + error("Missing parameter for hook 'stats_send'"); + + private = h->private = alloc(sizeof(struct private)); + + private->dest = list_lookup(&nodes, h->parameter); + if (!private->dest) + error("Invalid destination node '%s' for hook 'stats_send'", h->parameter); + break; + + case HOOK_DEINIT: + free(private); + break; + + case HOOK_MSG: { + struct msg m = MSG_INIT(0); + + m.data[m.length++].f = p->sent; + m.data[m.length++].f = p->received; + m.data[m.length++].f = p->invalid; + m.data[m.length++].f = p->skipped; + m.data[m.length++].f = p->dropped; + m.data[m.length++].f = p->hist_delay.last, + m.data[m.length++].f = p->hist_gap.last; + + /* Send single message with statistics to destination node */ + node_write_single(private->dest, &m); + + break; + } + } + + return 0; +} \ No newline at end of file diff --git a/server/src/path.c b/server/src/path.c index 85a7f80a7..535c9ffbf 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -13,7 +13,6 @@ #include "path.h" #include "timing.h" #include "config.h" -#include "stats.h" #ifndef sigev_notify_thread_id #define sigev_notify_thread_id _sigev_un._tid @@ -42,8 +41,9 @@ static void path_write(struct path *p) int path_run_hook(struct path *p, enum hook_type t) { int ret = 0; - ret += ((hook_cb_t) it->ptr)(p); list_foreach(struct hook *h, &p->hooks) { + if (h->type & t) + ret += ((hook_cb_t) h->cb)(p, h, t); } return ret; @@ -124,6 +124,12 @@ int path_start(struct path *p) char *buf = path_print(p); info("Starting path: %s (poolsize = %u, msgsize = %u, #hooks = %zu)", buf, p->poolsize, p->msgsize, list_length(&p->hooks)); free(buf); + + /* We sort the hooks according to their priority before starting the path */ + int hook_cmp(const void *a, const void *b) { + return ((struct hook *) a)->priority - ((struct hook *) b)->priority; + } + list_sort(&p->hooks, hook_cmp); if (path_run_hook(p, HOOK_PATH_START)) return -1; @@ -188,55 +194,27 @@ char * path_print(struct path *p) return buf; } -int path_reset(struct path *p) -{ - if (path_run_hook(p, HOOK_PATH_RESTART)) - return -1; - - p->sent = - p->received = - p->invalid = - p->skipped = - p->dropped = 0; - - return 0; -} - struct path * path_create() { struct path *p = alloc(sizeof(struct path)); list_init(&p->destinations, NULL); + list_init(&p->hooks, free); - for (int i = 0; i < HOOK_MAX; i++) - list_init(&p->hooks[i], NULL); - -#define hook_add(type, priority, cb) list_insert(&p->hooks[type], priority, cb) - - hook_add(HOOK_MSG, 1, hook_verify); - hook_add(HOOK_MSG, 2, hook_restart); - hook_add(HOOK_MSG, 3, hook_drop); - hook_add(HOOK_MSG, 4, stats_collect); - - hook_add(HOOK_PATH_START, 1, stats_start); - - hook_add(HOOK_PATH_STOP, 2, stats_show); - hook_add(HOOK_PATH_STOP, 3, stats_stop); - - hook_add(HOOK_PATH_RESTART, 1, stats_line); - hook_add(HOOK_PATH_RESTART, 3, stats_reset); - - hook_add(HOOK_PERIODIC, 1, stats_line); + list_foreach(struct hook *h, &hooks) { + if (h->type & HOOK_INTERNAL) + list_push(&p->hooks, memdup(h, sizeof(*h))); + } return p; } void path_destroy(struct path *p) { + path_run_hook(p, HOOK_DEINIT); + list_destroy(&p->destinations); - - for (int i = 0; i < HOOK_MAX; i++) - list_destroy(&p->hooks[i]); + list_destroy(&p->hooks); free(p->pool); free(p); diff --git a/server/src/server.c b/server/src/server.c index ba21a6443..2f6d6c5cb 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -19,7 +19,6 @@ #include "cfg.h" #include "path.h" #include "node.h" -#include "stats.h" #include "checks.h" #ifdef ENABLE_OPAL_ASYNC @@ -194,7 +193,9 @@ int main(int argc, char *argv[]) /* Run! */ if (settings.stats > 0) { - stats_header(); + info("%-32s : %-8s %-8s %-8s %-8s %-8s", + "Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Invalid"); + line(); do list_foreach(struct path *p, &paths) { usleep(settings.stats * 1e6); diff --git a/server/src/stats.c b/server/src/stats.c deleted file mode 100644 index 7bb2b84dc..000000000 --- a/server/src/stats.c +++ /dev/null @@ -1,80 +0,0 @@ -/** Hook functions to collect statistics - * - * @file - * @author Steffen Vogel - * @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC - * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. - *********************************************************************************/ - -#include "stats.h" -#include "path.h" -#include "timing.h" -#include "utils.h" - -void stats_header() -{ - info("%-32s : %-8s %-8s %-8s %-8s %-8s", - "Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Invalid"); - line(); -} - -int stats_line(struct path *p) -{ - char *buf = path_print(p); - info("%-32s : %-8u %-8u %-8u %-8u %-8u", buf, p->sent, p->received, p->dropped, p->skipped, p->invalid); - free(buf); - - return 0; -} - -int stats_show(struct path *p) -{ - if (p->hist_delay.total) { info("One-way delay:"); hist_print(&p->hist_delay); } - if (p->hist_gap.total) { info("Message gap time:"); hist_print(&p->hist_gap); } - if (p->hist_sequence.total) { info("Sequence number gaps:"); hist_print(&p->hist_sequence); } - - return 0; -} - -int stats_collect(struct path *p) -{ - int dist = p->current->sequence - (int32_t) p->previous->sequence; - - struct timespec ts1 = MSG_TS(p->current); - struct timespec ts2 = MSG_TS(p->previous); - - hist_put(&p->hist_sequence, dist); - hist_put(&p->hist_delay, time_delta(&ts1, &p->ts_recv)); - hist_put(&p->hist_gap, time_delta(&ts2, &ts1)); - - return 0; -} - -int stats_start(struct path *p) -{ - /** @todo Allow configurable bounds for histograms */ - hist_create(&p->hist_sequence, -HIST_SEQ, +HIST_SEQ, 1); - hist_create(&p->hist_delay, 0, 2, 100e-3); - hist_create(&p->hist_gap, 0, 40e-3, 1e-3); - - return 0; -} - -int stats_stop(struct path *p) -{ - hist_destroy(&p->hist_sequence); - hist_destroy(&p->hist_delay); - hist_destroy(&p->hist_gap); - - return 0; -} - -int stats_reset(struct path *p) -{ - hist_reset(&p->hist_sequence); - hist_reset(&p->hist_delay); - hist_reset(&p->hist_gap); - - return 0; -}