1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

integrating new node types into server and tools

This commit is contained in:
Steffen Vogel 2015-05-06 11:49:13 +02:00
parent d1d5f521b7
commit 9af2e26655
10 changed files with 173 additions and 115 deletions

View file

@ -39,13 +39,16 @@ struct path
int tfd;
/** Send messages with a fixed rate over this path */
double rate;
/** Size of the history buffer in number of messages */
int poolsize;
/** A circular buffer of past messages */
struct msg *pool;
/** A pointer to the last received message */
struct msg *current;
/** A pointer to the previously received message */
struct msg *previous;
/** A circular buffer of past messages */
struct msg *history;
/** Counter for received messages according to their sequence no displacement */
struct hist histogram;

View file

@ -95,10 +95,10 @@ int config_parse_path(config_setting_t *cfg,
struct list *paths, struct list *nodes)
{
const char *in;
int enabled = 1;
int reverse = 0;
int enabled;
int reverse;
struct path *p = alloc(sizeof(struct path));
struct path *p = path_create();
/* Input node */
struct config_setting_t *cfg_in = config_setting_get_member(cfg, "in");
@ -125,9 +125,14 @@ int config_parse_path(config_setting_t *cfg,
if (cfg_hook)
config_parse_hooks(cfg_hook, &p->hooks);
config_setting_lookup_bool(cfg, "enabled", &enabled);
config_setting_lookup_bool(cfg, "reverse", &reverse);
config_setting_lookup_float(cfg, "rate", &p->rate);
if (!config_setting_lookup_bool(cfg, "enabled", &enabled))
enabled = 1;
if (!config_setting_lookup_bool(cfg, "reverse", &reverse))
reverse = 0;
if (!config_setting_lookup_float(cfg, "rate", &p->rate))
p->rate = 0; /* disabled */
if (!config_setting_lookup_int(cfg, "poolsize", &p->poolsize))
p->poolsize = DEFAULT_POOLSIZE;
p->cfg = cfg;
@ -250,7 +255,10 @@ int config_parse_node(config_setting_t *cfg, struct list *nodes)
cerror(cfg, "Missing node name");
if (!config_setting_lookup_string(cfg, "type", &type))
cerror(cfg, "Missing node name");
cerror(cfg, "Missing node type");
if (!config_setting_lookup_int(cfg, "combine", &n->combine))
n->combine = 1;
n->vt = node_lookup_vtable(type);
if (!n->vt)

View file

@ -12,6 +12,17 @@
#include "file.h"
#include "utils.h"
int file_init(int argc, char *argv[], struct settings *set)
{ INDENT
return 0; /* nothing todo here */
}
int file_deinit()
{ INDENT
return 0; /* nothing todo here */
}
int file_print(struct node *n, char *buf, int len)
{
struct file *f = n->file;
@ -83,25 +94,41 @@ int file_close(struct node *n)
return 0;
}
int file_read(struct node *n, struct msg *m)
int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
{
int i = 0;
struct file *f = n->file;
uint64_t runs;
if (!f->in)
error("Can't read from file node!");
if (f->in) {
read(f->tfd, &runs, sizeof(runs)); /* blocking for 1/f->rate seconds */
read(f->tfd, &runs, sizeof(runs)); /* blocking for 1/f->rate seconds */
for (i=0; i<cnt; i++) {
struct msg *m = &pool[(first+i) % poolsize];
msg_fscan(f->in, m);
}
}
else
warn("Can not read from node '%s'", n->name);
return msg_fscan(f->in, m);
return i;
}
int file_write(struct node *n, struct msg *m)
int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
{
int i = 0;
struct file *f = n->file;
if (!f->out)
error("Can't write to file node!");
if (f->out) {
for (i=0; i<cnt; i++) {
struct msg *m = &pool[(first+i) % poolsize];
msg_fprint(f->out, m);
}
}
else
warn("Can not write to node '%s", n->name);
return msg_fprint(f->out, m);
return i;
}

View file

@ -48,13 +48,13 @@ void if_destroy(struct interface *i)
}
int if_start(struct interface *i, int affinity)
{ INDENT
{
if (!i->refcnt) {
warn("Interface '%s' is not used by an active node", i->name);
return -1;
}
else
info("Starting interface '%s'", i->name);
info("Starting interface '%s' (index=%u)", i->name, i->index);
{ INDENT
int mark = 0;
@ -68,7 +68,7 @@ int if_start(struct interface *i, int affinity)
if (setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark)))
serror("Failed to set fwmark for outgoing packets");
else
debug(4, "Set fwmark for socket->sd = %u to %u", s->sd, s->mark);
debug(4, "Set fwmark for socket (sd=%u) to %u", s->sd, s->mark);
tc_mark(i, TC_HDL(4000, s->mark), s->mark);
tc_netem(i, TC_HDL(4000, s->mark), s->netem);
@ -88,8 +88,8 @@ int if_start(struct interface *i, int affinity)
}
int if_stop(struct interface *i)
{ INDENT
info("Stopping interface '%s'", i->name);
{
info("Stopping interface '%s' (index=%u)", i->name, i->index);
{ INDENT
if_setaffinity(i, -1L);

View file

@ -32,7 +32,9 @@ void msg_swap(struct msg *m)
int msg_verify(struct msg *m)
{
return ((m->version == MSG_VERSION) &&
(m->type == MSG_TYPE_DATA))
(m->type == MSG_TYPE_DATA) &&
(m->length > 0) &&
(m->length <= MSG_VALUES))
? 0
: -1;
}

View file

@ -14,6 +14,7 @@
#include "utils.h"
#include "path.h"
#include "socket.h"
#ifndef sigev_notify_thread_id
#define sigev_notify_thread_id _sigev_un._tid
@ -44,12 +45,13 @@ static void * path_send(void *arg)
serror("Failed to start timer");
while (1) {
/* Block until 1/p->rate seconds elapsed */
read(p->tfd, &runs, sizeof(runs));
FOREACH(&p->destinations, it)
node_write(it->node, p->current);
p->sent++;
p->sent += node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine);
debug(10, "Sent %u messages to %u destination nodes", p->in->combine, p->destinations.length);
}
return NULL;
@ -59,61 +61,70 @@ static void * path_send(void *arg)
static void * path_run(void *arg)
{
struct path *p = arg;
char buf[33];
/* Allocate memory for message pool */
p->pool = alloc(p->poolsize * sizeof(struct msg));
/* Open deferred TCP connection */
/* Open deferred TCP connection
node_start_defer(p->in);
FOREACH(&p->destinations, it)
node_start_defer(it->path->out);
node_start_defer(it->node); */
/* Main thread loop */
while (1) {
skip: while (1) {
/* Receive message */
p->previous = &p->history[(p->received-1) % POOL_SIZE];
p->current = &p->history[ p->received % POOL_SIZE];
int recv = node_read(p->in, p->pool, p->poolsize, p->received, p->in->combine);
node_read(p->in, p->current);
debug(10, "Received %u messages from node '%s'", recv, p->in->name);
p->received++;
/* For each received message... */
for (int i=0; i<recv; i++) {
p->previous = &p->pool[(p->received-1) % p->poolsize];
p->current = &p->pool[ p->received % p->poolsize];
p->received++;
/* Check header fields */
if (msg_verify(p->current)) {
p->invalid++;
goto skip; /* Drop message */
}
/* Check header fields */
if (msg_verify(p->current)) {
p->invalid++;
continue; /* Drop message */
/* Update histogram and handle wrap-around of sequence number */
int dist = (UINT16_MAX + p->current->sequence - p->previous->sequence) % UINT16_MAX;
if (dist > UINT16_MAX / 2)
dist -= UINT16_MAX;
hist_put(&p->histogram, dist);
/* Handle simulation restart */
if (p->current->sequence == 0 && abs(dist) >= 1) {
char buf[33];
path_print(p, buf, sizeof(buf));
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)",
buf, p->previous->sequence, p->current->sequence, dist);
path_reset(p);
}
else if (dist <= 0 && p->received > 1) {
p->dropped++;
goto skip;
}
}
/* Update histogram and handle wrap-around */
int dist = (UINT16_MAX + p->current->sequence - p->previous->sequence) % UINT16_MAX;
if (dist > UINT16_MAX / 2)
dist -= UINT16_MAX;
hist_put(&p->histogram, dist);
/* Handle simulation restart */
if (p->current->sequence == 0 && abs(dist) >= 1) {
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)",
buf, p->previous->sequence, p->current->sequence, dist);
path_reset(p);
}
else if (dist <= 0 && p->received > 1) {
p->dropped++;
continue;
}
/* Call hook callbacks */
FOREACH(&p->hooks, it) {
if (it->hook(p->current, p)) {
p->skipped++;
continue;
goto skip;
}
}
/* At fixed rate mode, messages are send by another thread */
if (!p->rate) {
FOREACH(&p->destinations, it)
node_write(it->node, p->current);
node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine);
p->sent++;
}
@ -127,7 +138,7 @@ int path_start(struct path *p)
char buf[33];
path_print(p, buf, sizeof(buf));
info("Starting path: %s", buf);
info("Starting path: %s (poolsize = %u)", buf, p->poolsize);
/* At fixed rate mode, we start another thread for sending */
if (p->rate)
@ -202,8 +213,6 @@ int path_print(struct path *p, char *buf, int len)
struct path * path_create()
{
struct path *p = alloc(sizeof(struct path));
p->history = alloc(POOL_SIZE * sizeof(struct msg));
list_init(&p->destinations, NULL);
list_init(&p->hooks, NULL);
@ -219,6 +228,6 @@ void path_destroy(struct path *p)
list_destroy(&p->hooks);
hist_destroy(&p->histogram);
free(p->history);
free(p->pool);
free(p);
}

View file

@ -24,15 +24,22 @@
#include "utils.h"
#include "node.h"
#include "msg.h"
#include "socket.h"
static struct settings set;
static struct msg msg = MSG_INIT(0);
extern struct list nodes;
static struct msg *pool;
static struct node *node;
extern struct list nodes;
void quit(int sig, siginfo_t *si, void *ptr)
{
node_stop(node);
node_deinit();
list_destroy(&nodes);
free(pool);
exit(EXIT_SUCCESS);
}
@ -53,13 +60,13 @@ void usage(char *name)
int main(int argc, char *argv[])
{
char c;
int reverse = 0;
struct config_t config;
_mtid = pthread_self();
char c;
while ((c = getopt(argc, argv, "hr")) != -1) {
switch (c) {
case 'r': reverse = 1; break;
@ -81,18 +88,21 @@ int main(int argc, char *argv[])
sigaction(SIGTERM, &sa_quit, NULL);
sigaction(SIGINT, &sa_quit, NULL);
list_init(&nodes, (dtor_cb_t) node_destroy);
config_init(&config);
config_parse(argv[optind], &config, &set, &nodes, NULL);
node = node_lookup_name(argv[optind+1], &nodes);
if (!node)
error("There's no node with the name '%s'", argv[optind+1]);
node->refcnt++;
if (reverse)
node_reverse(node);
node->refcnt++;
pool = alloc(sizeof(struct msg) * node->combine);
node_init(argc-optind, argv+optind, &set);
node_start(node);
node_start_defer(node);
@ -100,20 +110,20 @@ int main(int argc, char *argv[])
fprintf(stderr, "# %-6s %-8s %-12s\n", "dev_id", "seq_no", "data");
while (1) {
node_read(node, &msg);
int recv = node_read(node, pool, node->combine, 0, node->combine);
if (msg.version != MSG_VERSION)
continue;
if (msg.type != MSG_TYPE_DATA)
continue;
for (int i=0; i<recv; i++) {
if (msg_verify(&pool[i]))
warn("Failed to verify message");
#if 1
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
fprintf(stdout, "%17.6f", ts.tv_sec + ts.tv_nsec / 1e9);
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
fprintf(stdout, "%17.6f\t", ts.tv_sec + ts.tv_nsec / 1e9);
#endif
msg_fprint(stdout, &msg);
msg_fprint(stdout, &pool[i]);
}
}
return 0;

View file

@ -27,14 +27,22 @@
#include "msg.h"
#include "socket.h"
static struct config_t config;
static struct settings set;
static struct msg msg = MSG_INIT(0);
static struct msg *pool;
static struct node *node;
extern struct list nodes;
void quit(int sig, siginfo_t *si, void *ptr)
{
node_stop(node);
node_deinit();
list_destroy(&nodes);
config_destroy(&config);
free(pool);
exit(EXIT_SUCCESS);
}
@ -55,13 +63,11 @@ void usage(char *name)
int main(int argc, char *argv[])
{
char c;
int reverse = 0;
_mtid = pthread_self();
struct config_t config;
char c;
while ((c = getopt(argc, argv, "hr")) != -1) {
switch (c) {
case 'r': reverse = 1; break;
@ -83,33 +89,41 @@ int main(int argc, char *argv[])
sigaction(SIGTERM, &sa_quit, NULL);
sigaction(SIGINT, &sa_quit, NULL);
list_init(&nodes, (dtor_cb_t) node_destroy);
config_init(&config);
config_parse(argv[optind], &config, &set, &nodes, NULL);
node = node_lookup_name(argv[optind+1], &nodes);
if (!node)
error("There's no node with the name '%s'", argv[optind+1]);
node->refcnt++;
if (reverse)
node_reverse(node);
node->refcnt++;
pool = alloc(sizeof(struct msg) * node->combine);
node_init(argc-optind, argv+optind, &set);
node_start(node);
node_start_defer(node);
while (!feof(stdin)) {
msg_fscan(stdin, &msg);
for (int i=0; i<node->combine; i++) {
msg_fscan(stdin, &pool[i]);
#if 1 /* Preprend timestamp */
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
fprintf(stdout, "%17.3f\t", ts.tv_sec + ts.tv_nsec / 1e9);
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
fprintf(stdout, "%17.6f\t", ts.tv_sec + ts.tv_nsec / 1e9);
#endif
msg_fprint(stdout, &msg);
node_write(node, &msg);
msg_fprint(stdout, &pool[i]);
}
node_write(node, pool, node->combine, 0, node->combine);
}
return 0;
}
/** @} */

View file

@ -12,10 +12,8 @@
#include <signal.h>
#include <unistd.h>
#include <sys/stat.h>
#include <netinet/ip.h>
#include "config.h"
#include "if.h"
#include "utils.h"
#include "cfg.h"
#include "path.h"
@ -29,8 +27,6 @@
extern struct list nodes;
/** Linked list of paths */
extern struct list paths;
/** Linked list of interfaces */
extern struct list interfaces;
/** The global configuration */
static struct settings settings;
@ -46,18 +42,13 @@ static void quit()
FOREACH(&nodes, it)
node_stop(it->node);
info("Stopping interfaces");
FOREACH(&interfaces, it)
if_stop(it->interface);
node_deinit();
/* Freeing dynamically allocated memory */
list_destroy(&paths);
list_destroy(&nodes);
list_destroy(&interfaces);
config_destroy(&config);
node_deinit();
info("Goodbye!");
_exit(EXIT_SUCCESS);
@ -140,7 +131,6 @@ int main(int argc, char *argv[])
/* Initialize lists */
list_init(&nodes, (dtor_cb_t) node_destroy);
list_init(&paths, (dtor_cb_t) path_destroy);
list_init(&interfaces, (dtor_cb_t) if_destroy);
info("Initialize real-time system");
realtime_init();
@ -148,22 +138,18 @@ int main(int argc, char *argv[])
info("Initialize signals");
signals_init();
info("Initialize node types");
node_init(argc, argv);
info("Parsing configuration");
config_init(&config);
config_parse(configfile, &config, &settings, &nodes, &paths);
info("Initialize node types");
node_init(argc, argv, &settings);
/* Connect all nodes and start one thread per path */
info("Starting nodes");
FOREACH(&nodes, it)
node_start(it->node);
info("Starting interfaces");
FOREACH(&interfaces, it)
if_start(it->interface, settings.affinity);
info("Starting paths");
FOREACH(&paths, it)
path_start(it->path);

View file

@ -100,8 +100,7 @@ int main(int argc, char *argv[])
node_start_defer(node);
/* Parse Arguments */
char c;
char *endptr;
char c, *endptr;
while ((c = getopt (argc-3, argv+3, "l:h:r:f:c:")) != -1) {
switch (c) {
case 'c':
@ -168,8 +167,8 @@ void test_rtt() {
while (running && (count < 0 || count--)) {
clock_gettime(CLOCK_ID, ts1);
node_write(node, &m);
node_read(node, &m);
node_write_single(node, &m); /* Ping */
node_read_single(node, &m); /* Pong */
clock_gettime(CLOCK_ID, ts2);
rtt = timespec_delta(ts1, ts2);