diff --git a/server/include/hooks.h b/server/include/hooks.h index b150649df..8ecd449d5 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -48,12 +48,6 @@ hook_cb_t hook_lookup(const char *name); /** Example hook: Print the message. */ int hook_print(struct msg *m, struct path *p); -/** Example hook: Log messages to a logfile in /tmp */ -int hook_log(struct msg *m, struct path *p); - -#define HOOK_LOG_MODE "w+" -#define HOOK_LOG_TEMPLATE "logs/s2ss-%Y_%m_%d-%H_%M_%S.log" - /** Example hook: Drop messages. */ int hook_decimate(struct msg *m, struct path *p); diff --git a/server/include/path.h b/server/include/path.h index 2907a312f..a01bf357f 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -35,6 +35,8 @@ struct path /** List of function pointers to hooks */ struct list hooks; + /** Timer file descriptor for fixed rate sending */ + int tfd; /** Send messages with a fixed rate over this path */ double rate; @@ -59,8 +61,6 @@ struct path /** Counter for dropped messages due to reordering */ unsigned int dropped; - /** A timer used for fixed rate transmission. */ - timer_t timer; /** The thread id for this path */ pthread_t recv_tid; /** A second thread id for fixed rate sending thread */ diff --git a/server/include/utils.h b/server/include/utils.h index dd25dd55e..425be1ac6 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -36,9 +36,8 @@ /* Alternate character set */ #define ACS(chr) "\e(0" chr "\e(B" -#define ACS_VERTICAL "|" #define ACS_HORIZONTAL ACS("\x71") -//#define ACS_VERTICAL ACS("\x78") +#define ACS_VERTICAL ACS("\x78") #define ACS_VERTRIGHT ACS("\x74") /* UTF-8 Line drawing characters */ @@ -87,7 +86,7 @@ extern pthread_t _mtid; */ int strap(char *dest, size_t size, const char *fmt, ...); -/** Variable arguments (stdarg) version of strap() */ +/** Variadic version of strap() */ int vstrap(char *dest, size_t size, const char *fmt, va_list va); /** Convert integer to cpu_set_t. @@ -115,9 +114,8 @@ void die(); /** Check assertion and exit if failed. */ #define assert(exp) do { \ if (EXPECT(!exp, 0)) { \ - print(ERROR, "Assertion failed: '%s' in %s, %s:%d", \ + error, "Assertion failed: '%s' in %s, %s:%d", \ #exp, __FUNCTION__, __BASE_FILE__, __LINE__); \ - exit(EXIT_FAILURE); \ } } while (0) #endif /* _UTILS_H_ */ diff --git a/server/src/hist.c b/server/src/hist.c index 9c4d3118c..dd89a1a56 100644 --- a/server/src/hist.c +++ b/server/src/hist.c @@ -125,7 +125,7 @@ void hist_plot(struct hist *h) } /* Print plot */ - info("%9s | %5s | %s", "Value", "Occur", "Histogram Plot:"); + info("%9s | %5s | %s", "Value", "Occur", "Plot"); line(); for (int i = 0; i < h->length; i++) { diff --git a/server/src/hooks.c b/server/src/hooks.c index 571389381..eb2df689c 100644 --- a/server/src/hooks.c +++ b/server/src/hooks.c @@ -23,7 +23,6 @@ /** @todo Make const */ static struct hook_id hook_list[] = { { hook_print, "print" }, - { hook_log, "log" }, { hook_decimate, "decimate" }, { hook_tofixed, "tofixed" }, { hook_ts, "ts" }, @@ -50,33 +49,6 @@ int hook_print(struct msg *m, struct path *p) return 0; } -int hook_log(struct msg *m, struct path *p) -{ - static pthread_key_t pkey; - FILE *file = pthread_getspecific(pkey); - - if (!file) { - char fstr[64], pstr[33]; - path_print(p, pstr, sizeof(pstr)); - - struct tm tm; - time_t ts = time(NULL); - localtime_r(&ts, &tm); - strftime(fstr, sizeof(fstr), HOOK_LOG_TEMPLATE, &tm); - - file = fopen(fstr, HOOK_LOG_MODE); - if (file) - debug(5, "Opened log file for path %s: %s", pstr, fstr); - - pthread_key_create(&pkey, (dtor_cb_t) fclose); - pthread_setspecific(pkey, file); - } - - msg_fprint(file, m); - - return 0; -} - int hook_decimate(struct msg *m, struct path *p) { /* Drop every HOOK_DECIMATE_RATIO'th message */ diff --git a/server/src/log.c b/server/src/log.c index 3eac7d2b2..07b9c80f0 100644 --- a/server/src/log.c +++ b/server/src/log.c @@ -143,7 +143,7 @@ void serror(const char *fmt, ...) vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); - log_print(ERROR, "%s: %s", buf, strerror(errno)); + log_print(ERROR, "%s: %m (%u)", buf, errno); die(); } @@ -162,4 +162,4 @@ void cerror(config_setting_t *cfg, const char *fmt, ...) : "(stdio)", config_setting_source_line(cfg)); die(); -} \ No newline at end of file +} diff --git a/server/src/node.c b/server/src/node.c index 831408e3d..90baab818 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -118,20 +118,18 @@ int node_start(struct node *n) int node_start_defer(struct node *n) { - int ret; + struct socket *s = n->socket; if (node_type(n) == TCPD) { info("Wait for incoming TCP connection from node '%s'...", n->name); - ret = listen(n->socket->sd2, 1); - if (ret < 0) + s->sd = listen(s->sd2, 1); + if (s->sd < 0) serror("Failed to listen on socket for node '%s'", n->name); - ret = accept(n->socket->sd2, NULL, NULL); - if (ret < 0) + s->sd = accept(s->sd2, NULL, NULL); + if (s->sd < 0) serror("Failed to accept on socket for node '%s'", n->name); - - n->socket->sd = ret; } return 0; diff --git a/server/src/path.c b/server/src/path.c index 4e228a3f6..53c917f29 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -8,9 +8,8 @@ #include #include #include -#include -#include +#include #include #include "utils.h" @@ -28,33 +27,24 @@ static void * path_send(void *arg) { struct path *p = arg; - int sig; - sigset_t set; - - struct sigevent sev = { - .sigev_notify = SIGEV_THREAD_ID, - .sigev_signo = SIGALRM, - .sigev_notify_thread_id = syscall(SYS_gettid) - }; + int ret; + uint64_t runs; struct itimerspec its = { .it_interval = timespec_rate(p->rate), .it_value = { 1, 0 } }; - sigemptyset(&set); - sigaddset(&set, SIGALRM); - if(pthread_sigmask(SIG_BLOCK, &set, NULL)) - serror("Set signal mask"); - - if (timer_create(CLOCK_REALTIME, &sev, &p->timer)) + p->tfd = timerfd_create(CLOCK_REALTIME, 0); + if (p->tfd < 0) serror("Failed to create timer"); - if (timer_settime(p->timer, 0, &its, NULL)) + ret = timerfd_settime(p->tfd, 0, &its, NULL); + if (ret) serror("Failed to start timer"); while (1) { - sigwait(&set, &sig); /* blocking wait for next timer tick */ + read(p->tfd, &runs, sizeof(runs)); FOREACH(&p->destinations, it) node_write(it->node, p->current); @@ -73,7 +63,9 @@ static void * path_run(void *arg) /* Open deferred TCP connection */ node_start_defer(p->in); - // FIXME: node_start_defer(p->out); + + FOREACH(&p->destinations, it) + node_start_defer(it->path->out); /* Main thread loop */ while (1) { @@ -158,7 +150,7 @@ int path_stop(struct path *p) pthread_cancel(p->sent_tid); pthread_join(p->sent_tid, NULL); - timer_delete(p->timer); + close(p->tfd); } if (p->received) diff --git a/server/src/server.c b/server/src/server.c index e48be40f5..f070024d3 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -39,15 +39,15 @@ static config_t config; static void quit() { - info("Stopping paths:"); + info("Stopping paths"); FOREACH(&paths, it) path_stop(it->path); - info("Stopping nodes:"); + info("Stopping nodes"); FOREACH(&nodes, it) node_stop(it->node); - info("Stopping interfaces:"); + info("Stopping interfaces"); FOREACH(&interfaces, it) if_stop(it->interface); @@ -143,35 +143,34 @@ int main(int argc, char *argv[]) list_init(&paths, (dtor_cb_t) path_destroy); list_init(&interfaces, (dtor_cb_t) if_destroy); - info("Initialize real-time system:"); + info("Initialize real-time system"); realtime_init(); - info("Initialize signals:"); + info("Initialize signals"); signals_init(); - info("Initialize node types:"); + info("Initialize node types"); node_init(argc, argv); - info("Parsing configuration:"); + info("Parsing configuration"); config_init(&config); config_parse(configfile, &config, &settings, &nodes, &paths); /* Connect all nodes and start one thread per path */ - info("Starting nodes:"); + info("Starting nodes"); FOREACH(&nodes, it) node_start(it->node); - info("Starting interfaces:"); + info("Starting interfaces"); FOREACH(&interfaces, it) if_start(it->interface, settings.affinity); - info("Starting paths:"); + info("Starting paths"); FOREACH(&paths, it) path_start(it->path); /* Run! */ if (settings.stats > 0) { - info("Runtime Statistics:"); info("%-32s : %-8s %-8s %-8s %-8s %-8s", "Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval"); line(); diff --git a/server/src/socket.c b/server/src/socket.c index f54c70329..9a16c781f 100644 --- a/server/src/socket.c +++ b/server/src/socket.c @@ -53,7 +53,7 @@ int socket_open(struct node *n) case TCP: s->sd = socket(sin->sin_family, SOCK_STREAM, IPPROTO_TCP); break; case UDP: s->sd = socket(sin->sin_family, SOCK_DGRAM, IPPROTO_UDP); break; case IP: s->sd = socket(sin->sin_family, SOCK_RAW, ntohs(sin->sin_port)); break; - case IEEE_802_3:s->sd = socket(sin->sin_family, SOCK_DGRAM, sll->sll_protocol); break; + case IEEE_802_3:s->sd = socket(sll->sll_family, SOCK_DGRAM, sll->sll_protocol); break; default: error("Invalid socket type!"); } @@ -67,11 +67,8 @@ int socket_open(struct node *n) serror("Failed to bind socket"); /* Connect socket for sending */ - if (node_type(n) == TCPD) { - /* Listening TCP sockets will be connected later by calling accept() */ + if (node_type(n) == TCP) { s->sd2 = s->sd; - } - else if (node_type(n) != IEEE_802_3) { ret = connect(s->sd, (struct sockaddr *) &s->remote, sizeof(s->remote)); if (ret < 0) serror("Failed to connect socket"); @@ -153,18 +150,27 @@ int socket_read(struct node *n, struct msg *m) int socket_write(struct node *n, struct msg *m) { struct socket *s = n->socket; - int ret; + int ret = -1; /* Convert headers to network byte order */ m->sequence = htons(m->sequence); - if (node_type(n) == IEEE_802_3) - ret = sendto(s->sd, m, MSG_LEN(m->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); - else - ret = send(s->sd, m, MSG_LEN(m->length), 0); + switch (node_type(n)) { + case IEEE_802_3:/* Connection-less protocols */ + case IP: + case UDP: + ret = sendto(s->sd, m, MSG_LEN(m->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); + break; + + case TCP: /* Connection-oriented protocols */ + case TCPD: + ret = send(s->sd, m, MSG_LEN(m->length), 0); + break; + default: { } + } if (ret < 0) - serror("Failed send(to)"); + serror("Failed send"); debug(10, "Message sent to node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u", n->name, m->version, m->type, m->endian, m->length, ntohs(m->sequence));