mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'better-hooks'
This commit is contained in:
commit
cdb65c2444
12 changed files with 470 additions and 80 deletions
2
server/.gitignore
vendored
2
server/.gitignore
vendored
|
@ -1,3 +1,5 @@
|
|||
logs/
|
||||
|
||||
*.d
|
||||
*.o
|
||||
*~
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#include <libconfig.h>
|
||||
|
||||
/* Forward declarations */
|
||||
struct list;
|
||||
struct node;
|
||||
struct path;
|
||||
struct interface;
|
||||
|
@ -70,6 +72,11 @@ int config_parse_global(config_setting_t *cfg, struct settings *set);
|
|||
*/
|
||||
int config_parse_path(config_setting_t *cfg,
|
||||
struct path **paths, struct node **nodes);
|
||||
|
||||
int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all);
|
||||
|
||||
|
||||
int config_parse_hooks(config_setting_t *cfg, struct list *hooks);
|
||||
|
||||
/** Parse a single node and add it to the global configuration.
|
||||
*
|
||||
|
@ -78,8 +85,7 @@ int config_parse_path(config_setting_t *cfg,
|
|||
* @retval 0 Success. Everything went well.
|
||||
* @retval <0 Error. Something went wrong.
|
||||
*/
|
||||
int config_parse_node(config_setting_t *cfg,
|
||||
struct node **nodes);
|
||||
int config_parse_node(config_setting_t *cfg, struct node **nodes);
|
||||
|
||||
/** Parse node connection details for OPAL type
|
||||
*
|
||||
|
|
|
@ -48,13 +48,28 @@ hook_cb_t hook_lookup(const char *name);
|
|||
/** Example hook: Print the message. */
|
||||
int hook_print(struct msg *m, struct path *p);
|
||||
|
||||
/** Example hook: Filter the message on some criteria. */
|
||||
int hook_filter(struct msg *m, struct path *p);
|
||||
/** Example hook: Log messages to a logfile in /tmp */
|
||||
int hook_log(struct msg *m, struct path *p);
|
||||
|
||||
#define HOOK_LOG_MODE "w+"
|
||||
#define HOOK_LOG_TEMPLATE "logs/s2ss-%Y_%m_%d-%H_%M_%S.log"
|
||||
|
||||
/** Example hook: Drop messages. */
|
||||
int hook_decimate(struct msg *m, struct path *p);
|
||||
|
||||
#define HOOK_DECIMATE_RATIO 10
|
||||
|
||||
/** Example hook: Convert the message values to fixed precision. */
|
||||
int hook_tofixed(struct msg *m, struct path *p);
|
||||
|
||||
/** Example hook: Chain multiple hooks */
|
||||
int hook_multiple(struct msg *m, struct path *p);
|
||||
/** Example hook: add timestamp to message. */
|
||||
int hook_ts(struct msg *m, struct path *p);
|
||||
|
||||
#define HOOK_TS_INDEX -1 // last message
|
||||
|
||||
/** Example hook: Finite-Impulse-Response (FIR) filter. */
|
||||
int hook_fir(struct msg *m, struct path *p);
|
||||
|
||||
#define HOOK_FIR_INDEX 1
|
||||
|
||||
#endif /* _HOOKS_H_ */
|
||||
|
|
70
server/include/list.h
Normal file
70
server/include/list.h
Normal file
|
@ -0,0 +1,70 @@
|
|||
/** A generic linked list
|
||||
*
|
||||
* Linked lists a used for several data structures in the code.
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2015, Institute for Automation of Complex Power Systems, EONERC
|
||||
* @file
|
||||
*/
|
||||
|
||||
#ifndef _LIST_H_
|
||||
#define _LIST_H_
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "hooks.h"
|
||||
|
||||
/* Forward declarations */
|
||||
struct list_elm;
|
||||
struct node;
|
||||
struct path;
|
||||
struct interface;
|
||||
|
||||
/** Static list initialization */
|
||||
#define LIST_INIT { \
|
||||
.head = NULL, \
|
||||
.tail = NULL, \
|
||||
.count = 0, \
|
||||
.lock = PTHREAD_MUTEX_INITIALIZER \
|
||||
}
|
||||
|
||||
#define FOREACH(list, elm) \
|
||||
for ( struct list_elm *elm = (list)->head; \
|
||||
elm; elm = elm->next )
|
||||
|
||||
#define FOREACH_R(list, elm) \
|
||||
for ( struct list_elm *elm = (list)->tail; \
|
||||
elm; elm = elm->prev )
|
||||
|
||||
#define list_first(list) ((list)->head)
|
||||
#define list_last(list) ((list)->head)
|
||||
#define list_length(list) ((list)->count)
|
||||
|
||||
struct list {
|
||||
struct list_elm *head, *tail;
|
||||
int count;
|
||||
|
||||
pthread_mutex_t lock;
|
||||
};
|
||||
|
||||
struct list_elm {
|
||||
union {
|
||||
void *ptr;
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
struct interface *interface;
|
||||
hook_cb_t hook;
|
||||
};
|
||||
|
||||
struct list_elm *prev, *next;
|
||||
};
|
||||
|
||||
void list_init(struct list *l);
|
||||
|
||||
void list_destroy(struct list *l);
|
||||
|
||||
void list_push(struct list *l, void *p);
|
||||
|
||||
struct list_elm * list_search(struct list *l, int (*cmp)(void *));
|
||||
|
||||
#endif /* _LIST_H_ */
|
|
@ -11,6 +11,7 @@
|
|||
#include <pthread.h>
|
||||
#include <libconfig.h>
|
||||
|
||||
#include "list.h"
|
||||
#include "config.h"
|
||||
#include "hist.h"
|
||||
#include "node.h"
|
||||
|
@ -25,11 +26,14 @@ struct path
|
|||
{
|
||||
/** Pointer to the incoming node */
|
||||
struct node *in;
|
||||
/** Pointer to the outgoing node */
|
||||
/** Pointer to the first outgoing node.
|
||||
* Usually this is only a pointer to the first list element of path::destinations. */
|
||||
struct node *out;
|
||||
|
||||
/** Function pointer of the hook */
|
||||
hook_cb_t hook;
|
||||
|
||||
/** List of all outgoing nodes */
|
||||
struct list destinations;
|
||||
/** List of function pointers to hooks */
|
||||
struct list hooks;
|
||||
|
||||
/** Send messages with a fixed rate over this path */
|
||||
double rate;
|
||||
|
@ -43,7 +47,7 @@ struct path
|
|||
/** Last known message number */
|
||||
unsigned int sequence;
|
||||
|
||||
/** Counter for sent messages to all outgoing nodes*/
|
||||
/** Counter for sent messages to all outgoing nodes */
|
||||
unsigned int sent;
|
||||
/** Counter for received messages from all incoming nodes */
|
||||
unsigned int received;
|
||||
|
@ -89,4 +93,8 @@ int path_stop(struct path *p);
|
|||
*/
|
||||
void path_stats(struct path *p);
|
||||
|
||||
int path_print(struct path *p, char *buf, int len);
|
||||
|
||||
int path_destroy(struct path *p);
|
||||
|
||||
#endif /* _PATH_H_ */
|
||||
|
|
|
@ -74,6 +74,15 @@ void epoch_reset();
|
|||
*/
|
||||
void print(enum log_level lvl, const char *fmt, ...);
|
||||
|
||||
/** Safely append a format string to an existing string.
|
||||
*
|
||||
* This function is similar to strlcat() from BSD.
|
||||
*/
|
||||
int strap(char *dest, size_t size, const char *fmt, ...);
|
||||
|
||||
/** Variable arguments (stdarg) version of strap() */
|
||||
int vstrap(char *dest, size_t size, const char *fmt, va_list va);
|
||||
|
||||
/** Convert integer to cpu_set_t.
|
||||
*
|
||||
* @param set A cpu bitmask
|
||||
|
|
141
server/src/cfg.c
141
server/src/cfg.c
|
@ -9,12 +9,13 @@
|
|||
#include <string.h>
|
||||
#include <netdb.h>
|
||||
|
||||
#include "utils.h"
|
||||
#include "list.h"
|
||||
#include "if.h"
|
||||
#include "tc.h"
|
||||
#include "cfg.h"
|
||||
#include "node.h"
|
||||
#include "path.h"
|
||||
#include "utils.h"
|
||||
#include "hooks.h"
|
||||
|
||||
#include "socket.h"
|
||||
|
@ -93,34 +94,36 @@ int config_parse_global(config_setting_t *cfg, struct settings *set)
|
|||
int config_parse_path(config_setting_t *cfg,
|
||||
struct path **paths, struct node **nodes)
|
||||
{
|
||||
const char *in, *out, *hook;
|
||||
const char *in;
|
||||
int enabled = 1;
|
||||
int reverse = 0;
|
||||
|
||||
|
||||
struct path *p = alloc(sizeof(struct path));
|
||||
|
||||
/* Required settings */
|
||||
if (!config_setting_lookup_string(cfg, "in", &in))
|
||||
cerror(cfg, "Missing input node for path");
|
||||
|
||||
if (!config_setting_lookup_string(cfg, "out", &out))
|
||||
cerror(cfg, "Missing output node for path");
|
||||
|
||||
/* Input node */
|
||||
struct config_setting_t *cfg_in = config_setting_get_member(cfg, "in");
|
||||
if (!cfg_in || config_setting_type(cfg_in) != CONFIG_TYPE_STRING)
|
||||
cerror(cfg, "Invalid input node for path");
|
||||
|
||||
in = config_setting_get_string(cfg_in);
|
||||
p->in = node_lookup_name(in, *nodes);
|
||||
if (!p->in)
|
||||
cerror(cfg, "Invalid input node '%s'", in);
|
||||
cerror(cfg_in, "Invalid input node '%s", in);
|
||||
|
||||
p->out = node_lookup_name(out, *nodes);
|
||||
if (!p->out)
|
||||
cerror(cfg, "Invalid output node '%s'", out);
|
||||
/* Output node(s) */
|
||||
struct config_setting_t *cfg_out = config_setting_get_member(cfg, "out");
|
||||
if (cfg_out)
|
||||
config_parse_nodelist(cfg_out, &p->destinations, nodes);
|
||||
|
||||
if (list_length(&p->destinations) >= 1)
|
||||
p->out = list_first(&p->destinations)->node;
|
||||
else
|
||||
cerror(cfg, "Missing output node for path");
|
||||
|
||||
/* Optional settings */
|
||||
if (config_setting_lookup_string(cfg, "hook", &hook)) {
|
||||
p->hook = hook_lookup(hook);
|
||||
|
||||
if (!p->hook)
|
||||
cerror(cfg, "Failed to lookup hook function. Not registred?");
|
||||
}
|
||||
struct config_setting_t *cfg_hook = config_setting_get_member(cfg, "hook");
|
||||
if (cfg_hook)
|
||||
config_parse_hooks(cfg_hook, &p->hooks);
|
||||
|
||||
config_setting_lookup_bool(cfg, "enabled", &enabled);
|
||||
config_setting_lookup_bool(cfg, "reverse", &reverse);
|
||||
|
@ -130,30 +133,104 @@ int config_parse_path(config_setting_t *cfg,
|
|||
|
||||
if (enabled) {
|
||||
p->in->refcnt++;
|
||||
p->out->refcnt++;
|
||||
|
||||
list_add(*paths, p);
|
||||
|
||||
FOREACH(&p->destinations, it)
|
||||
it->node->refcnt++;
|
||||
|
||||
if (reverse) {
|
||||
struct path *rev = alloc(sizeof(struct path));
|
||||
if (list_length(&p->destinations) > 1)
|
||||
warn("Using first destination '%s' as source for reverse path. "
|
||||
"Ignoring remaining nodes", p->out->name);
|
||||
|
||||
rev->in = p->out; /* Swap in/out */
|
||||
rev->out = p->in;
|
||||
struct path *r = alloc(sizeof(struct path));
|
||||
|
||||
rev->in->refcnt++;
|
||||
rev->out->refcnt++;
|
||||
r->in = p->out; /* Swap in/out */
|
||||
r->out = p->in;
|
||||
|
||||
list_push(&r->destinations, r->out);
|
||||
|
||||
list_add(*paths, rev);
|
||||
r->in->refcnt++;
|
||||
r->out->refcnt++;
|
||||
|
||||
list_add(*paths, r);
|
||||
}
|
||||
|
||||
list_add(*paths, p);
|
||||
}
|
||||
else {
|
||||
warn("Path '%s' => '%s' is not enabled", p->in->name, p->out->name);
|
||||
free(p);
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
|
||||
warn("Path %s is not enabled", buf);
|
||||
path_destroy(p);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all) {
|
||||
const char *str;
|
||||
struct node *node;
|
||||
|
||||
switch (config_setting_type(cfg)) {
|
||||
case CONFIG_TYPE_STRING:
|
||||
str = config_setting_get_string(cfg);
|
||||
node = node_lookup_name(str, *all);
|
||||
if (!node)
|
||||
cerror(cfg, "Invalid outgoing node '%s'", str);
|
||||
|
||||
list_push(nodes, node);
|
||||
break;
|
||||
|
||||
case CONFIG_TYPE_ARRAY:
|
||||
for (int i=0; i<config_setting_length(cfg); i++) {
|
||||
str = config_setting_get_string_elem(cfg, i);
|
||||
node = node_lookup_name(str, *all);
|
||||
if (!node)
|
||||
cerror(config_setting_get_elem(cfg, i), "Invalid outgoing node '%s'", str);
|
||||
|
||||
list_push(nodes, node);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
cerror(cfg, "Invalid output node(s)");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int config_parse_hooks(config_setting_t *cfg, struct list *hooks) {
|
||||
const char *str;
|
||||
hook_cb_t hook;
|
||||
|
||||
switch (config_setting_type(cfg)) {
|
||||
case CONFIG_TYPE_STRING:
|
||||
str = config_setting_get_string(cfg);
|
||||
hook = hook_lookup(str);
|
||||
if (!hook)
|
||||
cerror(cfg, "Invalid hook function '%s'", str);
|
||||
|
||||
list_push(hooks, hook);
|
||||
break;
|
||||
|
||||
case CONFIG_TYPE_ARRAY:
|
||||
for (int i=0; i<config_setting_length(cfg); i++) {
|
||||
str = config_setting_get_string_elem(cfg, i);
|
||||
hook = hook_lookup(str);
|
||||
if (!hook)
|
||||
cerror(config_setting_get_elem(cfg, i), "Invalid hook function '%s'", str);
|
||||
|
||||
list_push(hooks, hook);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
cerror(cfg, "Invalid hook functions");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int config_parse_node(config_setting_t *cfg, struct node **nodes)
|
||||
{
|
||||
const char *type;
|
||||
|
|
|
@ -9,18 +9,26 @@
|
|||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014, Institute for Automation of Complex Power Systems, EONERC
|
||||
*/
|
||||
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <time.h>
|
||||
|
||||
#include "msg.h"
|
||||
#include "hooks.h"
|
||||
#include "path.h"
|
||||
#include "utils.h"
|
||||
|
||||
/** @todo Make const */
|
||||
static struct hook_id hook_list[] = {
|
||||
{ hook_print, "print" },
|
||||
{ hook_filter, "filter" },
|
||||
{ hook_log, "log" },
|
||||
{ hook_decimate, "decimate" },
|
||||
{ hook_tofixed, "tofixed" },
|
||||
{ hook_multiple, "multiple" },
|
||||
{ hook_ts, "ts" },
|
||||
{ hook_fir, "fir" },
|
||||
{ NULL }
|
||||
};
|
||||
|
||||
|
@ -43,13 +51,39 @@ int hook_print(struct msg *m, struct path *p)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int hook_filter(struct msg *m, struct path *p)
|
||||
int hook_log(struct msg *m, struct path *p)
|
||||
{
|
||||
/* Drop every 10th message */
|
||||
if (m->sequence % 10 == 0)
|
||||
return -1;
|
||||
else
|
||||
return 0;
|
||||
static pthread_key_t pkey;
|
||||
FILE *file = pthread_getspecific(pkey);
|
||||
|
||||
if (!file) {
|
||||
char fstr[64], pstr[33];
|
||||
path_print(p, pstr, sizeof(pstr));
|
||||
|
||||
struct tm tm;
|
||||
time_t ts = time(NULL);
|
||||
localtime_r(&ts, &tm);
|
||||
strftime(fstr, sizeof(fstr), HOOK_LOG_TEMPLATE, &tm);
|
||||
|
||||
|
||||
|
||||
file = fopen(fstr, HOOK_LOG_MODE);
|
||||
if (file)
|
||||
debug(5, "Opened log file for path %s: %s", pstr, fstr);
|
||||
|
||||
pthread_key_create(&pkey, (void (*)(void *)) fclose);
|
||||
pthread_setspecific(pkey, file);
|
||||
}
|
||||
|
||||
msg_fprint(file, m);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hook_decimate(struct msg *m, struct path *p)
|
||||
{
|
||||
/* Drop every HOOK_DECIMATE_RATIO'th message */
|
||||
return (m->sequence % HOOK_DECIMATE_RATIO == 0) ? -1 : 0;
|
||||
}
|
||||
|
||||
int hook_tofixed(struct msg *m, struct path *p)
|
||||
|
@ -61,12 +95,51 @@ int hook_tofixed(struct msg *m, struct path *p)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int hook_multiple(struct msg *m, struct path *p)
|
||||
int hook_ts(struct msg *m, struct path *p)
|
||||
{
|
||||
if (hook_print(m, p))
|
||||
return -1;
|
||||
else if (hook_tofixed(m, p))
|
||||
return -1;
|
||||
else
|
||||
return 0;
|
||||
struct timespec *ts = (struct timespec *) &m->data[HOOK_TS_INDEX];
|
||||
|
||||
clock_gettime(CLOCK_REALTIME, ts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300
|
||||
* Tip: Use MATLAB's filter design tool and export coefficients
|
||||
* with the integrated C-Header export */
|
||||
static const double hook_fir_coeffs[] = { -0.003658148158728, -0.008882653268281, 0.008001024183003,
|
||||
0.08090485991761, 0.2035239551043, 0.3040703593515,
|
||||
0.3040703593515, 0.2035239551043, 0.08090485991761,
|
||||
0.008001024183003, -0.008882653268281,-0.003658148158728 };
|
||||
|
||||
/** @todo: test */
|
||||
int hook_fir(struct msg *m, struct path *p)
|
||||
{
|
||||
static pthread_key_t pkey;
|
||||
float *history = pthread_getspecific(pkey);
|
||||
|
||||
/** Length of impulse response */
|
||||
int len = ARRAY_LEN(hook_fir_coeffs);
|
||||
/** Current index in circular history buffer */
|
||||
int cur = m->sequence % len;
|
||||
/* Accumulator */
|
||||
double sum = 0;
|
||||
|
||||
/* Create thread local storage for circular history buffer */
|
||||
if (!history) {
|
||||
history = malloc(len * sizeof(float));
|
||||
|
||||
pthread_key_create(&pkey, free);
|
||||
pthread_setspecific(pkey, history);
|
||||
}
|
||||
|
||||
/* Update circular buffer */
|
||||
history[cur] = m->data[HOOK_FIR_INDEX].f;
|
||||
|
||||
for (int i=0; i<len; i++)
|
||||
sum += hook_fir_coeffs[(cur+len-i)%len] * history[(cur+i)%len];
|
||||
|
||||
m->data[HOOK_FIR_INDEX].f = sum;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
69
server/src/list.c
Normal file
69
server/src/list.c
Normal file
|
@ -0,0 +1,69 @@
|
|||
/** A generic linked list
|
||||
*
|
||||
* Linked lists a used for several data structures in the code.
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2015, Institute for Automation of Complex Power Systems, EONERC
|
||||
* @file
|
||||
*/
|
||||
|
||||
#include "utils.h"
|
||||
#include "list.h"
|
||||
|
||||
void list_init(struct list *l)
|
||||
{
|
||||
pthread_mutex_init(&l->lock, NULL);
|
||||
|
||||
l->count = 0;
|
||||
l->head = NULL;
|
||||
l->tail = NULL;
|
||||
}
|
||||
|
||||
void list_destroy(struct list *l)
|
||||
{
|
||||
pthread_mutex_lock(&l->lock);
|
||||
|
||||
struct list_elm *elm = l->head;
|
||||
while (elm) {
|
||||
struct list_elm *tmp = elm;
|
||||
free(tmp);
|
||||
|
||||
elm = elm->next;
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&l->lock);
|
||||
}
|
||||
|
||||
void list_push(struct list *l, void *p)
|
||||
{
|
||||
struct list_elm *e = alloc(sizeof(struct list_elm));
|
||||
|
||||
pthread_mutex_lock(&l->lock);
|
||||
|
||||
e->ptr = p;
|
||||
e->prev = l->tail;
|
||||
e->next = NULL;
|
||||
|
||||
if (l->tail)
|
||||
l->tail->next = e;
|
||||
if (l->head)
|
||||
l->head->prev = e;
|
||||
else
|
||||
l->head = e;
|
||||
|
||||
l->tail = e;
|
||||
|
||||
l->count++;
|
||||
|
||||
pthread_mutex_unlock(&l->lock);
|
||||
}
|
||||
|
||||
struct list_elm * list_search(struct list *l, int (*cmp)(void *))
|
||||
{
|
||||
FOREACH(l, it) {
|
||||
if (!cmp(it->ptr))
|
||||
return it;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
|
@ -21,7 +21,7 @@
|
|||
/** Linked list of paths */
|
||||
struct path *paths;
|
||||
|
||||
/** Send messages */
|
||||
/** Send messages asynchronously */
|
||||
static void * path_send(void *arg)
|
||||
{
|
||||
int sig;
|
||||
|
@ -53,9 +53,12 @@ static void * path_send(void *arg)
|
|||
|
||||
while (1) {
|
||||
sigwait(&set, &sig); /* blocking wait for next timer tick */
|
||||
if (p->last) {
|
||||
node_write(p->out, p->last);
|
||||
p->last = NULL;
|
||||
|
||||
if (p->received) {
|
||||
FOREACH(&p->destinations, it) {
|
||||
node_write(it->node, p->last);
|
||||
}
|
||||
|
||||
p->sent++;
|
||||
}
|
||||
}
|
||||
|
@ -66,6 +69,7 @@ static void * path_send(void *arg)
|
|||
/** Receive messages */
|
||||
static void * path_run(void *arg)
|
||||
{
|
||||
char buf[33];
|
||||
struct path *p = arg;
|
||||
struct msg *m = alloc(sizeof(struct msg));
|
||||
if (!m)
|
||||
|
@ -73,7 +77,7 @@ static void * path_run(void *arg)
|
|||
|
||||
/* Open deferred TCP connection */
|
||||
node_start_defer(p->in);
|
||||
node_start_defer(p->out);
|
||||
// FIXME: node_start_defer(p->out);
|
||||
|
||||
/* Main thread loop */
|
||||
while (1) {
|
||||
|
@ -97,11 +101,11 @@ static void * path_run(void *arg)
|
|||
|
||||
/* Handle simulation restart */
|
||||
if (m->sequence == 0 && abs(dist) >= 1) {
|
||||
path_print(p, buf, sizeof(buf));
|
||||
path_stats(p);
|
||||
warn("Simulation for path %s " MAG("=>") " %s "
|
||||
"restarted (p->seq=%u, m->seq=%u, dist=%d)",
|
||||
p->in->name, p->out->name,
|
||||
p->sequence, m->sequence, dist);
|
||||
|
||||
warn("Simulation for path %s restarted (p->seq=%u, m->seq=%u, dist=%d)",
|
||||
buf, p->sequence, m->sequence, dist);
|
||||
|
||||
/* Reset counters */
|
||||
p->sent = 0;
|
||||
|
@ -119,9 +123,11 @@ static void * path_run(void *arg)
|
|||
}
|
||||
|
||||
/* Call hook callbacks */
|
||||
if (p->hook && p->hook(m, p)) {
|
||||
p->skipped++;
|
||||
continue;
|
||||
FOREACH(&p->hooks, it) {
|
||||
if (it->hook(m, p)) {
|
||||
p->skipped++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/* Update last known sequence number */
|
||||
|
@ -130,7 +136,10 @@ static void * path_run(void *arg)
|
|||
|
||||
/* At fixed rate mode, messages are send by another thread */
|
||||
if (!p->rate) {
|
||||
node_write(p->out, m); /* Send message */
|
||||
FOREACH(&p->destinations, it) {
|
||||
node_write(it->node, m);
|
||||
}
|
||||
|
||||
p->sent++;
|
||||
}
|
||||
}
|
||||
|
@ -142,7 +151,10 @@ static void * path_run(void *arg)
|
|||
|
||||
int path_start(struct path *p)
|
||||
{ INDENT
|
||||
info("Starting path: %12s " GRN("=>") " %-12s", p->in->name, p->out->name);
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
|
||||
info("Starting path: %s", buf);
|
||||
|
||||
hist_init(&p->histogram, -HIST_SEQ, +HIST_SEQ, 1);
|
||||
|
||||
|
@ -155,7 +167,10 @@ int path_start(struct path *p)
|
|||
|
||||
int path_stop(struct path *p)
|
||||
{ INDENT
|
||||
info("Stopping path: %12s " RED("=>") " %-12s", p->in->name, p->out->name);
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
|
||||
info("Stopping path: %s", buf);
|
||||
|
||||
pthread_cancel(p->recv_tid);
|
||||
pthread_join(p->recv_tid, NULL);
|
||||
|
@ -176,8 +191,34 @@ int path_stop(struct path *p)
|
|||
|
||||
void path_stats(struct path *p)
|
||||
{
|
||||
info("%12s " MAG("=>") " %-12s: %-8u %-8u %-8u %-8u %-8u",
|
||||
p->in->name, p->out->name,
|
||||
p->sent, p->received, p->dropped, p->skipped, p->invalid
|
||||
char buf[33];
|
||||
path_print(p, buf, sizeof(buf));
|
||||
|
||||
info("%-32s : %-8u %-8u %-8u %-8u %-8u",
|
||||
buf, p->sent, p->received, p->dropped, p->skipped, p->invalid
|
||||
);
|
||||
}
|
||||
|
||||
int path_print(struct path *p, char *buf, int len)
|
||||
{
|
||||
*buf = 0;
|
||||
|
||||
if (list_length(&p->destinations) > 1) {
|
||||
strap(buf, len, "%s " MAG("=>") " [", p->in->name);
|
||||
FOREACH(&p->destinations, it)
|
||||
strap(buf, len, " %s", it->node->name);
|
||||
strap(buf, len, " ]");
|
||||
}
|
||||
else
|
||||
strap(buf, len, "%s " MAG("=>") " %s", p->in->name, p->out->name);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int path_destroy(struct path *p)
|
||||
{
|
||||
list_destroy(&p->destinations);
|
||||
list_destroy(&p->hooks);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ static void quit()
|
|||
info("Stopping paths:");
|
||||
for (struct path *p = paths; p; p = p->next) { INDENT
|
||||
path_stop(p);
|
||||
path_destroy(p);
|
||||
}
|
||||
|
||||
info("Stopping nodes:");
|
||||
|
@ -50,7 +51,7 @@ static void quit()
|
|||
if_stop(i);
|
||||
}
|
||||
|
||||
/** @todo Free nodes and paths */
|
||||
/** @todo Free nodes */
|
||||
|
||||
config_destroy(&config);
|
||||
|
||||
|
@ -155,8 +156,8 @@ int main(int argc, char *argv[])
|
|||
struct path *p = paths;
|
||||
|
||||
info("Runtime Statistics:");
|
||||
info("%12s " MAG("=>") " %-12s: %-8s %-8s %-8s %-8s %-8s",
|
||||
"Source", "Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval");
|
||||
info("%-32s : %-8s %-8s %-8s %-8s %-8s",
|
||||
"Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval");
|
||||
info("---------------------------------------------------------------------------");
|
||||
|
||||
while (1) {
|
||||
|
|
|
@ -35,6 +35,25 @@ void epoch_reset()
|
|||
clock_gettime(CLOCK_REALTIME, &epoch);
|
||||
}
|
||||
|
||||
int strap(char *dest, size_t size, const char *fmt, ...)
|
||||
{
|
||||
int ret;
|
||||
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
ret = vstrap(dest, size, fmt, ap);
|
||||
va_end(ap);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int vstrap(char *dest, size_t size, const char *fmt, va_list ap)
|
||||
{
|
||||
int len = strlen(dest);
|
||||
|
||||
return vsnprintf(dest + len, size - len, fmt, ap);
|
||||
}
|
||||
|
||||
void print(enum log_level lvl, const char *fmt, ...)
|
||||
{
|
||||
struct timespec ts;
|
||||
|
|
Loading…
Add table
Reference in a new issue