diff --git a/server/include/cfg.h b/server/include/cfg.h index 6e336bb08..a60c493bf 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -76,6 +76,8 @@ int config_parse_path(config_setting_t *cfg, int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all); +int config_parse_hooks(config_setting_t *cfg, struct list *hooks); + /** Parse a single node and add it to the global configuration. * * @param cfg A libconfig object pointing to the node @@ -83,8 +85,7 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int config_parse_node(config_setting_t *cfg, - struct node **nodes); +int config_parse_node(config_setting_t *cfg, struct node **nodes); /** Parse node connection details for OPAL type * diff --git a/server/include/hooks.h b/server/include/hooks.h index ca949c4fc..b245cc108 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -48,13 +48,28 @@ hook_cb_t hook_lookup(const char *name); /** Example hook: Print the message. */ int hook_print(struct msg *m, struct path *p); -/** Example hook: Filter the message on some criteria. */ -int hook_filter(struct msg *m, struct path *p); +/** Example hook: Log messages to a logfile in /tmp */ +int hook_log(struct msg *m, struct path *p); + +#define HOOK_LOG_MODE "w+" +#define HOOK_LOG_TEMPLATE "logs/s2ss-%Y_%m_%d-%H_%M_%S.log" + +/** Example hook: Drop messages. */ +int hook_decimate(struct msg *m, struct path *p); + +#define HOOK_DECIMATE_RATIO 10 /** Example hook: Convert the message values to fixed precision. */ int hook_tofixed(struct msg *m, struct path *p); -/** Example hook: Chain multiple hooks */ -int hook_multiple(struct msg *m, struct path *p); +/** Example hook: add timestamp to message. */ +int hook_ts(struct msg *m, struct path *p); + +#define HOOK_TS_INDEX -1 // last message + +/** Example hook: Finite-Impulse-Response (FIR) filter. */ +int hook_fir(struct msg *m, struct path *p); + +#define HOOK_FIR_INDEX 1 #endif /* _HOOKS_H_ */ diff --git a/server/src/cfg.c b/server/src/cfg.c index 5a954e341..6e01e5139 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -121,12 +121,9 @@ int config_parse_path(config_setting_t *cfg, cerror(cfg, "Missing output node for path"); /* Optional settings */ - if (config_setting_lookup_string(cfg, "hook", &hook)) { - p->hook = hook_lookup(hook); - - if (!p->hook) - cerror(cfg, "Failed to lookup hook function. Not registred?"); - } + struct config_setting_t *cfg_hook = config_setting_get_member(cfg, "hook"); + if (cfg_hook) + config_parse_hooks(cfg_hook, &p->hooks); config_setting_lookup_bool(cfg, "enabled", &enabled); config_setting_lookup_bool(cfg, "reverse", &reverse); @@ -202,6 +199,38 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node return 0; } +int config_parse_hooks(config_setting_t *cfg, struct list *hooks) { + const char *str; + hook_cb_t hook; + + switch (config_setting_type(cfg)) { + case CONFIG_TYPE_STRING: + str = config_setting_get_string(cfg); + hook = hook_lookup(str); + if (!hook) + cerror(cfg, "Invalid hook function '%s'", str); + + list_push(hooks, hook); + break; + + case CONFIG_TYPE_ARRAY: + for (int i=0; i * @copyright 2014, Institute for Automation of Complex Power Systems, EONERC */ - + +#include +#include #include +#include +#include #include "msg.h" #include "hooks.h" +#include "path.h" +#include "utils.h" /** @todo Make const */ static struct hook_id hook_list[] = { { hook_print, "print" }, - { hook_filter, "filter" }, + { hook_log, "log" }, + { hook_decimate, "decimate" }, { hook_tofixed, "tofixed" }, - { hook_multiple, "multiple" }, + { hook_ts, "ts" }, + { hook_fir, "fir" }, { NULL } }; @@ -43,13 +51,39 @@ int hook_print(struct msg *m, struct path *p) return 0; } -int hook_filter(struct msg *m, struct path *p) +int hook_log(struct msg *m, struct path *p) { - /* Drop every 10th message */ - if (m->sequence % 10 == 0) - return -1; - else - return 0; + static pthread_key_t pkey; + FILE *file = pthread_getspecific(pkey); + + if (!file) { + char fstr[64], pstr[33]; + path_print(p, pstr, sizeof(pstr)); + + struct tm tm; + time_t ts = time(NULL); + localtime_r(&ts, &tm); + strftime(fstr, sizeof(fstr), HOOK_LOG_TEMPLATE, &tm); + + + + file = fopen(fstr, HOOK_LOG_MODE); + if (file) + debug(5, "Opened log file for path %s: %s", pstr, fstr); + + pthread_key_create(&pkey, (void (*)(void *)) fclose); + pthread_setspecific(pkey, file); + } + + msg_fprint(file, m); + + return 0; +} + +int hook_decimate(struct msg *m, struct path *p) +{ + /* Drop every HOOK_DECIMATE_RATIO'th message */ + return (m->sequence % HOOK_DECIMATE_RATIO == 0) ? -1 : 0; } int hook_tofixed(struct msg *m, struct path *p) @@ -61,12 +95,51 @@ int hook_tofixed(struct msg *m, struct path *p) return 0; } -int hook_multiple(struct msg *m, struct path *p) +int hook_ts(struct msg *m, struct path *p) { - if (hook_print(m, p)) - return -1; - else if (hook_tofixed(m, p)) - return -1; - else - return 0; + struct timespec *ts = (struct timespec *) &m->data[HOOK_TS_INDEX]; + + clock_gettime(CLOCK_REALTIME, ts); + + return 0; +} + +/** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300 + * Tip: Use MATLAB's filter design tool and export coefficients + * with the integrated C-Header export */ +static const double hook_fir_coeffs[] = { -0.003658148158728, -0.008882653268281, 0.008001024183003, + 0.08090485991761, 0.2035239551043, 0.3040703593515, + 0.3040703593515, 0.2035239551043, 0.08090485991761, + 0.008001024183003, -0.008882653268281,-0.003658148158728 }; + +/** @todo: test */ +int hook_fir(struct msg *m, struct path *p) +{ + static pthread_key_t pkey; + float *history = pthread_getspecific(pkey); + + /** Length of impulse response */ + int len = ARRAY_LEN(hook_fir_coeffs); + /** Current index in circular history buffer */ + int cur = m->sequence % len; + /* Accumulator */ + double sum = 0; + + /* Create thread local storage for circular history buffer */ + if (!history) { + history = malloc(len * sizeof(float)); + + pthread_key_create(&pkey, free); + pthread_setspecific(pkey, history); + } + + /* Update circular buffer */ + history[cur] = m->data[HOOK_FIR_INDEX].f; + + for (int i=0; idata[HOOK_FIR_INDEX].f = sum; + + return 0; } diff --git a/server/src/path.c b/server/src/path.c index 264d005b1..a697ae2a0 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -123,9 +123,11 @@ static void * path_run(void *arg) } /* Call hook callbacks */ - if (p->hook && p->hook(m, p)) { - p->skipped++; - continue; + FOREACH(&p->hooks, it) { + if (it->hook(m, p)) { + p->skipped++; + continue; + } } /* Update last known sequence number */