diff --git a/server/.gitignore b/server/.gitignore index f929d8dbc..5772b93b7 100644 --- a/server/.gitignore +++ b/server/.gitignore @@ -1,3 +1,5 @@ +logs/ + *.d *.o *~ diff --git a/server/include/cfg.h b/server/include/cfg.h index 5e93c42aa..a60c493bf 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -14,6 +14,8 @@ #include +/* Forward declarations */ +struct list; struct node; struct path; struct interface; @@ -70,6 +72,11 @@ int config_parse_global(config_setting_t *cfg, struct settings *set); */ int config_parse_path(config_setting_t *cfg, struct path **paths, struct node **nodes); + +int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all); + + +int config_parse_hooks(config_setting_t *cfg, struct list *hooks); /** Parse a single node and add it to the global configuration. * @@ -78,8 +85,7 @@ int config_parse_path(config_setting_t *cfg, * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int config_parse_node(config_setting_t *cfg, - struct node **nodes); +int config_parse_node(config_setting_t *cfg, struct node **nodes); /** Parse node connection details for OPAL type * diff --git a/server/include/hooks.h b/server/include/hooks.h index ca949c4fc..b245cc108 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -48,13 +48,28 @@ hook_cb_t hook_lookup(const char *name); /** Example hook: Print the message. */ int hook_print(struct msg *m, struct path *p); -/** Example hook: Filter the message on some criteria. */ -int hook_filter(struct msg *m, struct path *p); +/** Example hook: Log messages to a logfile in /tmp */ +int hook_log(struct msg *m, struct path *p); + +#define HOOK_LOG_MODE "w+" +#define HOOK_LOG_TEMPLATE "logs/s2ss-%Y_%m_%d-%H_%M_%S.log" + +/** Example hook: Drop messages. */ +int hook_decimate(struct msg *m, struct path *p); + +#define HOOK_DECIMATE_RATIO 10 /** Example hook: Convert the message values to fixed precision. */ int hook_tofixed(struct msg *m, struct path *p); -/** Example hook: Chain multiple hooks */ -int hook_multiple(struct msg *m, struct path *p); +/** Example hook: add timestamp to message. */ +int hook_ts(struct msg *m, struct path *p); + +#define HOOK_TS_INDEX -1 // last message + +/** Example hook: Finite-Impulse-Response (FIR) filter. */ +int hook_fir(struct msg *m, struct path *p); + +#define HOOK_FIR_INDEX 1 #endif /* _HOOKS_H_ */ diff --git a/server/include/list.h b/server/include/list.h new file mode 100644 index 000000000..999365833 --- /dev/null +++ b/server/include/list.h @@ -0,0 +1,70 @@ +/** A generic linked list + * + * Linked lists a used for several data structures in the code. + * + * @author Steffen Vogel + * @copyright 2015, Institute for Automation of Complex Power Systems, EONERC + * @file + */ + +#ifndef _LIST_H_ +#define _LIST_H_ + +#include + +#include "hooks.h" + +/* Forward declarations */ +struct list_elm; +struct node; +struct path; +struct interface; + +/** Static list initialization */ +#define LIST_INIT { \ + .head = NULL, \ + .tail = NULL, \ + .count = 0, \ + .lock = PTHREAD_MUTEX_INITIALIZER \ +} + +#define FOREACH(list, elm) \ + for ( struct list_elm *elm = (list)->head; \ + elm; elm = elm->next ) + +#define FOREACH_R(list, elm) \ + for ( struct list_elm *elm = (list)->tail; \ + elm; elm = elm->prev ) + +#define list_first(list) ((list)->head) +#define list_last(list) ((list)->head) +#define list_length(list) ((list)->count) + +struct list { + struct list_elm *head, *tail; + int count; + + pthread_mutex_t lock; +}; + +struct list_elm { + union { + void *ptr; + struct node *node; + struct path *path; + struct interface *interface; + hook_cb_t hook; + }; + + struct list_elm *prev, *next; +}; + +void list_init(struct list *l); + +void list_destroy(struct list *l); + +void list_push(struct list *l, void *p); + +struct list_elm * list_search(struct list *l, int (*cmp)(void *)); + +#endif /* _LIST_H_ */ \ No newline at end of file diff --git a/server/include/path.h b/server/include/path.h index 70f1533df..78bc4432f 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -11,6 +11,7 @@ #include #include +#include "list.h" #include "config.h" #include "hist.h" #include "node.h" @@ -25,11 +26,14 @@ struct path { /** Pointer to the incoming node */ struct node *in; - /** Pointer to the outgoing node */ + /** Pointer to the first outgoing node. + * Usually this is only a pointer to the first list element of path::destinations. */ struct node *out; - - /** Function pointer of the hook */ - hook_cb_t hook; + + /** List of all outgoing nodes */ + struct list destinations; + /** List of function pointers to hooks */ + struct list hooks; /** Send messages with a fixed rate over this path */ double rate; @@ -43,7 +47,7 @@ struct path /** Last known message number */ unsigned int sequence; - /** Counter for sent messages to all outgoing nodes*/ + /** Counter for sent messages to all outgoing nodes */ unsigned int sent; /** Counter for received messages from all incoming nodes */ unsigned int received; @@ -89,4 +93,8 @@ int path_stop(struct path *p); */ void path_stats(struct path *p); +int path_print(struct path *p, char *buf, int len); + +int path_destroy(struct path *p); + #endif /* _PATH_H_ */ diff --git a/server/include/utils.h b/server/include/utils.h index b4eb8a028..d20c5500b 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -74,6 +74,15 @@ void epoch_reset(); */ void print(enum log_level lvl, const char *fmt, ...); +/** Safely append a format string to an existing string. + * + * This function is similar to strlcat() from BSD. + */ +int strap(char *dest, size_t size, const char *fmt, ...); + +/** Variable arguments (stdarg) version of strap() */ +int vstrap(char *dest, size_t size, const char *fmt, va_list va); + /** Convert integer to cpu_set_t. * * @param set A cpu bitmask diff --git a/server/src/cfg.c b/server/src/cfg.c index 2b03dd23b..6e01e5139 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -9,12 +9,13 @@ #include #include +#include "utils.h" +#include "list.h" #include "if.h" #include "tc.h" #include "cfg.h" #include "node.h" #include "path.h" -#include "utils.h" #include "hooks.h" #include "socket.h" @@ -93,34 +94,36 @@ int config_parse_global(config_setting_t *cfg, struct settings *set) int config_parse_path(config_setting_t *cfg, struct path **paths, struct node **nodes) { - const char *in, *out, *hook; + const char *in; int enabled = 1; int reverse = 0; - + struct path *p = alloc(sizeof(struct path)); - /* Required settings */ - if (!config_setting_lookup_string(cfg, "in", &in)) - cerror(cfg, "Missing input node for path"); - - if (!config_setting_lookup_string(cfg, "out", &out)) - cerror(cfg, "Missing output node for path"); - + /* Input node */ + struct config_setting_t *cfg_in = config_setting_get_member(cfg, "in"); + if (!cfg_in || config_setting_type(cfg_in) != CONFIG_TYPE_STRING) + cerror(cfg, "Invalid input node for path"); + + in = config_setting_get_string(cfg_in); p->in = node_lookup_name(in, *nodes); if (!p->in) - cerror(cfg, "Invalid input node '%s'", in); + cerror(cfg_in, "Invalid input node '%s", in); - p->out = node_lookup_name(out, *nodes); - if (!p->out) - cerror(cfg, "Invalid output node '%s'", out); + /* Output node(s) */ + struct config_setting_t *cfg_out = config_setting_get_member(cfg, "out"); + if (cfg_out) + config_parse_nodelist(cfg_out, &p->destinations, nodes); + + if (list_length(&p->destinations) >= 1) + p->out = list_first(&p->destinations)->node; + else + cerror(cfg, "Missing output node for path"); /* Optional settings */ - if (config_setting_lookup_string(cfg, "hook", &hook)) { - p->hook = hook_lookup(hook); - - if (!p->hook) - cerror(cfg, "Failed to lookup hook function. Not registred?"); - } + struct config_setting_t *cfg_hook = config_setting_get_member(cfg, "hook"); + if (cfg_hook) + config_parse_hooks(cfg_hook, &p->hooks); config_setting_lookup_bool(cfg, "enabled", &enabled); config_setting_lookup_bool(cfg, "reverse", &reverse); @@ -130,30 +133,104 @@ int config_parse_path(config_setting_t *cfg, if (enabled) { p->in->refcnt++; - p->out->refcnt++; - - list_add(*paths, p); - + FOREACH(&p->destinations, it) + it->node->refcnt++; + if (reverse) { - struct path *rev = alloc(sizeof(struct path)); + if (list_length(&p->destinations) > 1) + warn("Using first destination '%s' as source for reverse path. " + "Ignoring remaining nodes", p->out->name); - rev->in = p->out; /* Swap in/out */ - rev->out = p->in; + struct path *r = alloc(sizeof(struct path)); - rev->in->refcnt++; - rev->out->refcnt++; + r->in = p->out; /* Swap in/out */ + r->out = p->in; + + list_push(&r->destinations, r->out); - list_add(*paths, rev); + r->in->refcnt++; + r->out->refcnt++; + + list_add(*paths, r); } + + list_add(*paths, p); } else { - warn("Path '%s' => '%s' is not enabled", p->in->name, p->out->name); - free(p); + char buf[33]; + path_print(p, buf, sizeof(buf)); + + warn("Path %s is not enabled", buf); + path_destroy(p); } return 0; } +int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all) { + const char *str; + struct node *node; + + switch (config_setting_type(cfg)) { + case CONFIG_TYPE_STRING: + str = config_setting_get_string(cfg); + node = node_lookup_name(str, *all); + if (!node) + cerror(cfg, "Invalid outgoing node '%s'", str); + + list_push(nodes, node); + break; + + case CONFIG_TYPE_ARRAY: + for (int i=0; i * @copyright 2014, Institute for Automation of Complex Power Systems, EONERC */ - + +#include +#include #include +#include +#include #include "msg.h" #include "hooks.h" +#include "path.h" +#include "utils.h" /** @todo Make const */ static struct hook_id hook_list[] = { { hook_print, "print" }, - { hook_filter, "filter" }, + { hook_log, "log" }, + { hook_decimate, "decimate" }, { hook_tofixed, "tofixed" }, - { hook_multiple, "multiple" }, + { hook_ts, "ts" }, + { hook_fir, "fir" }, { NULL } }; @@ -43,13 +51,39 @@ int hook_print(struct msg *m, struct path *p) return 0; } -int hook_filter(struct msg *m, struct path *p) +int hook_log(struct msg *m, struct path *p) { - /* Drop every 10th message */ - if (m->sequence % 10 == 0) - return -1; - else - return 0; + static pthread_key_t pkey; + FILE *file = pthread_getspecific(pkey); + + if (!file) { + char fstr[64], pstr[33]; + path_print(p, pstr, sizeof(pstr)); + + struct tm tm; + time_t ts = time(NULL); + localtime_r(&ts, &tm); + strftime(fstr, sizeof(fstr), HOOK_LOG_TEMPLATE, &tm); + + + + file = fopen(fstr, HOOK_LOG_MODE); + if (file) + debug(5, "Opened log file for path %s: %s", pstr, fstr); + + pthread_key_create(&pkey, (void (*)(void *)) fclose); + pthread_setspecific(pkey, file); + } + + msg_fprint(file, m); + + return 0; +} + +int hook_decimate(struct msg *m, struct path *p) +{ + /* Drop every HOOK_DECIMATE_RATIO'th message */ + return (m->sequence % HOOK_DECIMATE_RATIO == 0) ? -1 : 0; } int hook_tofixed(struct msg *m, struct path *p) @@ -61,12 +95,51 @@ int hook_tofixed(struct msg *m, struct path *p) return 0; } -int hook_multiple(struct msg *m, struct path *p) +int hook_ts(struct msg *m, struct path *p) { - if (hook_print(m, p)) - return -1; - else if (hook_tofixed(m, p)) - return -1; - else - return 0; + struct timespec *ts = (struct timespec *) &m->data[HOOK_TS_INDEX]; + + clock_gettime(CLOCK_REALTIME, ts); + + return 0; +} + +/** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300 + * Tip: Use MATLAB's filter design tool and export coefficients + * with the integrated C-Header export */ +static const double hook_fir_coeffs[] = { -0.003658148158728, -0.008882653268281, 0.008001024183003, + 0.08090485991761, 0.2035239551043, 0.3040703593515, + 0.3040703593515, 0.2035239551043, 0.08090485991761, + 0.008001024183003, -0.008882653268281,-0.003658148158728 }; + +/** @todo: test */ +int hook_fir(struct msg *m, struct path *p) +{ + static pthread_key_t pkey; + float *history = pthread_getspecific(pkey); + + /** Length of impulse response */ + int len = ARRAY_LEN(hook_fir_coeffs); + /** Current index in circular history buffer */ + int cur = m->sequence % len; + /* Accumulator */ + double sum = 0; + + /* Create thread local storage for circular history buffer */ + if (!history) { + history = malloc(len * sizeof(float)); + + pthread_key_create(&pkey, free); + pthread_setspecific(pkey, history); + } + + /* Update circular buffer */ + history[cur] = m->data[HOOK_FIR_INDEX].f; + + for (int i=0; idata[HOOK_FIR_INDEX].f = sum; + + return 0; } diff --git a/server/src/list.c b/server/src/list.c new file mode 100644 index 000000000..d34c8beef --- /dev/null +++ b/server/src/list.c @@ -0,0 +1,69 @@ +/** A generic linked list + * + * Linked lists a used for several data structures in the code. + * + * @author Steffen Vogel + * @copyright 2015, Institute for Automation of Complex Power Systems, EONERC + * @file + */ + +#include "utils.h" +#include "list.h" + +void list_init(struct list *l) +{ + pthread_mutex_init(&l->lock, NULL); + + l->count = 0; + l->head = NULL; + l->tail = NULL; +} + +void list_destroy(struct list *l) +{ + pthread_mutex_lock(&l->lock); + + struct list_elm *elm = l->head; + while (elm) { + struct list_elm *tmp = elm; + free(tmp); + + elm = elm->next; + } + + pthread_mutex_destroy(&l->lock); +} + +void list_push(struct list *l, void *p) +{ + struct list_elm *e = alloc(sizeof(struct list_elm)); + + pthread_mutex_lock(&l->lock); + + e->ptr = p; + e->prev = l->tail; + e->next = NULL; + + if (l->tail) + l->tail->next = e; + if (l->head) + l->head->prev = e; + else + l->head = e; + + l->tail = e; + + l->count++; + + pthread_mutex_unlock(&l->lock); +} + +struct list_elm * list_search(struct list *l, int (*cmp)(void *)) +{ + FOREACH(l, it) { + if (!cmp(it->ptr)) + return it; + } + + return NULL; +} diff --git a/server/src/path.c b/server/src/path.c index d2c55df3d..a697ae2a0 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -21,7 +21,7 @@ /** Linked list of paths */ struct path *paths; -/** Send messages */ +/** Send messages asynchronously */ static void * path_send(void *arg) { int sig; @@ -53,9 +53,12 @@ static void * path_send(void *arg) while (1) { sigwait(&set, &sig); /* blocking wait for next timer tick */ - if (p->last) { - node_write(p->out, p->last); - p->last = NULL; + + if (p->received) { + FOREACH(&p->destinations, it) { + node_write(it->node, p->last); + } + p->sent++; } } @@ -66,6 +69,7 @@ static void * path_send(void *arg) /** Receive messages */ static void * path_run(void *arg) { + char buf[33]; struct path *p = arg; struct msg *m = alloc(sizeof(struct msg)); if (!m) @@ -73,7 +77,7 @@ static void * path_run(void *arg) /* Open deferred TCP connection */ node_start_defer(p->in); - node_start_defer(p->out); + // FIXME: node_start_defer(p->out); /* Main thread loop */ while (1) { @@ -97,11 +101,11 @@ static void * path_run(void *arg) /* Handle simulation restart */ if (m->sequence == 0 && abs(dist) >= 1) { + path_print(p, buf, sizeof(buf)); path_stats(p); - warn("Simulation for path %s " MAG("=>") " %s " - "restarted (p->seq=%u, m->seq=%u, dist=%d)", - p->in->name, p->out->name, - p->sequence, m->sequence, dist); + + warn("Simulation for path %s restarted (p->seq=%u, m->seq=%u, dist=%d)", + buf, p->sequence, m->sequence, dist); /* Reset counters */ p->sent = 0; @@ -119,9 +123,11 @@ static void * path_run(void *arg) } /* Call hook callbacks */ - if (p->hook && p->hook(m, p)) { - p->skipped++; - continue; + FOREACH(&p->hooks, it) { + if (it->hook(m, p)) { + p->skipped++; + continue; + } } /* Update last known sequence number */ @@ -130,7 +136,10 @@ static void * path_run(void *arg) /* At fixed rate mode, messages are send by another thread */ if (!p->rate) { - node_write(p->out, m); /* Send message */ + FOREACH(&p->destinations, it) { + node_write(it->node, m); + } + p->sent++; } } @@ -142,7 +151,10 @@ static void * path_run(void *arg) int path_start(struct path *p) { INDENT - info("Starting path: %12s " GRN("=>") " %-12s", p->in->name, p->out->name); + char buf[33]; + path_print(p, buf, sizeof(buf)); + + info("Starting path: %s", buf); hist_init(&p->histogram, -HIST_SEQ, +HIST_SEQ, 1); @@ -155,7 +167,10 @@ int path_start(struct path *p) int path_stop(struct path *p) { INDENT - info("Stopping path: %12s " RED("=>") " %-12s", p->in->name, p->out->name); + char buf[33]; + path_print(p, buf, sizeof(buf)); + + info("Stopping path: %s", buf); pthread_cancel(p->recv_tid); pthread_join(p->recv_tid, NULL); @@ -176,8 +191,34 @@ int path_stop(struct path *p) void path_stats(struct path *p) { - info("%12s " MAG("=>") " %-12s: %-8u %-8u %-8u %-8u %-8u", - p->in->name, p->out->name, - p->sent, p->received, p->dropped, p->skipped, p->invalid + char buf[33]; + path_print(p, buf, sizeof(buf)); + + info("%-32s : %-8u %-8u %-8u %-8u %-8u", + buf, p->sent, p->received, p->dropped, p->skipped, p->invalid ); } + +int path_print(struct path *p, char *buf, int len) +{ + *buf = 0; + + if (list_length(&p->destinations) > 1) { + strap(buf, len, "%s " MAG("=>") " [", p->in->name); + FOREACH(&p->destinations, it) + strap(buf, len, " %s", it->node->name); + strap(buf, len, " ]"); + } + else + strap(buf, len, "%s " MAG("=>") " %s", p->in->name, p->out->name); + + return 0; +} + +int path_destroy(struct path *p) +{ + list_destroy(&p->destinations); + list_destroy(&p->hooks); + + return 0; +} diff --git a/server/src/server.c b/server/src/server.c index e6235f24e..f9ba46162 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -38,6 +38,7 @@ static void quit() info("Stopping paths:"); for (struct path *p = paths; p; p = p->next) { INDENT path_stop(p); + path_destroy(p); } info("Stopping nodes:"); @@ -50,7 +51,7 @@ static void quit() if_stop(i); } - /** @todo Free nodes and paths */ + /** @todo Free nodes */ config_destroy(&config); @@ -155,8 +156,8 @@ int main(int argc, char *argv[]) struct path *p = paths; info("Runtime Statistics:"); - info("%12s " MAG("=>") " %-12s: %-8s %-8s %-8s %-8s %-8s", - "Source", "Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval"); + info("%-32s : %-8s %-8s %-8s %-8s %-8s", + "Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval"); info("---------------------------------------------------------------------------"); while (1) { diff --git a/server/src/utils.c b/server/src/utils.c index c332e7799..1cd554765 100644 --- a/server/src/utils.c +++ b/server/src/utils.c @@ -35,6 +35,25 @@ void epoch_reset() clock_gettime(CLOCK_REALTIME, &epoch); } +int strap(char *dest, size_t size, const char *fmt, ...) +{ + int ret; + + va_list ap; + va_start(ap, fmt); + ret = vstrap(dest, size, fmt, ap); + va_end(ap); + + return ret; +} + +int vstrap(char *dest, size_t size, const char *fmt, va_list ap) +{ + int len = strlen(dest); + + return vsnprintf(dest + len, size - len, fmt, ap); +} + void print(enum log_level lvl, const char *fmt, ...) { struct timespec ts;