mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
rework of hook system.
added new hooks: - skip_first
This commit is contained in:
parent
a66331ccbe
commit
9ceb05d823
11 changed files with 453 additions and 326 deletions
|
@ -1,7 +1,7 @@
|
|||
TARGETS = server send random receive test
|
||||
|
||||
# Common objs
|
||||
OBJS = path.o node.o hooks.o msg.o cfg.o stats.o
|
||||
OBJS = path.o node.o hooks.o msg.o cfg.o
|
||||
# Helper libs
|
||||
OBJS += utils.o list.o hist.o log.o timing.o checks.o
|
||||
|
||||
|
|
|
@ -82,6 +82,15 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct list
|
|||
**/
|
||||
int config_parse_hooklist(config_setting_t *cfg, struct list *hooks);
|
||||
|
||||
/** Parse a single hook and append it to the list.
|
||||
* A hook definition is composed of the hook name and optional parameters
|
||||
* seperated by a colon.
|
||||
*
|
||||
* Examples:
|
||||
* "print:stdout"
|
||||
*/
|
||||
int config_parse_hook(config_setting_t *cfg, struct list *list);
|
||||
|
||||
/** Parse a single node and add it to the global configuration.
|
||||
*
|
||||
* @param cfg A libconfig object pointing to the node.
|
||||
|
|
|
@ -51,19 +51,18 @@
|
|||
{ "/sys/class/net/eth0/address" , "50:e5:49:eb:74:0c" }, \
|
||||
{ "/etc/machine-id", "0d8399d0216314f083b9ed2053a354a8" }, \
|
||||
{ "/dev/sda2", "\x53\xf6\xb5\xeb\x8b\x16\x46\xdc\x8d\x8f\x5b\x70\xb8\xc9\x1a\x2a", 0x468 } }
|
||||
|
||||
/* Hard coded configuration of hook functions */
|
||||
#define HOOK_FIR_INDEX 0 /**< Which value inside a message should be filtered? */
|
||||
|
||||
/** Coefficients for simple FIR-LowPass:
|
||||
* F_s = 1kHz, F_pass = 100 Hz, F_block = 300
|
||||
*
|
||||
* Tip: Use MATLAB's filter design tool and export coefficients
|
||||
* with the integrated C-Header export
|
||||
*/
|
||||
#define HOOK_FIR_COEFFS { -0.003658148158728, -0.008882653268281, 0.008001024183003, \
|
||||
0.08090485991761, 0.2035239551043, 0.3040703593515, \
|
||||
0.3040703593515, 0.2035239551043, 0.08090485991761, \
|
||||
0.008001024183003, -0.008882653268281,-0.003658148158728 }
|
||||
|
||||
#define HOOK_TS_INDEX -1 /**< The last value of message should be overwritten by a timestamp. */
|
||||
#define HOOK_DECIMATE_RATIO 30 /**< Only forward every 30th message to the destination nodes. */
|
||||
|
||||
#define HOOK_DEDUP_TYPE HOOK_ASYNC
|
||||
#define HOOK_DEDUP_TRESH 1e-3 /**< Do not send messages when difference of values to last message is smaller than this threshold */
|
||||
/** Global configuration */
|
||||
struct settings {
|
||||
/** Process priority (lower is better) */
|
||||
|
|
|
@ -23,16 +23,17 @@
|
|||
|
||||
#include <time.h>
|
||||
|
||||
#define REGISTER_HOOK(name, prio, fnc, type) \
|
||||
__attribute__((constructor)) void __register_ ## fnc () { \
|
||||
static struct hook h = { name, prio, fnc, type }; \
|
||||
list_push(&hooks, &h); \
|
||||
#define REGISTER_HOOK(name, prio, fnc, type) \
|
||||
__attribute__((constructor)) void __register_ ## fnc () { \
|
||||
static struct hook h = { name, NULL, prio, type, NULL, fnc }; \
|
||||
list_push(&hooks, &h); \
|
||||
}
|
||||
|
||||
/* The configuration of hook parameters is done in "config.h" */
|
||||
|
||||
/* Forward declarations */
|
||||
struct path;
|
||||
struct hook;
|
||||
|
||||
/** This is a list of hooks which can be used in the configuration file. */
|
||||
extern struct list hooks;
|
||||
|
@ -40,34 +41,51 @@ extern struct list hooks;
|
|||
/** Callback type of hook function
|
||||
*
|
||||
* @param p The path which is processing this message.
|
||||
* @param h The hook datastructure which contains parameter, name and private context for the hook.
|
||||
* @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values.
|
||||
* @retval 0 Success. Continue processing and forwarding the message.
|
||||
* @retval <0 Error. Drop the message.
|
||||
*/
|
||||
typedef int (*hook_cb_t)(struct path *p);
|
||||
typedef int (*hook_cb_t)(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** The type of a hook defines when a hook will be exectuted. */
|
||||
/** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */
|
||||
enum hook_type {
|
||||
HOOK_PATH_START, /**< Called whenever a path is started; before threads are created. */
|
||||
HOOK_PATH_STOP, /**< Called whenever a path is stopped; after threads are destoyed. */
|
||||
HOOK_PATH_RESTART, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
|
||||
HOOK_PATH_START = 1 << 0, /**< Called whenever a path is started; before threads are created. */
|
||||
HOOK_PATH_STOP = 1 << 1, /**< Called whenever a path is stopped; after threads are destoyed. */
|
||||
HOOK_PATH_RESTART = 1 << 2, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
|
||||
|
||||
HOOK_PRE, /**< Called when a new packet of messages (samples) was received. */
|
||||
HOOK_POST, /**< Called after each message (sample) of a packet was processed. */
|
||||
HOOK_MSG, /**< Called for each message (sample) in a packet. */
|
||||
HOOK_ASYNC, /**< Called asynchronously with fixed rate (see path::rate). */
|
||||
HOOK_PRE = 1 << 3, /**< Called when a new packet of messages (samples) was received. */
|
||||
HOOK_POST = 1 << 4, /**< Called after each message (sample) of a packet was processed. */
|
||||
HOOK_MSG = 1 << 5, /**< Called for each message (sample) in a packet. */
|
||||
HOOK_ASYNC = 1 << 6, /**< Called asynchronously with fixed rate (see path::rate). */
|
||||
|
||||
HOOK_PERIODIC, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */
|
||||
|
||||
HOOK_MAX
|
||||
HOOK_PERIODIC = 1 << 7, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */
|
||||
|
||||
HOOK_INIT = 1 << 8, /**< Called to allocate and init hook-private data */
|
||||
HOOK_DEINIT = 1 << 9, /**< Called to free hook-private data */
|
||||
|
||||
/** @{ Classes of hooks */
|
||||
/** Internal hooks are mandatory. */
|
||||
HOOK_INTERNAL = 1 << 16,
|
||||
/** Hooks which are using private data must allocate and free them propery. */
|
||||
HOOK_PRIVATE = HOOK_INIT | HOOK_DEINIT,
|
||||
/** All path related actions */
|
||||
HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART,
|
||||
/** Hooks which are used to collect statistics. */
|
||||
HOOK_STATS = HOOK_INTERNAL | HOOK_PRIVATE | HOOK_PATH | HOOK_MSG | HOOK_PERIODIC,
|
||||
/** All hooks */
|
||||
HOOK_ALL = HOOK_INTERNAL - 1
|
||||
/** @} */
|
||||
};
|
||||
|
||||
/** Descriptor for user defined hooks. See hooks[]. */
|
||||
struct hook {
|
||||
/** The unique name of this hook. This must be the first member! */
|
||||
const char *name;
|
||||
int priority;
|
||||
hook_cb_t callback;
|
||||
enum hook_type type;
|
||||
const char *name; /**< The unique name of this hook. This must be the first member! */
|
||||
const char *parameter; /**< A parameter string for this hook. Can be used to configure the hook behaviour. */
|
||||
int priority; /**< A priority to change the order of execution within one type of hook */
|
||||
enum hook_type type; /**< The type of the hook as a bitfield */
|
||||
void *private; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */
|
||||
hook_cb_t cb; /**< The hook callback function as a function pointer. */
|
||||
};
|
||||
|
||||
/** The following prototypes are example hooks
|
||||
|
@ -76,36 +94,42 @@ struct hook {
|
|||
* @{
|
||||
*/
|
||||
|
||||
/** Example hook: Drop messages whose values are similiar to the previous ones */
|
||||
int hook_deduplicate(struct path *);
|
||||
|
||||
/** Example hook: Print the message. */
|
||||
int hook_print(struct path *p);
|
||||
int hook_print(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Example hook: Drop messages. */
|
||||
int hook_decimate(struct path *p);
|
||||
int hook_decimate(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Example hook: Convert the message values to fixed precision. */
|
||||
int hook_tofixed(struct path *p);
|
||||
/** Example hook: Convert the values of a message between fixed (integer) and floating point representation. */
|
||||
int hook_convert(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Example hook: overwrite timestamp of message. */
|
||||
int hook_ts(struct path *p);
|
||||
int hook_ts(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Example hook: Finite-Impulse-Response (FIR) filter. */
|
||||
int hook_fir(struct path *p);
|
||||
int hook_fir(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Example hook: Discrete Fourier Transform */
|
||||
int hook_dft(struct path *p);
|
||||
/** Example hook: drop first samples after simulation restart */
|
||||
int hook_skip_first(struct path *p, struct hook *h, int when);
|
||||
|
||||
/* The following prototypes are core hook functions */
|
||||
/** Example hook: Skip messages whose values are similiar to the previous ones */
|
||||
int hook_skip_unchanged(struct path *p, struct hook *h, int when);
|
||||
|
||||
/* The following hooks are used to implement core functionality */
|
||||
|
||||
/** Core hook: verify message headers. Invalid messages will be dropped. */
|
||||
int hook_verify(struct path *p);
|
||||
int hook_verify(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Core hook: reset the path in case a new simulation was started. */
|
||||
int hook_restart(struct path *p);
|
||||
int hook_restart(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Core hook: check if sequence number is correct. Otherwise message will be dropped */
|
||||
int hook_drop(struct path *p);
|
||||
int hook_drop(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Core hook: collect statistics */
|
||||
int hook_stats(struct path *p, struct hook *h, int when);
|
||||
|
||||
/** Core hook: send path statistics to another node */
|
||||
int hook_stats_send(struct path *p, struct hook *h, int when);
|
||||
|
||||
#endif /** _HOOKS_H_ @} @} */
|
||||
|
|
|
@ -40,7 +40,7 @@ struct path
|
|||
/** List of all outgoing nodes */
|
||||
struct list destinations;
|
||||
/** List of function pointers to hooks */
|
||||
struct list hooks[HOOK_MAX];
|
||||
struct list hooks;
|
||||
|
||||
/** Timer file descriptor for fixed rate sending */
|
||||
int tfd;
|
||||
|
@ -119,15 +119,6 @@ int path_start(struct path *p);
|
|||
*/
|
||||
int path_stop(struct path *p);
|
||||
|
||||
|
||||
/** Reset internal counters and histogram of a path.
|
||||
*
|
||||
* @param p A pointer to the path structure.
|
||||
* @retval 0 Success. Everything went well.
|
||||
* @retval <0 Error. Something went wrong.
|
||||
*/
|
||||
int path_reset(struct path *p);
|
||||
|
||||
/** Show some basic statistics for a path.
|
||||
*
|
||||
* @param p A pointer to the path structure.
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/** Hook functions to collect statistics
|
||||
*
|
||||
* @file
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
*********************************************************************************/
|
||||
|
||||
#ifndef _STATS_H_
|
||||
#define _STATS_H_
|
||||
|
||||
/* Forward declarations */
|
||||
struct path;
|
||||
|
||||
/** Print a table header for statistics printed by stats_line() */
|
||||
void stats_header();
|
||||
|
||||
/** Print a single line of stats including received, sent, invalid and dropped packet counters */
|
||||
int stats_line(struct path *p);
|
||||
|
||||
int stats_show(struct path *p);
|
||||
|
||||
/** Update histograms */
|
||||
int stats_collect(struct path *p);
|
||||
|
||||
/** Create histograms */
|
||||
int stats_start(struct path *p);
|
||||
|
||||
/** Destroy histograms */
|
||||
int stats_stop(struct path *p);
|
||||
|
||||
/** Reset all statistic counters to zero */
|
||||
int stats_reset(struct path *p);
|
||||
|
||||
#endif /* _STATS_H_ */
|
|
@ -124,7 +124,10 @@ int config_parse_path(config_setting_t *cfg,
|
|||
/* Optional settings */
|
||||
config_setting_t *cfg_hook = config_setting_get_member(cfg, "hook");
|
||||
if (cfg_hook)
|
||||
config_parse_hooklist(cfg_hook, p->hooks);
|
||||
config_parse_hooklist(cfg_hook, &p->hooks);
|
||||
|
||||
/* Initialize hooks and their private data / parameters */
|
||||
path_run_hook(p, HOOK_INIT);
|
||||
|
||||
if (!config_setting_lookup_bool(cfg, "enabled", &enabled))
|
||||
enabled = 1;
|
||||
|
@ -223,38 +226,14 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *list, struct list
|
|||
}
|
||||
|
||||
int config_parse_hooklist(config_setting_t *cfg, struct list *list) {
|
||||
const char *str;
|
||||
const struct hook *hook;
|
||||
|
||||
switch (config_setting_type(cfg)) {
|
||||
case CONFIG_TYPE_STRING:
|
||||
str = config_setting_get_string(cfg);
|
||||
if (str) {
|
||||
hook = list_lookup(&hooks, str);
|
||||
if (hook)
|
||||
list_insert(&list[hook->type], hook->priority, hook->callback);
|
||||
else
|
||||
cerror(cfg, "Unknown hook function '%s'", str);
|
||||
}
|
||||
else
|
||||
cerror(cfg, "Invalid hook function");
|
||||
config_parse_hook(cfg, list);
|
||||
break;
|
||||
|
||||
case CONFIG_TYPE_ARRAY:
|
||||
for (int i = 0; i<config_setting_length(cfg); i++) {
|
||||
config_setting_t *elm = config_setting_get_elem(cfg, i);
|
||||
|
||||
str = config_setting_get_string(elm);
|
||||
if (str) {
|
||||
hook = list_lookup(&hooks, str);
|
||||
if (hook)
|
||||
list_insert(&list[hook->type], hook->priority, hook->callback);
|
||||
else
|
||||
cerror(elm, "Invalid hook function '%s'", str);
|
||||
}
|
||||
else
|
||||
cerror(cfg, "Invalid hook function");
|
||||
}
|
||||
for (int i = 0; i < config_setting_length(cfg); i++)
|
||||
config_parse_hook(config_setting_get_elem(cfg, i), list);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -264,6 +243,31 @@ int config_parse_hooklist(config_setting_t *cfg, struct list *list) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int config_parse_hook(config_setting_t *cfg, struct list *list)
|
||||
{
|
||||
struct hook *hook, *copy;
|
||||
const char *name = config_setting_get_string(cfg);
|
||||
if (!name)
|
||||
cerror(cfg, "Invalid hook function");
|
||||
|
||||
char *param = strchr(name, ':');
|
||||
if (param) { /* Split hook line */
|
||||
*param = '\0';
|
||||
param++;
|
||||
}
|
||||
|
||||
hook = list_lookup(&hooks, name);
|
||||
if (!hook)
|
||||
cerror(cfg, "Unknown hook function '%s'", name);
|
||||
|
||||
copy = memdup(hook, sizeof(struct hook));
|
||||
copy->parameter = param;
|
||||
|
||||
list_push(list, copy);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings *set)
|
||||
{
|
||||
const char *type;
|
||||
|
|
|
@ -22,61 +22,28 @@
|
|||
#include "path.h"
|
||||
#include "utils.h"
|
||||
|
||||
/* Some hooks can be configured by constants in te file "config.h" */
|
||||
extern struct list nodes;
|
||||
|
||||
struct list hooks;
|
||||
|
||||
REGISTER_HOOK("deduplicate", 99, hook_deduplicate, HOOK_DEDUP_TYPE)
|
||||
int hook_deduplicate(struct path *p)
|
||||
{
|
||||
int ret = 0;
|
||||
#if HOOK_DEDUP_TYPE == HOOK_ASYNC
|
||||
/** Thread local storage (TLS) is used to maintain a copy of the last run of the hook */
|
||||
static __thread struct msg previous = MSG_INIT(0);
|
||||
struct msg *prev = &previous;
|
||||
#else
|
||||
struct msg *prev = p->previous;
|
||||
#endif
|
||||
struct msg *cur = p->current;
|
||||
|
||||
for (int i = 0; i < MIN(cur->length, prev->length); i++) {
|
||||
if (fabs(cur->data[i].f - prev->data[i].f) > HOOK_DEDUP_TRESH)
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = -1; /* no appreciable change in values, we will drop the packet */
|
||||
|
||||
out:
|
||||
#if HOOK_DEDUP_TYPE == HOOK_ASYNC
|
||||
memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("print", 99, hook_print, HOOK_MSG)
|
||||
int hook_print(struct path *p)
|
||||
int hook_print(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
struct msg *m = p->current;
|
||||
struct timespec ts = MSG_TS(m);
|
||||
double offset = time_delta(&MSG_TS(m), &p->ts_recv);
|
||||
int flags = MSG_PRINT_ALL;
|
||||
|
||||
msg_fprint(stdout, m, MSG_PRINT_ALL, time_delta(&ts, &p->ts_recv));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("tofixed", 99, hook_tofixed, HOOK_MSG)
|
||||
int hook_tofixed(struct path *p)
|
||||
{
|
||||
struct msg *m = p->current;
|
||||
|
||||
for (int i = 0; i < m->length; i++)
|
||||
m->data[i].i = m->data[i].f * 1e3;
|
||||
/* We dont show the offset if its to large */
|
||||
if (offset > 1e9)
|
||||
flags &= ~MSG_PRINT_OFFSET;
|
||||
|
||||
msg_fprint(stdout, m, flags, offset);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("ts", 99, hook_ts, HOOK_MSG)
|
||||
int hook_ts(struct path *p)
|
||||
int hook_ts(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
struct msg *m = p->current;
|
||||
|
||||
|
@ -86,59 +53,223 @@ int hook_ts(struct path *p)
|
|||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("fir", 99, hook_fir, HOOK_MSG)
|
||||
int hook_fir(struct path *p)
|
||||
{
|
||||
/** Coefficients for simple FIR-LowPass:
|
||||
* F_s = 1kHz, F_pass = 100 Hz, F_block = 300
|
||||
*
|
||||
* Tip: Use MATLAB's filter design tool and export coefficients
|
||||
* with the integrated C-Header export
|
||||
*/
|
||||
static const double coeffs[] = HOOK_FIR_COEFFS;
|
||||
REGISTER_HOOK("skip_unchanged", 99, hook_skip_unchanged, HOOK_PRIVATE | HOOK_ASYNC)
|
||||
int hook_skip_unchanged(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
struct private {
|
||||
double threshold;
|
||||
struct msg previous;
|
||||
} *private = h->private;
|
||||
|
||||
/** Per path thread local storage for unfiltered sample values.
|
||||
* The message ringbuffer (p->pool & p->current) will contain filtered data!
|
||||
*/
|
||||
static __thread double *past = NULL;
|
||||
|
||||
/** @todo Avoid dynamic allocation at runtime */
|
||||
if (!past)
|
||||
alloc(p->poolsize * sizeof(double));
|
||||
|
||||
|
||||
/* Current value of interest */
|
||||
float *cur = &p->current->data[HOOK_FIR_INDEX].f;
|
||||
|
||||
/* Save last sample, unfiltered */
|
||||
past[p->received % p->poolsize] = *cur;
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
private = h->private = alloc(sizeof(struct private));
|
||||
|
||||
if (!h->parameter)
|
||||
error("Missing parameter for hook 'deduplication'");
|
||||
|
||||
/* Reset accumulator */
|
||||
*cur = 0;
|
||||
private->threshold = strtof(h->parameter, NULL);
|
||||
if (!private->threshold)
|
||||
error("Failed to parse parameter '%s' for hook 'deduplication'", h->parameter);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
free(private);
|
||||
break;
|
||||
|
||||
case HOOK_ASYNC: {
|
||||
int ret = 0;
|
||||
|
||||
struct msg *prev = &private->previous;
|
||||
struct msg *cur = p->current;
|
||||
|
||||
/* FIR loop */
|
||||
for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++)
|
||||
*cur += coeffs[i] * past[p->received+p->poolsize-i];
|
||||
for (int i = 0; i < MIN(cur->length, prev->length); i++) {
|
||||
if (fabs(cur->data[i].f - prev->data[i].f) > private->threshold)
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = -1; /* no appreciable change in values, we will drop the packet */
|
||||
|
||||
out: memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("convert", 99, hook_convert, HOOK_PRIVATE | HOOK_MSG)
|
||||
int hook_convert(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
struct private {
|
||||
enum { TO_FIXED, TO_FLOAT } mode;
|
||||
} *private = h->private;
|
||||
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
private = h->private = alloc(sizeof(struct private));
|
||||
|
||||
if (!h->parameter)
|
||||
error("Missing parameter for hook 'deduplication'");
|
||||
|
||||
if (!strcmp(h->parameter, "fixed"))
|
||||
private->mode = TO_FIXED;
|
||||
else if (!strcmp(h->parameter, "float"))
|
||||
private->mode = TO_FLOAT;
|
||||
else
|
||||
error("Invalid parameter '%s' for hook 'convert'", h->parameter);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
free(private);
|
||||
break;
|
||||
|
||||
case HOOK_MSG: {
|
||||
struct msg *m = p->current;
|
||||
|
||||
for (int i = 0; i < m->length; i++) {
|
||||
switch (private->mode) {
|
||||
/** @todo allow precission to be configured via parameter */
|
||||
case TO_FIXED: m->data[i].i = m->data[i].f * 1e3; break;
|
||||
case TO_FLOAT: m->data[i].f = m->data[i].i; break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("decimate", 99, hook_decimate, HOOK_POST)
|
||||
int hook_decimate(struct path *p)
|
||||
REGISTER_HOOK("fir", 99, hook_fir, HOOK_PRIVATE | HOOK_MSG)
|
||||
int hook_fir(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
/* Only sent every HOOK_DECIMATE_RATIO'th message */
|
||||
return p->received % HOOK_DECIMATE_RATIO;
|
||||
/** @todo make this configurable via hook parameters */
|
||||
const static double coeffs[] = HOOK_FIR_COEFFS;
|
||||
|
||||
struct private {
|
||||
double *coeffs;
|
||||
double *history;
|
||||
int index;
|
||||
} *private = h->private;
|
||||
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
if (!h->parameter)
|
||||
error("Missing parameter for hook 'fir'");
|
||||
|
||||
private = h->private = alloc(sizeof(struct private));
|
||||
|
||||
private->coeffs = memdup(coeffs, sizeof(coeffs));
|
||||
private->history = alloc(sizeof(coeffs));
|
||||
|
||||
private->index = strtol(h->parameter, NULL, 10);
|
||||
if (!private->index)
|
||||
error("Invalid parameter '%s' for hook 'fir'", h->parameter);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
free(private->coeffs);
|
||||
free(private->history);
|
||||
free(private);
|
||||
break;
|
||||
|
||||
case HOOK_MSG: {
|
||||
/* Current value of interest */
|
||||
float *cur = &p->current->data[private->index].f;
|
||||
|
||||
/* Save last sample, unfiltered */
|
||||
private->history[p->received % p->poolsize] = *cur;
|
||||
|
||||
/* Reset accumulator */
|
||||
*cur = 0;
|
||||
|
||||
/* FIR loop */
|
||||
for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++)
|
||||
*cur += private->coeffs[i] * private->history[p->received+p->poolsize-i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("dft", 99, hook_dft, HOOK_POST)
|
||||
int hook_dft(struct path *p)
|
||||
REGISTER_HOOK("decimate", 99, hook_decimate, HOOK_PRIVATE | HOOK_POST)
|
||||
int hook_decimate(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
return 0; /** @todo Implement */
|
||||
struct private {
|
||||
long ratio;
|
||||
} *private = h->private;
|
||||
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
if (!h->parameter)
|
||||
error("Missing parameter for hook 'decimate'");
|
||||
|
||||
private = h->private = alloc(sizeof(struct private));
|
||||
|
||||
private->ratio = strtol(h->parameter, NULL, 10);
|
||||
if (!private->ratio)
|
||||
error("Invalid parameter '%s' for hook 'decimate'", h->parameter);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
free(private);
|
||||
break;
|
||||
|
||||
case HOOK_POST:
|
||||
return p->received % private->ratio;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** System hooks */
|
||||
REGISTER_HOOK("skip_first", 99, hook_skip_first, HOOK_PRIVATE | HOOK_POST | HOOK_PATH )
|
||||
int hook_skip_first(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
struct private {
|
||||
double wait; /**< Number of seconds to wait until first message is not skipped */
|
||||
struct timespec started; /**< Timestamp of last simulation restart */
|
||||
} *private = h->private;
|
||||
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
if (!h->parameter)
|
||||
error("Missing parameter for hook 'skip_first'");
|
||||
|
||||
int hook_restart(struct path *p)
|
||||
private = h->private = alloc(sizeof(struct private));
|
||||
|
||||
private->wait = strtof(h->parameter, NULL);
|
||||
if (!private->wait)
|
||||
error("Invalid parameter '%s' for hook 'skip_first'", h->parameter);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
free(private);
|
||||
break;
|
||||
|
||||
case HOOK_PATH_RESTART:
|
||||
private->started = p->ts_recv;
|
||||
break;
|
||||
|
||||
case HOOK_PATH_START:
|
||||
clock_gettime(CLOCK_REALTIME, &private->started);
|
||||
break;
|
||||
|
||||
case HOOK_POST: {
|
||||
double delta = time_delta(&private->started, &p->ts_recv);
|
||||
return delta < private->wait
|
||||
? -1 /* skip */
|
||||
: 0; /* send */
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("restart", 1, hook_restart, HOOK_INTERNAL | HOOK_MSG)
|
||||
int hook_restart(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
if (p->current->sequence == 0 &&
|
||||
p->previous->sequence <= UINT32_MAX - 32) {
|
||||
|
@ -147,26 +278,34 @@ int hook_restart(struct path *p)
|
|||
buf, p->previous->sequence, p->current->sequence);
|
||||
free(buf);
|
||||
|
||||
path_reset(p);
|
||||
p->sent =
|
||||
p->invalid =
|
||||
p->skipped =
|
||||
p->dropped = 0;
|
||||
p->received = 1;
|
||||
|
||||
if (path_run_hook(p, HOOK_PATH_RESTART))
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_verify(struct path *p)
|
||||
REGISTER_HOOK("verify", 2, hook_verify, HOOK_INTERNAL | HOOK_MSG)
|
||||
int hook_verify(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
int reason = msg_verify(p->current);
|
||||
if (reason) {
|
||||
p->invalid++;
|
||||
warn("Received invalid message (reason=%d)", reason);
|
||||
|
||||
warn("Received invalid message (reason = %d)", reason);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_drop(struct path *p)
|
||||
REGISTER_HOOK("drop", 3, hook_drop, HOOK_INTERNAL | HOOK_MSG)
|
||||
int hook_drop(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
int dist = p->current->sequence - (int32_t) p->previous->sequence;
|
||||
if (dist <= 0 && p->received > 1) {
|
||||
|
@ -176,3 +315,101 @@ int hook_drop(struct path *p)
|
|||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("stats", 2, hook_stats, HOOK_STATS)
|
||||
int hook_stats(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
/** @todo Allow configurable bounds for histograms */
|
||||
hist_create(&p->hist_sequence, -HIST_SEQ, +HIST_SEQ, 1);
|
||||
hist_create(&p->hist_delay, 0, 2, 100e-3);
|
||||
hist_create(&p->hist_gap, 0, 40e-3, 1e-3);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
hist_destroy(&p->hist_sequence);
|
||||
hist_destroy(&p->hist_delay);
|
||||
hist_destroy(&p->hist_gap);
|
||||
break;
|
||||
|
||||
case HOOK_MSG: {
|
||||
struct msg *prev = p->previous, *cur = p->current;
|
||||
|
||||
int dist = cur->sequence - (int32_t) prev->sequence;
|
||||
double delay = time_delta(&p->ts_recv, &MSG_TS(cur));
|
||||
double gap = time_delta(&MSG_TS(prev), &MSG_TS(cur));
|
||||
|
||||
hist_put(&p->hist_sequence, dist);
|
||||
hist_put(&p->hist_delay, delay);
|
||||
hist_put(&p->hist_gap, gap);
|
||||
break;
|
||||
}
|
||||
|
||||
case HOOK_PATH_STOP:
|
||||
if (p->hist_delay.total) { info("One-way delay (received):"); hist_print(&p->hist_delay); }
|
||||
if (p->hist_gap.total) { info("Message gap time:"); hist_print(&p->hist_gap); }
|
||||
if (p->hist_sequence.total) { info("Sequence number gaps:"); hist_print(&p->hist_sequence); }
|
||||
break;
|
||||
|
||||
case HOOK_PATH_RESTART:
|
||||
hist_reset(&p->hist_sequence);
|
||||
hist_reset(&p->hist_delay);
|
||||
hist_reset(&p->hist_gap);
|
||||
break;
|
||||
|
||||
case HOOK_PERIODIC: {
|
||||
char *buf = path_print(p);
|
||||
info("%-32s : %-8u %-8u %-8u %-8u %-8u", buf, p->sent, p->received, p->dropped, p->skipped, p->invalid);
|
||||
free(buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("stats_send", 99, hook_stats_send, HOOK_PRIVATE | HOOK_MSG)
|
||||
int hook_stats_send(struct path *p, struct hook *h, int when)
|
||||
{
|
||||
struct private {
|
||||
struct node *dest;
|
||||
int ratio;
|
||||
} *private = h->private;
|
||||
|
||||
switch (when) {
|
||||
case HOOK_INIT:
|
||||
if (!h->parameter)
|
||||
error("Missing parameter for hook 'stats_send'");
|
||||
|
||||
private = h->private = alloc(sizeof(struct private));
|
||||
|
||||
private->dest = list_lookup(&nodes, h->parameter);
|
||||
if (!private->dest)
|
||||
error("Invalid destination node '%s' for hook 'stats_send'", h->parameter);
|
||||
break;
|
||||
|
||||
case HOOK_DEINIT:
|
||||
free(private);
|
||||
break;
|
||||
|
||||
case HOOK_MSG: {
|
||||
struct msg m = MSG_INIT(0);
|
||||
|
||||
m.data[m.length++].f = p->sent;
|
||||
m.data[m.length++].f = p->received;
|
||||
m.data[m.length++].f = p->invalid;
|
||||
m.data[m.length++].f = p->skipped;
|
||||
m.data[m.length++].f = p->dropped;
|
||||
m.data[m.length++].f = p->hist_delay.last,
|
||||
m.data[m.length++].f = p->hist_gap.last;
|
||||
|
||||
/* Send single message with statistics to destination node */
|
||||
node_write_single(private->dest, &m);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -13,7 +13,6 @@
|
|||
#include "path.h"
|
||||
#include "timing.h"
|
||||
#include "config.h"
|
||||
#include "stats.h"
|
||||
|
||||
#ifndef sigev_notify_thread_id
|
||||
#define sigev_notify_thread_id _sigev_un._tid
|
||||
|
@ -42,8 +41,9 @@ static void path_write(struct path *p)
|
|||
int path_run_hook(struct path *p, enum hook_type t)
|
||||
{
|
||||
int ret = 0;
|
||||
ret += ((hook_cb_t) it->ptr)(p);
|
||||
list_foreach(struct hook *h, &p->hooks) {
|
||||
if (h->type & t)
|
||||
ret += ((hook_cb_t) h->cb)(p, h, t);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -124,6 +124,12 @@ int path_start(struct path *p)
|
|||
char *buf = path_print(p);
|
||||
info("Starting path: %s (poolsize = %u, msgsize = %u, #hooks = %zu)", buf, p->poolsize, p->msgsize, list_length(&p->hooks));
|
||||
free(buf);
|
||||
|
||||
/* We sort the hooks according to their priority before starting the path */
|
||||
int hook_cmp(const void *a, const void *b) {
|
||||
return ((struct hook *) a)->priority - ((struct hook *) b)->priority;
|
||||
}
|
||||
list_sort(&p->hooks, hook_cmp);
|
||||
|
||||
if (path_run_hook(p, HOOK_PATH_START))
|
||||
return -1;
|
||||
|
@ -188,55 +194,27 @@ char * path_print(struct path *p)
|
|||
return buf;
|
||||
}
|
||||
|
||||
int path_reset(struct path *p)
|
||||
{
|
||||
if (path_run_hook(p, HOOK_PATH_RESTART))
|
||||
return -1;
|
||||
|
||||
p->sent =
|
||||
p->received =
|
||||
p->invalid =
|
||||
p->skipped =
|
||||
p->dropped = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct path * path_create()
|
||||
{
|
||||
struct path *p = alloc(sizeof(struct path));
|
||||
|
||||
list_init(&p->destinations, NULL);
|
||||
list_init(&p->hooks, free);
|
||||
|
||||
for (int i = 0; i < HOOK_MAX; i++)
|
||||
list_init(&p->hooks[i], NULL);
|
||||
|
||||
#define hook_add(type, priority, cb) list_insert(&p->hooks[type], priority, cb)
|
||||
|
||||
hook_add(HOOK_MSG, 1, hook_verify);
|
||||
hook_add(HOOK_MSG, 2, hook_restart);
|
||||
hook_add(HOOK_MSG, 3, hook_drop);
|
||||
hook_add(HOOK_MSG, 4, stats_collect);
|
||||
|
||||
hook_add(HOOK_PATH_START, 1, stats_start);
|
||||
|
||||
hook_add(HOOK_PATH_STOP, 2, stats_show);
|
||||
hook_add(HOOK_PATH_STOP, 3, stats_stop);
|
||||
|
||||
hook_add(HOOK_PATH_RESTART, 1, stats_line);
|
||||
hook_add(HOOK_PATH_RESTART, 3, stats_reset);
|
||||
|
||||
hook_add(HOOK_PERIODIC, 1, stats_line);
|
||||
list_foreach(struct hook *h, &hooks) {
|
||||
if (h->type & HOOK_INTERNAL)
|
||||
list_push(&p->hooks, memdup(h, sizeof(*h)));
|
||||
}
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
void path_destroy(struct path *p)
|
||||
{
|
||||
path_run_hook(p, HOOK_DEINIT);
|
||||
|
||||
list_destroy(&p->destinations);
|
||||
|
||||
for (int i = 0; i < HOOK_MAX; i++)
|
||||
list_destroy(&p->hooks[i]);
|
||||
list_destroy(&p->hooks);
|
||||
|
||||
free(p->pool);
|
||||
free(p);
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
#include "cfg.h"
|
||||
#include "path.h"
|
||||
#include "node.h"
|
||||
#include "stats.h"
|
||||
#include "checks.h"
|
||||
|
||||
#ifdef ENABLE_OPAL_ASYNC
|
||||
|
@ -194,7 +193,9 @@ int main(int argc, char *argv[])
|
|||
|
||||
/* Run! */
|
||||
if (settings.stats > 0) {
|
||||
stats_header();
|
||||
info("%-32s : %-8s %-8s %-8s %-8s %-8s",
|
||||
"Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Invalid");
|
||||
line();
|
||||
|
||||
do list_foreach(struct path *p, &paths) {
|
||||
usleep(settings.stats * 1e6);
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
/** Hook functions to collect statistics
|
||||
*
|
||||
* @file
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
*********************************************************************************/
|
||||
|
||||
#include "stats.h"
|
||||
#include "path.h"
|
||||
#include "timing.h"
|
||||
#include "utils.h"
|
||||
|
||||
void stats_header()
|
||||
{
|
||||
info("%-32s : %-8s %-8s %-8s %-8s %-8s",
|
||||
"Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Invalid");
|
||||
line();
|
||||
}
|
||||
|
||||
int stats_line(struct path *p)
|
||||
{
|
||||
char *buf = path_print(p);
|
||||
info("%-32s : %-8u %-8u %-8u %-8u %-8u", buf, p->sent, p->received, p->dropped, p->skipped, p->invalid);
|
||||
free(buf);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_show(struct path *p)
|
||||
{
|
||||
if (p->hist_delay.total) { info("One-way delay:"); hist_print(&p->hist_delay); }
|
||||
if (p->hist_gap.total) { info("Message gap time:"); hist_print(&p->hist_gap); }
|
||||
if (p->hist_sequence.total) { info("Sequence number gaps:"); hist_print(&p->hist_sequence); }
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_collect(struct path *p)
|
||||
{
|
||||
int dist = p->current->sequence - (int32_t) p->previous->sequence;
|
||||
|
||||
struct timespec ts1 = MSG_TS(p->current);
|
||||
struct timespec ts2 = MSG_TS(p->previous);
|
||||
|
||||
hist_put(&p->hist_sequence, dist);
|
||||
hist_put(&p->hist_delay, time_delta(&ts1, &p->ts_recv));
|
||||
hist_put(&p->hist_gap, time_delta(&ts2, &ts1));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_start(struct path *p)
|
||||
{
|
||||
/** @todo Allow configurable bounds for histograms */
|
||||
hist_create(&p->hist_sequence, -HIST_SEQ, +HIST_SEQ, 1);
|
||||
hist_create(&p->hist_delay, 0, 2, 100e-3);
|
||||
hist_create(&p->hist_gap, 0, 40e-3, 1e-3);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_stop(struct path *p)
|
||||
{
|
||||
hist_destroy(&p->hist_sequence);
|
||||
hist_destroy(&p->hist_delay);
|
||||
hist_destroy(&p->hist_gap);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_reset(struct path *p)
|
||||
{
|
||||
hist_reset(&p->hist_sequence);
|
||||
hist_reset(&p->hist_delay);
|
||||
hist_reset(&p->hist_gap);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Add table
Reference in a new issue