diff --git a/server/include/path.h b/server/include/path.h index c11d794d2..dd68f7995 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -39,13 +39,16 @@ struct path int tfd; /** Send messages with a fixed rate over this path */ double rate; + + /** Size of the history buffer in number of messages */ + int poolsize; + /** A circular buffer of past messages */ + struct msg *pool; /** A pointer to the last received message */ struct msg *current; /** A pointer to the previously received message */ struct msg *previous; - /** A circular buffer of past messages */ - struct msg *history; /** Counter for received messages according to their sequence no displacement */ struct hist histogram; diff --git a/server/src/cfg.c b/server/src/cfg.c index cb705cedd..3161938f4 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -95,10 +95,10 @@ int config_parse_path(config_setting_t *cfg, struct list *paths, struct list *nodes) { const char *in; - int enabled = 1; - int reverse = 0; + int enabled; + int reverse; - struct path *p = alloc(sizeof(struct path)); + struct path *p = path_create(); /* Input node */ struct config_setting_t *cfg_in = config_setting_get_member(cfg, "in"); @@ -125,9 +125,14 @@ int config_parse_path(config_setting_t *cfg, if (cfg_hook) config_parse_hooks(cfg_hook, &p->hooks); - config_setting_lookup_bool(cfg, "enabled", &enabled); - config_setting_lookup_bool(cfg, "reverse", &reverse); - config_setting_lookup_float(cfg, "rate", &p->rate); + if (!config_setting_lookup_bool(cfg, "enabled", &enabled)) + enabled = 1; + if (!config_setting_lookup_bool(cfg, "reverse", &reverse)) + reverse = 0; + if (!config_setting_lookup_float(cfg, "rate", &p->rate)) + p->rate = 0; /* disabled */ + if (!config_setting_lookup_int(cfg, "poolsize", &p->poolsize)) + p->poolsize = DEFAULT_POOLSIZE; p->cfg = cfg; @@ -250,7 +255,10 @@ int config_parse_node(config_setting_t *cfg, struct list *nodes) cerror(cfg, "Missing node name"); if (!config_setting_lookup_string(cfg, "type", &type)) - cerror(cfg, "Missing node name"); + cerror(cfg, "Missing node type"); + + if (!config_setting_lookup_int(cfg, "combine", &n->combine)) + n->combine = 1; n->vt = node_lookup_vtable(type); if (!n->vt) diff --git a/server/src/file.c b/server/src/file.c index 216ec321c..6e7a68657 100644 --- a/server/src/file.c +++ b/server/src/file.c @@ -12,6 +12,17 @@ #include "file.h" #include "utils.h" + +int file_init(int argc, char *argv[], struct settings *set) +{ INDENT + return 0; /* nothing todo here */ +} + +int file_deinit() +{ INDENT + return 0; /* nothing todo here */ +} + int file_print(struct node *n, char *buf, int len) { struct file *f = n->file; @@ -83,25 +94,41 @@ int file_close(struct node *n) return 0; } -int file_read(struct node *n, struct msg *m) +int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { + int i = 0; struct file *f = n->file; uint64_t runs; - if (!f->in) - error("Can't read from file node!"); + if (f->in) { + read(f->tfd, &runs, sizeof(runs)); /* blocking for 1/f->rate seconds */ - read(f->tfd, &runs, sizeof(runs)); /* blocking for 1/f->rate seconds */ + for (i=0; iin, m); + } + } + else + warn("Can not read from node '%s'", n->name); - return msg_fscan(f->in, m); + return i; } -int file_write(struct node *n, struct msg *m) +int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { + int i = 0; struct file *f = n->file; - if (!f->out) - error("Can't write to file node!"); + if (f->out) { + for (i=0; iout, m); + } + } + else + warn("Can not write to node '%s", n->name); - return msg_fprint(f->out, m); + return i; } \ No newline at end of file diff --git a/server/src/if.c b/server/src/if.c index acf3bc737..4085f1b36 100644 --- a/server/src/if.c +++ b/server/src/if.c @@ -48,13 +48,13 @@ void if_destroy(struct interface *i) } int if_start(struct interface *i, int affinity) -{ INDENT +{ if (!i->refcnt) { warn("Interface '%s' is not used by an active node", i->name); return -1; } else - info("Starting interface '%s'", i->name); + info("Starting interface '%s' (index=%u)", i->name, i->index); { INDENT int mark = 0; @@ -68,7 +68,7 @@ int if_start(struct interface *i, int affinity) if (setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark))) serror("Failed to set fwmark for outgoing packets"); else - debug(4, "Set fwmark for socket->sd = %u to %u", s->sd, s->mark); + debug(4, "Set fwmark for socket (sd=%u) to %u", s->sd, s->mark); tc_mark(i, TC_HDL(4000, s->mark), s->mark); tc_netem(i, TC_HDL(4000, s->mark), s->netem); @@ -88,8 +88,8 @@ int if_start(struct interface *i, int affinity) } int if_stop(struct interface *i) -{ INDENT - info("Stopping interface '%s'", i->name); +{ + info("Stopping interface '%s' (index=%u)", i->name, i->index); { INDENT if_setaffinity(i, -1L); diff --git a/server/src/msg.c b/server/src/msg.c index cb46b22da..67b93ef2a 100644 --- a/server/src/msg.c +++ b/server/src/msg.c @@ -32,7 +32,9 @@ void msg_swap(struct msg *m) int msg_verify(struct msg *m) { return ((m->version == MSG_VERSION) && - (m->type == MSG_TYPE_DATA)) + (m->type == MSG_TYPE_DATA) && + (m->length > 0) && + (m->length <= MSG_VALUES)) ? 0 : -1; } diff --git a/server/src/path.c b/server/src/path.c index 53c917f29..192f45633 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -14,6 +14,7 @@ #include "utils.h" #include "path.h" +#include "socket.h" #ifndef sigev_notify_thread_id #define sigev_notify_thread_id _sigev_un._tid @@ -44,12 +45,13 @@ static void * path_send(void *arg) serror("Failed to start timer"); while (1) { + /* Block until 1/p->rate seconds elapsed */ read(p->tfd, &runs, sizeof(runs)); FOREACH(&p->destinations, it) - node_write(it->node, p->current); - - p->sent++; + p->sent += node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine); + + debug(10, "Sent %u messages to %u destination nodes", p->in->combine, p->destinations.length); } return NULL; @@ -59,61 +61,70 @@ static void * path_send(void *arg) static void * path_run(void *arg) { struct path *p = arg; - char buf[33]; + + /* Allocate memory for message pool */ + p->pool = alloc(p->poolsize * sizeof(struct msg)); - /* Open deferred TCP connection */ + /* Open deferred TCP connection node_start_defer(p->in); FOREACH(&p->destinations, it) - node_start_defer(it->path->out); + node_start_defer(it->node); */ /* Main thread loop */ - while (1) { +skip: while (1) { /* Receive message */ - p->previous = &p->history[(p->received-1) % POOL_SIZE]; - p->current = &p->history[ p->received % POOL_SIZE]; + int recv = node_read(p->in, p->pool, p->poolsize, p->received, p->in->combine); - node_read(p->in, p->current); + debug(10, "Received %u messages from node '%s'", recv, p->in->name); - p->received++; + /* For each received message... */ + for (int i=0; iprevious = &p->pool[(p->received-1) % p->poolsize]; + p->current = &p->pool[ p->received % p->poolsize]; + + p->received++; + + /* Check header fields */ + if (msg_verify(p->current)) { + p->invalid++; + goto skip; /* Drop message */ + } - /* Check header fields */ - if (msg_verify(p->current)) { - p->invalid++; - continue; /* Drop message */ + /* Update histogram and handle wrap-around of sequence number */ + int dist = (UINT16_MAX + p->current->sequence - p->previous->sequence) % UINT16_MAX; + if (dist > UINT16_MAX / 2) + dist -= UINT16_MAX; + + hist_put(&p->histogram, dist); + + /* Handle simulation restart */ + if (p->current->sequence == 0 && abs(dist) >= 1) { + char buf[33]; + path_print(p, buf, sizeof(buf)); + warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)", + buf, p->previous->sequence, p->current->sequence, dist); + + path_reset(p); + } + else if (dist <= 0 && p->received > 1) { + p->dropped++; + goto skip; + } } - - /* Update histogram and handle wrap-around */ - int dist = (UINT16_MAX + p->current->sequence - p->previous->sequence) % UINT16_MAX; - if (dist > UINT16_MAX / 2) - dist -= UINT16_MAX; - - hist_put(&p->histogram, dist); - - /* Handle simulation restart */ - if (p->current->sequence == 0 && abs(dist) >= 1) { - warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)", - buf, p->previous->sequence, p->current->sequence, dist); - - path_reset(p); - } - else if (dist <= 0 && p->received > 1) { - p->dropped++; - continue; - } - + /* Call hook callbacks */ FOREACH(&p->hooks, it) { if (it->hook(p->current, p)) { p->skipped++; - continue; + goto skip; } } - + /* At fixed rate mode, messages are send by another thread */ if (!p->rate) { FOREACH(&p->destinations, it) - node_write(it->node, p->current); + node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine); p->sent++; } @@ -127,7 +138,7 @@ int path_start(struct path *p) char buf[33]; path_print(p, buf, sizeof(buf)); - info("Starting path: %s", buf); + info("Starting path: %s (poolsize = %u)", buf, p->poolsize); /* At fixed rate mode, we start another thread for sending */ if (p->rate) @@ -202,8 +213,6 @@ int path_print(struct path *p, char *buf, int len) struct path * path_create() { struct path *p = alloc(sizeof(struct path)); - - p->history = alloc(POOL_SIZE * sizeof(struct msg)); list_init(&p->destinations, NULL); list_init(&p->hooks, NULL); @@ -219,6 +228,6 @@ void path_destroy(struct path *p) list_destroy(&p->hooks); hist_destroy(&p->histogram); - free(p->history); + free(p->pool); free(p); } diff --git a/server/src/receive.c b/server/src/receive.c index 85141895d..41348e7cc 100644 --- a/server/src/receive.c +++ b/server/src/receive.c @@ -24,15 +24,22 @@ #include "utils.h" #include "node.h" #include "msg.h" +#include "socket.h" static struct settings set; -static struct msg msg = MSG_INIT(0); -extern struct list nodes; +static struct msg *pool; static struct node *node; +extern struct list nodes; + void quit(int sig, siginfo_t *si, void *ptr) { node_stop(node); + node_deinit(); + + list_destroy(&nodes); + free(pool); + exit(EXIT_SUCCESS); } @@ -53,13 +60,13 @@ void usage(char *name) int main(int argc, char *argv[]) { - char c; int reverse = 0; struct config_t config; _mtid = pthread_self(); + char c; while ((c = getopt(argc, argv, "hr")) != -1) { switch (c) { case 'r': reverse = 1; break; @@ -81,18 +88,21 @@ int main(int argc, char *argv[]) sigaction(SIGTERM, &sa_quit, NULL); sigaction(SIGINT, &sa_quit, NULL); + list_init(&nodes, (dtor_cb_t) node_destroy); config_init(&config); config_parse(argv[optind], &config, &set, &nodes, NULL); node = node_lookup_name(argv[optind+1], &nodes); if (!node) error("There's no node with the name '%s'", argv[optind+1]); - - node->refcnt++; if (reverse) node_reverse(node); + + node->refcnt++; + pool = alloc(sizeof(struct msg) * node->combine); + node_init(argc-optind, argv+optind, &set); node_start(node); node_start_defer(node); @@ -100,20 +110,20 @@ int main(int argc, char *argv[]) fprintf(stderr, "# %-6s %-8s %-12s\n", "dev_id", "seq_no", "data"); while (1) { - node_read(node, &msg); + int recv = node_read(node, pool, node->combine, 0, node->combine); - if (msg.version != MSG_VERSION) - continue; - if (msg.type != MSG_TYPE_DATA) - continue; + for (int i=0; irefcnt++; - + if (reverse) node_reverse(node); + + node->refcnt++; + pool = alloc(sizeof(struct msg) * node->combine); + node_init(argc-optind, argv+optind, &set); node_start(node); node_start_defer(node); while (!feof(stdin)) { - msg_fscan(stdin, &msg); + for (int i=0; icombine; i++) { + msg_fscan(stdin, &pool[i]); #if 1 /* Preprend timestamp */ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - fprintf(stdout, "%17.3f\t", ts.tv_sec + ts.tv_nsec / 1e9); + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + fprintf(stdout, "%17.6f\t", ts.tv_sec + ts.tv_nsec / 1e9); #endif - msg_fprint(stdout, &msg); - node_write(node, &msg); + msg_fprint(stdout, &pool[i]); + } + + node_write(node, pool, node->combine, 0, node->combine); } return 0; } + +/** @} */ \ No newline at end of file diff --git a/server/src/server.c b/server/src/server.c index 176062799..602f3b184 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -12,10 +12,8 @@ #include #include #include -#include #include "config.h" -#include "if.h" #include "utils.h" #include "cfg.h" #include "path.h" @@ -29,8 +27,6 @@ extern struct list nodes; /** Linked list of paths */ extern struct list paths; -/** Linked list of interfaces */ -extern struct list interfaces; /** The global configuration */ static struct settings settings; @@ -46,18 +42,13 @@ static void quit() FOREACH(&nodes, it) node_stop(it->node); - info("Stopping interfaces"); - FOREACH(&interfaces, it) - if_stop(it->interface); + node_deinit(); /* Freeing dynamically allocated memory */ list_destroy(&paths); list_destroy(&nodes); - list_destroy(&interfaces); config_destroy(&config); - node_deinit(); - info("Goodbye!"); _exit(EXIT_SUCCESS); @@ -140,7 +131,6 @@ int main(int argc, char *argv[]) /* Initialize lists */ list_init(&nodes, (dtor_cb_t) node_destroy); list_init(&paths, (dtor_cb_t) path_destroy); - list_init(&interfaces, (dtor_cb_t) if_destroy); info("Initialize real-time system"); realtime_init(); @@ -148,22 +138,18 @@ int main(int argc, char *argv[]) info("Initialize signals"); signals_init(); - info("Initialize node types"); - node_init(argc, argv); - info("Parsing configuration"); config_init(&config); config_parse(configfile, &config, &settings, &nodes, &paths); + + info("Initialize node types"); + node_init(argc, argv, &settings); /* Connect all nodes and start one thread per path */ info("Starting nodes"); FOREACH(&nodes, it) node_start(it->node); - info("Starting interfaces"); - FOREACH(&interfaces, it) - if_start(it->interface, settings.affinity); - info("Starting paths"); FOREACH(&paths, it) path_start(it->path); diff --git a/server/src/test.c b/server/src/test.c index bcdf4be7a..628435809 100644 --- a/server/src/test.c +++ b/server/src/test.c @@ -100,8 +100,7 @@ int main(int argc, char *argv[]) node_start_defer(node); /* Parse Arguments */ - char c; - char *endptr; + char c, *endptr; while ((c = getopt (argc-3, argv+3, "l:h:r:f:c:")) != -1) { switch (c) { case 'c': @@ -168,8 +167,8 @@ void test_rtt() { while (running && (count < 0 || count--)) { clock_gettime(CLOCK_ID, ts1); - node_write(node, &m); - node_read(node, &m); + node_write_single(node, &m); /* Ping */ + node_read_single(node, &m); /* Pong */ clock_gettime(CLOCK_ID, ts2); rtt = timespec_delta(ts1, ts2);