1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'master' of github.com:RWTH-ACS/S2SS

This commit is contained in:
Steffen Vogel 2015-04-01 13:28:33 +02:00
commit 877e2b7aec
10 changed files with 52 additions and 93 deletions

View file

@ -48,12 +48,6 @@ hook_cb_t hook_lookup(const char *name);
/** Example hook: Print the message. */
int hook_print(struct msg *m, struct path *p);
/** Example hook: Log messages to a logfile in /tmp */
int hook_log(struct msg *m, struct path *p);
#define HOOK_LOG_MODE "w+"
#define HOOK_LOG_TEMPLATE "logs/s2ss-%Y_%m_%d-%H_%M_%S.log"
/** Example hook: Drop messages. */
int hook_decimate(struct msg *m, struct path *p);

View file

@ -35,6 +35,8 @@ struct path
/** List of function pointers to hooks */
struct list hooks;
/** Timer file descriptor for fixed rate sending */
int tfd;
/** Send messages with a fixed rate over this path */
double rate;
@ -59,8 +61,6 @@ struct path
/** Counter for dropped messages due to reordering */
unsigned int dropped;
/** A timer used for fixed rate transmission. */
timer_t timer;
/** The thread id for this path */
pthread_t recv_tid;
/** A second thread id for fixed rate sending thread */

View file

@ -36,9 +36,8 @@
/* Alternate character set */
#define ACS(chr) "\e(0" chr "\e(B"
#define ACS_VERTICAL "|"
#define ACS_HORIZONTAL ACS("\x71")
//#define ACS_VERTICAL ACS("\x78")
#define ACS_VERTICAL ACS("\x78")
#define ACS_VERTRIGHT ACS("\x74")
/* UTF-8 Line drawing characters */
@ -87,7 +86,7 @@ extern pthread_t _mtid;
*/
int strap(char *dest, size_t size, const char *fmt, ...);
/** Variable arguments (stdarg) version of strap() */
/** Variadic version of strap() */
int vstrap(char *dest, size_t size, const char *fmt, va_list va);
/** Convert integer to cpu_set_t.
@ -115,9 +114,8 @@ void die();
/** Check assertion and exit if failed. */
#define assert(exp) do { \
if (EXPECT(!exp, 0)) { \
print(ERROR, "Assertion failed: '%s' in %s, %s:%d", \
error, "Assertion failed: '%s' in %s, %s:%d", \
#exp, __FUNCTION__, __BASE_FILE__, __LINE__); \
exit(EXIT_FAILURE); \
} } while (0)
#endif /* _UTILS_H_ */

View file

@ -125,7 +125,7 @@ void hist_plot(struct hist *h)
}
/* Print plot */
info("%9s | %5s | %s", "Value", "Occur", "Histogram Plot:");
info("%9s | %5s | %s", "Value", "Occur", "Plot");
line();
for (int i = 0; i < h->length; i++) {

View file

@ -23,7 +23,6 @@
/** @todo Make const */
static struct hook_id hook_list[] = {
{ hook_print, "print" },
{ hook_log, "log" },
{ hook_decimate, "decimate" },
{ hook_tofixed, "tofixed" },
{ hook_ts, "ts" },
@ -50,33 +49,6 @@ int hook_print(struct msg *m, struct path *p)
return 0;
}
int hook_log(struct msg *m, struct path *p)
{
static pthread_key_t pkey;
FILE *file = pthread_getspecific(pkey);
if (!file) {
char fstr[64], pstr[33];
path_print(p, pstr, sizeof(pstr));
struct tm tm;
time_t ts = time(NULL);
localtime_r(&ts, &tm);
strftime(fstr, sizeof(fstr), HOOK_LOG_TEMPLATE, &tm);
file = fopen(fstr, HOOK_LOG_MODE);
if (file)
debug(5, "Opened log file for path %s: %s", pstr, fstr);
pthread_key_create(&pkey, (dtor_cb_t) fclose);
pthread_setspecific(pkey, file);
}
msg_fprint(file, m);
return 0;
}
int hook_decimate(struct msg *m, struct path *p)
{
/* Drop every HOOK_DECIMATE_RATIO'th message */

View file

@ -143,7 +143,7 @@ void serror(const char *fmt, ...)
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
log_print(ERROR, "%s: %s", buf, strerror(errno));
log_print(ERROR, "%s: %m (%u)", buf, errno);
die();
}
@ -162,4 +162,4 @@ void cerror(config_setting_t *cfg, const char *fmt, ...)
: "(stdio)",
config_setting_source_line(cfg));
die();
}
}

View file

@ -118,20 +118,18 @@ int node_start(struct node *n)
int node_start_defer(struct node *n)
{
int ret;
struct socket *s = n->socket;
if (node_type(n) == TCPD) {
info("Wait for incoming TCP connection from node '%s'...", n->name);
ret = listen(n->socket->sd2, 1);
if (ret < 0)
s->sd = listen(s->sd2, 1);
if (s->sd < 0)
serror("Failed to listen on socket for node '%s'", n->name);
ret = accept(n->socket->sd2, NULL, NULL);
if (ret < 0)
s->sd = accept(s->sd2, NULL, NULL);
if (s->sd < 0)
serror("Failed to accept on socket for node '%s'", n->name);
n->socket->sd = ret;
}
return 0;

View file

@ -8,9 +8,8 @@
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <time.h>
#include <sys/timerfd.h>
#include <sys/syscall.h>
#include "utils.h"
@ -28,33 +27,24 @@ static void * path_send(void *arg)
{
struct path *p = arg;
int sig;
sigset_t set;
struct sigevent sev = {
.sigev_notify = SIGEV_THREAD_ID,
.sigev_signo = SIGALRM,
.sigev_notify_thread_id = syscall(SYS_gettid)
};
int ret;
uint64_t runs;
struct itimerspec its = {
.it_interval = timespec_rate(p->rate),
.it_value = { 1, 0 }
};
sigemptyset(&set);
sigaddset(&set, SIGALRM);
if(pthread_sigmask(SIG_BLOCK, &set, NULL))
serror("Set signal mask");
if (timer_create(CLOCK_REALTIME, &sev, &p->timer))
p->tfd = timerfd_create(CLOCK_REALTIME, 0);
if (p->tfd < 0)
serror("Failed to create timer");
if (timer_settime(p->timer, 0, &its, NULL))
ret = timerfd_settime(p->tfd, 0, &its, NULL);
if (ret)
serror("Failed to start timer");
while (1) {
sigwait(&set, &sig); /* blocking wait for next timer tick */
read(p->tfd, &runs, sizeof(runs));
FOREACH(&p->destinations, it)
node_write(it->node, p->current);
@ -73,7 +63,9 @@ static void * path_run(void *arg)
/* Open deferred TCP connection */
node_start_defer(p->in);
// FIXME: node_start_defer(p->out);
FOREACH(&p->destinations, it)
node_start_defer(it->path->out);
/* Main thread loop */
while (1) {
@ -158,7 +150,7 @@ int path_stop(struct path *p)
pthread_cancel(p->sent_tid);
pthread_join(p->sent_tid, NULL);
timer_delete(p->timer);
close(p->tfd);
}
if (p->received)

View file

@ -39,15 +39,15 @@ static config_t config;
static void quit()
{
info("Stopping paths:");
info("Stopping paths");
FOREACH(&paths, it)
path_stop(it->path);
info("Stopping nodes:");
info("Stopping nodes");
FOREACH(&nodes, it)
node_stop(it->node);
info("Stopping interfaces:");
info("Stopping interfaces");
FOREACH(&interfaces, it)
if_stop(it->interface);
@ -143,35 +143,34 @@ int main(int argc, char *argv[])
list_init(&paths, (dtor_cb_t) path_destroy);
list_init(&interfaces, (dtor_cb_t) if_destroy);
info("Initialize real-time system:");
info("Initialize real-time system");
realtime_init();
info("Initialize signals:");
info("Initialize signals");
signals_init();
info("Initialize node types:");
info("Initialize node types");
node_init(argc, argv);
info("Parsing configuration:");
info("Parsing configuration");
config_init(&config);
config_parse(configfile, &config, &settings, &nodes, &paths);
/* Connect all nodes and start one thread per path */
info("Starting nodes:");
info("Starting nodes");
FOREACH(&nodes, it)
node_start(it->node);
info("Starting interfaces:");
info("Starting interfaces");
FOREACH(&interfaces, it)
if_start(it->interface, settings.affinity);
info("Starting paths:");
info("Starting paths");
FOREACH(&paths, it)
path_start(it->path);
/* Run! */
if (settings.stats > 0) {
info("Runtime Statistics:");
info("%-32s : %-8s %-8s %-8s %-8s %-8s",
"Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval");
line();

View file

@ -53,7 +53,7 @@ int socket_open(struct node *n)
case TCP: s->sd = socket(sin->sin_family, SOCK_STREAM, IPPROTO_TCP); break;
case UDP: s->sd = socket(sin->sin_family, SOCK_DGRAM, IPPROTO_UDP); break;
case IP: s->sd = socket(sin->sin_family, SOCK_RAW, ntohs(sin->sin_port)); break;
case IEEE_802_3:s->sd = socket(sin->sin_family, SOCK_DGRAM, sll->sll_protocol); break;
case IEEE_802_3:s->sd = socket(sll->sll_family, SOCK_DGRAM, sll->sll_protocol); break;
default:
error("Invalid socket type!");
}
@ -67,11 +67,8 @@ int socket_open(struct node *n)
serror("Failed to bind socket");
/* Connect socket for sending */
if (node_type(n) == TCPD) {
/* Listening TCP sockets will be connected later by calling accept() */
if (node_type(n) == TCP) {
s->sd2 = s->sd;
}
else if (node_type(n) != IEEE_802_3) {
ret = connect(s->sd, (struct sockaddr *) &s->remote, sizeof(s->remote));
if (ret < 0)
serror("Failed to connect socket");
@ -153,18 +150,27 @@ int socket_read(struct node *n, struct msg *m)
int socket_write(struct node *n, struct msg *m)
{
struct socket *s = n->socket;
int ret;
int ret = -1;
/* Convert headers to network byte order */
m->sequence = htons(m->sequence);
if (node_type(n) == IEEE_802_3)
ret = sendto(s->sd, m, MSG_LEN(m->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
else
ret = send(s->sd, m, MSG_LEN(m->length), 0);
switch (node_type(n)) {
case IEEE_802_3:/* Connection-less protocols */
case IP:
case UDP:
ret = sendto(s->sd, m, MSG_LEN(m->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
break;
case TCP: /* Connection-oriented protocols */
case TCPD:
ret = send(s->sd, m, MSG_LEN(m->length), 0);
break;
default: { }
}
if (ret < 0)
serror("Failed send(to)");
serror("Failed send");
debug(10, "Message sent to node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u",
n->name, m->version, m->type, m->endian, m->length, ntohs(m->sequence));