From 273e71b97374d6f42f9f5d158624bf8edb6e2cfe Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 12 Mar 2015 22:56:58 +0100 Subject: [PATCH 01/23] added first part of new node type: OPAL_ASYNC This node type allows starting the S2SS server as an asynchronous process from RT-LAB. --- server/include/cfg.h | 23 +++++++----- server/include/node.h | 12 ++++-- server/include/opal.h | 30 ++++++++++++++- server/src/cfg.c | 28 +++++++++++++- server/src/node.c | 19 +++++++--- server/src/opal.c | 87 +++++++++++++++++++++++++++++++++++++++++++ server/src/server.c | 28 +++++++++++++- 7 files changed, 203 insertions(+), 24 deletions(-) diff --git a/server/include/cfg.h b/server/include/cfg.h index 5e93c42aa..baafe3aa7 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -73,25 +73,27 @@ int config_parse_path(config_setting_t *cfg, /** Parse a single node and add it to the global configuration. * - * @param cfg A libconfig object pointing to the node - * @param nodes Add new nodes to this linked list + * @param cfg A libconfig object pointing to the node. + * @param nodes Add new nodes to this linked list. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int config_parse_node(config_setting_t *cfg, - struct node **nodes); +int config_parse_node(config_setting_t *cfg, struct node **nodes); /** Parse node connection details for OPAL type * - * @param cfg A libconfig object pointing to the node + * @param argc The CLI argument count as used in main(). + * @param argv The CLI argument list as used in main(), containing shmem parameters. + * @param n A pointer to the node structure which should be parsed. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int config_parse_opal(config_setting_t *cfg, struct node *n); +int config_parse_opal(int argc, char *argv[], struct node *n); /** Parse node connection details for GTFPGA type * - * @param cfg A libconfig object pointing to the node + * @param cfg A libconfig object pointing to the node. + * @param n A pointer to the node structure which should be parsed. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ @@ -99,7 +101,8 @@ int config_parse_gtfpga(config_setting_t *cfg, struct node *n); /** Parse node connection details for SOCKET type * - * @param cfg A libconfig object pointing to the node + * @param cfg A libconfig object pointing to the node. + * @param n A pointer to the node structure which should be parsed. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ @@ -107,8 +110,8 @@ int config_parse_socket(config_setting_t *cfg, struct node *n); /** Parse network emulator (netem) settings. * - * @param cfg A libconfig object containing the settings - * @param em A pointer to the settings + * @param cfg A libconfig object containing the settings. + * @param em A pointer to the netem settings structure (part of the path structure). * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ diff --git a/server/include/node.h b/server/include/node.h index fe31fdf2e..b862a9a3b 100644 --- a/server/include/node.h +++ b/server/include/node.h @@ -39,7 +39,7 @@ enum node_type { UDP, /* BSD socket: AF_INET SOCK_DGRAM */ TCPD, /* BSD socket: AF_INET SOCK_STREAM bind + listen + accept */ TCP, /* BSD socket: AF_INET SOCK_STREAM bind + connect */ -// OPAL_ASYNC, /* OPAL-RT AsyncApi */ + OPAL_ASYNC, /* OPAL-RT AsyncApi */ // GTFPGA, /* Xilinx ML507 GTFPGA card */ INVALID }; @@ -118,10 +118,14 @@ int node_stop(struct node *n); /** Lookup string representation of socket type * - * @param type A string describing the socket type. This must be one of: tcp, tcpd, udp, ip, ieee802.3 - * @return An enumeration value or INVALID (0) + * You can either provide a node type in string or enum representation. + * Set str to NULL, to use the enum type. + * + * @param str A string describing the socket type. This must be one of: tcp, tcpd, udp, ip, ieee802.3 + * @param enu The enum type of the socket. + * @return A pointer to the vtable, or NULL if there is no socket type / vtable with this id. */ -struct node_vtable const * node_lookup_vtable(const char *str); +struct node_vtable const * node_lookup_vtable(const char *str, struct node_type enu); /** Search list of nodes for a name. * diff --git a/server/include/opal.h b/server/include/opal.h index 7b0d74c7b..b44ce7c52 100644 --- a/server/include/opal.h +++ b/server/include/opal.h @@ -9,8 +9,36 @@ #ifndef _OPAL_H_ #define _OPAL_H_ -struct opal { +/* Define RTLAB before including OpalPrint.h for messages to be sent + * to the OpalDisplay. Otherwise stdout will be used. */ +#define RTLAB +#include "OpalPrint.h" +#include "AsyncApi.h" +/* This is just for initializing the shared memory access to communicate + * with the RT-LAB model. It's easier to remember the arguments like this */ +#define OPAL_ASYNC_SHMEM_NAME argv[1] +#define OPAL_ASYNC_SHMEM_SIZE atoi(argv[2]) +#define OPAL_PRINT_SHMEM_NAME argv[3] + +struct opal { + Opal_GenAsyncParam_Ctrl icon_ctrl; + + char * async_shmem_name; + char * print_shmem_name; + int async_shmem_size; }; +int opal_parse(int argc, char *argv[], struct node *n); + +int opal_print(struct node *n, char *buf, int len); + +int opal_open(struct node *n); + +int opal_close(struct node *n); + +int opal_read(struct node *n, struct msg *m); + +int opal_write(struct node *n, struct msg *m); + #endif /* _OPAL_H_ */ diff --git a/server/src/cfg.c b/server/src/cfg.c index 0ead1d2a3..dbd16d295 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -184,6 +184,9 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes) n->vt = node_lookup_vtable(type); if (!n->vt) cerror(cfg, "Invalid type for node '%s'", n->name); + + if (!n->vt->parse) + cerror(cfg, "Node type '%s' is not allowed in the config", type); } else n->vt = node_lookup_vtable("udp"); @@ -196,8 +199,29 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes) } /** @todo Implement */ -int config_parse_opal(config_setting_t *cfg, struct node *n) +int config_parse_opal(int argc, char *argv[], struct node *n) { + n->cfg = NULL; + n->name = "opal"; + n->type = OPAL_ASYNC; + n->vt = node_lookup_table(NULL, n->type); + + struct opal *o = (struct opal *) malloc(sizeof(struct opal)); + if (!o) + error("Failed to allocate memory for opal settings"); + + memset(o, 0, sizeof(opal)); + + o->async_shmem_name = OPAL_ASYNC_SHMEM_NAME; + o->async_shmem_size = OPAL_ASYNC_SHMEM_SIZE; + o->print_shmem_name = OPAL_PRINT_SHMEM_NAME; + + int err; + if ((err = OpalGetAsyncCtrlParameters(&o->icon_ctrl, sizeof(IconCtrlStruct))) != EOK) + error("Could not get controller parameters (%d).\n", PROGNAME, err); + + n->opal = o; + return 0; } @@ -214,7 +238,7 @@ int config_parse_socket(config_setting_t *cfg, struct node *n) struct socket *s = (struct socket *) malloc(sizeof(struct socket)); if (!s) - serror("Failed to allocate memory for socket"); + serror("Failed to allocate memory for socket settings"); memset(s, 0, sizeof(struct socket)); diff --git a/server/src/node.c b/server/src/node.c index 003ebf66f..7047b4f89 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -24,13 +24,14 @@ /** Vtable for virtual node sub types */ static const struct node_vtable vtables[] = { +#ifdef ENABLE_OPAL_ASYNC + { OPAL_ASYNC, "opal", NULL, opal_print, opal_open, opal_close, opal_read, opal_write }, +#endif VTABLE(IEEE_802_3, "ieee802.3", socket), VTABLE(IP, "ip", socket), VTABLE(UDP, "udp", socket), VTABLE(TCP, "tcp", socket), - VTABLE(TCPD, "tcpd", socket), - //VTABLE(OPAL, "opal", opal ), - //VTABLE(GTFPGA, "gtfpga", gtfpga), + VTABLE(TCPD, "tcpd", socket) }; /** Linked list of nodes */ @@ -46,11 +47,17 @@ struct node * node_lookup_name(const char *str, struct node *nodes) return NULL; } -struct node_vtable const * node_lookup_vtable(const char *str) +struct node_vtable const * node_lookup_vtable(const char *str, struct node_type enu) { for (int i = 0; i < ARRAY_LEN(vtables); i++) { - if (!strcmp(vtables[i].name, str)) - return &vtables[i]; + if (str) { + if (!strcmp(vtables[i].name, str)) + return &vtables[i]; + } + else { + if (vtables[i].type == enu) + return &vtables[i]; + } } return NULL; diff --git a/server/src/opal.c b/server/src/opal.c index 8539723c0..04de64c48 100644 --- a/server/src/opal.c +++ b/server/src/opal.c @@ -7,3 +7,90 @@ */ #include "opal.h" + +int opal_print(struct node *n, char *buf, int len) +{ + +} + +int opal_open(struct node *n) +{ + /* Enable the OpalPrint function. This prints to the OpalDisplay. */ + if (OpalSystemCtrl_Register(PRINT_SHMEM_NAME) != EOK) { + printf("%s: ERROR: OpalPrint() access not available\n", PROGNAME); + exit(EXIT_FAILURE); + } + + OpalPrint("%s: This is a S2SS client\n", PROGNAME); + + /* Open Share Memory created by the model. */ + if ((OpalOpenAsyncMem(ASYNC_SHMEM_SIZE, ASYNC_SHMEM_NAME)) != EOK) { + OpalPrint("%s: ERROR: Model shared memory not available\n", PROGNAME); + exit(EXIT_FAILURE); + } + +} + +int opal_close(struct node *n) +{ + OpalCloseAsyncMem (ASYNC_SHMEM_SIZE, ASYNC_SHMEM_NAME); + OpalSystemCtrl_UnRegister(PRINT_SHMEM_NAME); +} + +int opal_read(struct node *n, struct msg *m) +{ + /* This call unblocks when the 'Data Ready' line of a send icon is asserted. */ + if ((n = OpalWaitForAsyncSendRequest(&SendID)) != EOK) { + ModelState = OpalGetAsyncModelState(); + if ((ModelState != STATE_RESET) && (ModelState != STATE_STOP)) { + OpalSetAsyncSendIconError(n, SendID); + OpalPrint("%s: OpalWaitForAsyncSendRequest(), errno %d\n", PROGNAME, n); + } + + return -1; // FIXME: correct return value + } + + /* No errors encountered yet */ + OpalSetAsyncSendIconError(0, SendID); + + /* Get the size of the data being sent by the unblocking SendID */ + OpalGetAsyncSendIconDataLength(&mdldata_size, SendID); + if (mdldata_size / sizeof(double) > MSG_VALUES) { + OpalPrint("%s: Number of signals for SendID=%d exceeds allowed maximum (%d)\n", + PROGNAME, SendID, MSG_VALUES); + + return NULL; + } + + /* Read data from the model */ + OpalGetAsyncSendIconData(mdldata, mdldata_size, SendID); + + msg.sequence = htons(seq++); + msg.length = mdldata_size / sizeof(double); + + for (i = 0; i < msg.length; i++) + msg.data[i].f = (float) mdldata[i]; + + msg_size = MSG_LEN(msg.length); + + /* This next call allows the execution of the "asynchronous" process + * to actually be synchronous with the model. To achieve this, you + * should set the "Sending Mode" in the Async_Send block to + * NEED_REPLY_BEFORE_NEXT_SEND or NEED_REPLY_NOW. This will force + * the model to wait for this process to call this + * OpalAsyncSendRequestDone function before continuing. */ + OpalAsyncSendRequestDone(SendID); + + /* Before continuing, we make sure that the real-time model + * has not been stopped. If it has, we quit. */ + ModelState = OpalGetAsyncModelState(); + if ((ModelState == STATE_RESET) || (ModelState == STATE_STOP)) + return -1; // TODO: fixme + + return 0; +} + +int opal_write(struct node *n, struct msg *m) +{ + +} diff --git a/server/src/server.c b/server/src/server.c index 2eb7962e7..9556f8fe8 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -116,9 +116,15 @@ int main(int argc, char *argv[]) BLD(YEL(VERSION)), BLD(MAG(__DATE__)), BLD(MAG(__TIME__))); /* Check arguments */ +#ifndef ENABLE_OPAL_ASYNC if (argc != 2) +#else + if (argc != 2 || argc != 4) +#endif usage(argv[0]); + char *configfile = argv[1]; + /* Check priviledges */ if (getuid() != 0) error("The server requires superuser privileges!"); @@ -131,8 +137,28 @@ int main(int argc, char *argv[]) info("Parsing configuration:"); config_init(&config); +#ifdef ENABLE_OPAL_ASYNC + /* Check if called as asynchronous process from RT-LAB */ + if (argc == 4) { + /* Allocate memory */ + struct node *n = (struct node *) malloc(sizeof(struct node)); + if (!n) + error("Failed to allocate memory for node"); + + memset(n, 0, sizeof(struct node)); + + config_parse_node_opal(argc, argv, n); + + configfile = n->opal->icon_ctrl.StringParam[0]; + if (configfile && strlen(configfile)) + info("Found config file supplied by Opal Async process: '%s'", configfile); + + list_add(*nodes, n); + } +#endif + /* Parse configuration and create nodes/paths */ - config_parse(argv[1], &config, &settings, &nodes, &paths); + config_parse(configfile, &config, &settings, &nodes, &paths); /* Connect all nodes and start one thread per path */ info("Starting nodes:"); From c89aa97f95c48b7100effe0d9bab9ed83dc51cac Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 22:44:09 +0100 Subject: [PATCH 02/23] added logging output via OpalPrint() --- server/include/utils.h | 9 +++++++ server/src/utils.c | 59 +++++++++++++++++++++++++++++++----------- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/server/include/utils.h b/server/include/utils.h index 8b5f72582..98bccc4f1 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -68,6 +68,15 @@ void epoch_reset(); */ void print(enum log_level lvl, const char *fmt, ...); +/** Safely append a format string to an existing string. + * + * This function is similar to strlcat() from BSD. + */ +int strap(char *dest, size_t size, const char *fmt, ...); + +/** Variable arguments (stdarg) version of strap() */ +int vstrap(char *dest, size_t size, const char *fmt, va_list va); + /** Convert integer to cpu_set_t. * * @param set A cpu bitmask diff --git a/server/src/utils.c b/server/src/utils.c index 7464ad8f1..998555099 100644 --- a/server/src/utils.c +++ b/server/src/utils.c @@ -15,6 +15,10 @@ #include #include +#ifdef ENABLE_OPAL_ASYNC +#include +#endif + #include "config.h" #include "cfg.h" #include "utils.h" @@ -35,35 +39,60 @@ void epoch_reset() clock_gettime(CLOCK_REALTIME, &epoch); } +int strap(char *dest, size_t size, const char *fmt, ...) +{ + int ret; + + va_list ap; + va_start(ap, fmt); + ret = vstrap(dest, size, fmt, ap); + va_end(ap); + + return ret; +} + +int vstrap(char *dest, size_t size, const char *fmt, va_list ap) +{ + int len = strlen(dest); + + return vsnprintf(dest + len, size - len, fmt, ap); +} + void print(enum log_level lvl, const char *fmt, ...) { struct timespec ts; + char buf[512] = ""; va_list ap; - va_start(ap, fmt); /* Timestamp */ clock_gettime(CLOCK_REALTIME, &ts); - fprintf(stderr, "%8.3f ", timespec_delta(&epoch, &ts)); + strap(buf, sizeof(buf), "%8.3f ", timespec_delta(&epoch, &ts)); + /* Severity */ switch (lvl) { - case DEBUG: fprintf(stderr, BLD("%-5s "), GRY("Debug")); break; - case INFO: fprintf(stderr, BLD("%-5s "), " " ); break; - case WARN: fprintf(stderr, BLD("%-5s "), YEL(" Warn")); break; - case ERROR: fprintf(stderr, BLD("%-5s "), RED("Error")); break; + case DEBUG: strap(buf, sizeof(buf), BLD("%-5s "), GRY("Debug")); break; + case INFO: strap(buf, sizeof(buf), BLD("%-5s "), " " ); break; + case WARN: strap(buf, sizeof(buf), BLD("%-5s "), YEL(" Warn")); break; + case ERROR: strap(buf, sizeof(buf), BLD("%-5s "), RED("Error")); break; } - if (_indent) { - for (int i = 0; i < _indent-1; i++) - fprintf(stderr, GFX("\x78") " "); - - fprintf(stderr, GFX("\x74") " "); - } - - vfprintf(stderr, fmt, ap); - fprintf(stderr, "\n"); + /* Indention */ + for (int i = 0; i < _indent-1; i++) + strap(buf, sizeof(buf), GFX("\x78") " "); + strap(buf, sizeof(buf), GFX("\x74") " "); + /* Format String */ + va_start(ap, fmt); + vstrap(buf, sizeof(buf), fmt, ap); va_end(ap); + + /* Output */ +#ifdef ENABLE_OPAL_ASYNC + OpalPrint("%s\n", buf); +#else + fprintf(stderr, "%s\n", buf); +#endif } cpu_set_t to_cpu_set(int set) From b29c86032e251b96ad6120354a37565da77addd1 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 22:46:46 +0100 Subject: [PATCH 03/23] second chunk of OPAL AsyncApi support (untested) --- server/include/cfg.h | 7 +- server/include/opal.h | 68 +++++++++++--- server/src/cfg.c | 46 +++++---- server/src/node.c | 2 +- server/src/opal.c | 213 ++++++++++++++++++++++++++++++++++-------- server/src/server.c | 32 +++---- 6 files changed, 273 insertions(+), 95 deletions(-) diff --git a/server/include/cfg.h b/server/include/cfg.h index baafe3aa7..4da342bae 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -82,13 +82,12 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes); /** Parse node connection details for OPAL type * - * @param argc The CLI argument count as used in main(). - * @param argv The CLI argument list as used in main(), containing shmem parameters. - * @param n A pointer to the node structure which should be parsed. + * @param cfg A libconfig object pointing to the node. + * @param nodes Add new nodes to this linked list. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int config_parse_opal(int argc, char *argv[], struct node *n); +int config_parse_opal(config_setting_t *cfg, struct node *n); /** Parse node connection details for GTFPGA type * diff --git a/server/include/opal.h b/server/include/opal.h index b44ce7c52..4fbf74b8c 100644 --- a/server/include/opal.h +++ b/server/include/opal.h @@ -1,4 +1,4 @@ -/** Node type: OPAL (AsyncApi) +/** Node type: OPAL (libOpalAsync API) * * This file implements the opal subtype for nodes. * @@ -9,30 +9,72 @@ #ifndef _OPAL_H_ #define _OPAL_H_ +#include + +#include "node.h" +#include "msg.h" + /* Define RTLAB before including OpalPrint.h for messages to be sent * to the OpalDisplay. Otherwise stdout will be used. */ #define RTLAB #include "OpalPrint.h" #include "AsyncApi.h" +#include "OpalGenAsyncParamCtrl.h" -/* This is just for initializing the shared memory access to communicate - * with the RT-LAB model. It's easier to remember the arguments like this */ -#define OPAL_ASYNC_SHMEM_NAME argv[1] -#define OPAL_ASYNC_SHMEM_SIZE atoi(argv[2]) -#define OPAL_PRINT_SHMEM_NAME argv[3] - -struct opal { - Opal_GenAsyncParam_Ctrl icon_ctrl; - - char * async_shmem_name; - char * print_shmem_name; +/** This global structure holds libOpalAsync related information. + * It's only used once in the code. */ +struct opal_global { + /** Shared Memory identifiers and size, provided via argv. */ + char *async_shmem_name, *print_shmem_name; int async_shmem_size; + + /** Number of send blocks used in the running OPAL model. */ + int send_icons, recv_icons; + /** A dynamically allocated array of SendIDs. */ + int *send_ids, *recv_ids; + + /** String and Float parameters, provided by the OPAL AsyncProcess block. */ + Opal_GenAsyncParam_Ctrl params; + + /** Big Global Lock for libOpalAsync API */ + pthread_mutex_t lock; }; -int opal_parse(int argc, char *argv[], struct node *n); +struct opal { + int reply; + int mode; + + int send_id; + int recv_id; + + int seq_no; + + struct opal_global *global; + + Opal_SendAsyncParam send_params; + Opal_RecvAsyncParam recv_params; +}; + +/** Initialize global OPAL settings and maps shared memory regions. + * + * @param argc The number of CLI arguments, provided to main(). + * @param argv The CLI argument list, provided to main(). + * @retval 0 On success. + * @retval <0 On failure. + */ +int opal_init(int argc, char *argv[]); + +/** Free global OPAL settings and unmaps shared memory regions. + * + * @retval 0 On success. + * @retval <0 On failure. + */ +int opal_deinit(); int opal_print(struct node *n, char *buf, int len); +int opal_print_global(struct opal_global *g); + int opal_open(struct node *n); int opal_close(struct node *n); diff --git a/server/src/cfg.c b/server/src/cfg.c index dbd16d295..44b85f01a 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -198,29 +198,41 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes) return ret; } -/** @todo Implement */ -int config_parse_opal(int argc, char *argv[], struct node *n) -{ - n->cfg = NULL; - n->name = "opal"; - n->type = OPAL_ASYNC; - n->vt = node_lookup_table(NULL, n->type); +/** @todo: Remove this global variable. */ +extern struct opal_global *og; +int config_parse_opal(config_setting_t *cfg, struct node *n) +{ + if (!og) { + warn("Skipping this node, because this server is not running as an OPAL Async process!"); + return -1; + } + struct opal *o = (struct opal *) malloc(sizeof(struct opal)); if (!o) error("Failed to allocate memory for opal settings"); - - memset(o, 0, sizeof(opal)); - - o->async_shmem_name = OPAL_ASYNC_SHMEM_NAME; - o->async_shmem_size = OPAL_ASYNC_SHMEM_SIZE; - o->print_shmem_name = OPAL_PRINT_SHMEM_NAME; - - int err; - if ((err = OpalGetAsyncCtrlParameters(&o->icon_ctrl, sizeof(IconCtrlStruct))) != EOK) - error("Could not get controller parameters (%d).\n", PROGNAME, err); + + memset(o, 0, sizeof(struct opal)); + + config_setting_lookup_int(cfg, "send_id", &o->send_id); + config_setting_lookup_int(cfg, "recv_id", &o->send_id); + config_setting_lookup_bool(cfg, "reply", &o->reply); + + /* Search for valid send and recv ids */ + int sfound = 0, rfound = 0; + for (int i=0; isend_icons; i++) + sfound += og->send_ids[i] == o->send_id; + for (int i=0; isend_icons; i++) + rfound += og->send_ids[i] == o->send_id; + + if (!sfound) + cerror(config_setting_get_member(cfg, "send_id"), "Invalid send_id '%u' for node '%s'", o->send_id, n->name); + if (!rfound) + cerror(config_setting_get_member(cfg, "send_id"), "Invalid send_id '%u' for node '%s'", o->send_id, n->name); n->opal = o; + n->opal->global = og; + n->cfg = cfg; return 0; } diff --git a/server/src/node.c b/server/src/node.c index 7047b4f89..0cb830343 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -25,7 +25,7 @@ /** Vtable for virtual node sub types */ static const struct node_vtable vtables[] = { #ifdef ENABLE_OPAL_ASYNC - { OPAL_ASYNC, "opal", NULL, opal_print, opal_open, opal_close, opal_read, opal_write }, + VTABLE(OPAL_ASYNC, "opal", opal), #endif VTABLE(IEEE_802_3, "ieee802.3", socket), VTABLE(IP, "ip", socket), diff --git a/server/src/opal.c b/server/src/opal.c index 04de64c48..a14a13730 100644 --- a/server/src/opal.c +++ b/server/src/opal.c @@ -6,72 +6,178 @@ * @copyright 2014, Institute for Automation of Complex Power Systems, EONERC */ +#include +#include + #include "opal.h" +#include "utils.h" + +/** @todo: delcare statice */ +struct opal_global *og = NULL; + +int opal_init(int argc, char *argv[]) +{ + int err; + + if (argc != 4) + return -1; + + struct opal_global *g = (struct opal_global *) malloc(sizeof(struct opal_global)); + if (!g) + error("Failed to allocate memory for global OPAL settings"); + + memset(g, 0, sizeof(struct opal_global)); + + pthread_mutex_init(&g->lock, NULL); + + g->async_shmem_name = argv[1]; + g->async_shmem_size = atoi(argv[2]); + g->print_shmem_name = argv[3]; + + /* Enable the OpalPrint function. This prints to the OpalDisplay. */ + if ((err = OpalSystemCtrl_Register(g->print_shmem_name)) != EOK) + error("OpalPrint() access not available (%d)", err); + + /* Open Share Memory created by the model. */ + if ((err = OpalOpenAsyncMem(g->async_shmem_size, g->async_shmem_name)) != EOK) + error("Model shared memory not available (%d)", err); + + if ((err = OpalGetAsyncCtrlParameters(&g->params, sizeof(Opal_GenAsyncParam_Ctrl))) != EOK) + error("Could not get OPAL controller parameters (%d)", err); + + /* Get list of Send and RecvIDs */ + if ((err = OpalGetNbAsyncSendIcon(&g->send_icons)) != EOK) + error("Failed to get number of send blocks (%d)", err); + if ((err = OpalGetNbAsyncRecvIcon(&g->recv_icons) != EOK); + error("Failed to get number of recv blocks (%d)", err); + + g->send_ids = (int *) malloc(g->send_icons * sizeof(int)); + g->recv_ids = (int *) malloc(g->recv_icons * sizeof(int)); + if (!g->send_ids || !g->recv_ids) + error("Failed to allocate memory for OPAL AsyncApi ID list."); + + if ((err = OpalGetAsyncSendIDList(g->send_ids, g->send_icons)) != EOK) + error("Failed to get list of send ids (%d)", err); + if ((err = OpalGetAsyncRecvIDList(g->recv_ids, g->recv_icons)) != EOK) + error("Failed to get list of recv ids (%d)", err); + + info("Started as OPAL async process:"); + opal_print_global(g); + + og = g; + + return 0; +} + +int opal_deinit() +{ + if (og) { + if ((err = OpalCloseAsyncMem(og->async_shmem_size, og->async_shmem_name)) != EOK) + error("Failed to close shared memory area (%d)", err); + if ((err = OpalSystemCtrl_UnRegister(og->print_shmem_name)) != EOK) + error("Failed to close shared memory for system control (%d)", err); + + free(og->send_ids); + free(og->recv_ids); + free(og); + + og = NULL; + } + + return 0; +} + +int opal_print_global(struct opal_global *g) +{ INDENT + char sbuf[512] = ""; + char rbuf[512] = ""; + + for (int i=0; isend_icons; i++) + strap(sbuf, sizeof(sbuf), "%u ", g->send_ids[i]); + for (int i=0; irecv_icons; i++) + strap(rbuf, sizeof(rbuf), "%u ", g->recv_ids[i]); + + debug(4, "Controller ID: %u", g->params.controllerID); + debug(4, "Send Blocks: %s", sbuf); + debug(4, "Receive Blocks: %s", rbuf); + + debug(4, "Control Block Parameters:"); + for (int i=0; iparams.FloatParam[i]); + for (int i=0; iparams.StringParam[i]); + + return 0; +} int opal_print(struct node *n, char *buf, int len) { - + struct opal *o = n->opal; + + /** @todo: Print send_params, recv_params */ + + return snprintf(buf, len, "send_id=%u, recv_id=%u, reply=%u", + o->send_id, o->recv_id, o->reply); } int opal_open(struct node *n) { - /* Enable the OpalPrint function. This prints to the OpalDisplay. */ - if (OpalSystemCtrl_Register(PRINT_SHMEM_NAME) != EOK) { - printf("%s: ERROR: OpalPrint() access not available\n", PROGNAME); - exit(EXIT_FAILURE); - } - - OpalPrint("%s: This is a S2SS client\n", PROGNAME); - - /* Open Share Memory created by the model. */ - if ((OpalOpenAsyncMem(ASYNC_SHMEM_SIZE, ASYNC_SHMEM_NAME)) != EOK) { - OpalPrint("%s: ERROR: Model shared memory not available\n", PROGNAME); - exit(EXIT_FAILURE); - } + struct opal *o = n->opal; + + OpalGetAsyncSendIconMode(&o->mode, o->send_id); + OpalGetAsyncSendParameters(&o->send_params, sizeof(Opal_SendAsyncParam), o->send_id); + OpalGetAsyncRecvParameters(&o->recv_params, sizeof(Opal_RecvAsyncParam), o->recv_id); + return 0; } int opal_close(struct node *n) { - OpalCloseAsyncMem (ASYNC_SHMEM_SIZE, ASYNC_SHMEM_NAME); - OpalSystemCtrl_UnRegister(PRINT_SHMEM_NAME); + return 0; } int opal_read(struct node *n, struct msg *m) { + struct opal *o = n->opal; + + int state, len, ret; + unsigned id; + + double data[MSG_VALUES]; + /* This call unblocks when the 'Data Ready' line of a send icon is asserted. */ - if ((n = OpalWaitForAsyncSendRequest(&SendID)) != EOK) { - ModelState = OpalGetAsyncModelState(); - if ((ModelState != STATE_RESET) && (ModelState != STATE_STOP)) { - OpalSetAsyncSendIconError(n, SendID); - OpalPrint("%s: OpalWaitForAsyncSendRequest(), errno %d\n", PROGNAME, n); - } + do { + if ((ret = OpalWaitForAsyncSendRequest(&id)) != EOK) { + state = OpalGetAsyncModelState(); + if ((state != STATE_RESET) && (state != STATE_STOP)) { + OpalSetAsyncSendIconError(ret, id); + info("OpalWaitForAsyncSendRequest(), errno %d", ret); + } - return -1; // FIXME: correct return value - } + return -1; // FIXME: correct return value + } + } while (id != o->send_id); /* No errors encountered yet */ - OpalSetAsyncSendIconError(0, SendID); + OpalSetAsyncSendIconError(0, o->send_id); /* Get the size of the data being sent by the unblocking SendID */ - OpalGetAsyncSendIconDataLength(&mdldata_size, SendID); - if (mdldata_size / sizeof(double) > MSG_VALUES) { - OpalPrint("%s: Number of signals for SendID=%d exceeds allowed maximum (%d)\n", - PROGNAME, SendID, MSG_VALUES); + OpalGetAsyncSendIconDataLength(&len, o->send_id); + if (len > sizeof(data)) { + warn("Ignoring the last %u of %u values for OPAL node '%s' (send_id=%u).", + len / sizeof(double) - MSG_VALUES, len / sizeof(double), n->name, o->send_id); - return NULL; + len = sizeof(data); } /* Read data from the model */ - OpalGetAsyncSendIconData(mdldata, mdldata_size, SendID); + OpalGetAsyncSendIconData(data, len, o->send_id); - msg.sequence = htons(seq++); - msg.length = mdldata_size / sizeof(double); + m->sequence = htons(o->seq_no++); + m->length = len / sizeof(double); - for (i = 0; i < msg.length; i++) - msg.data[i].f = (float) mdldata[i]; - - msg_size = MSG_LEN(msg.length); + for (int i = 0; i < m->length; i++) + m->data[i].f = (float) data[i]; // casting to float! /* This next call allows the execution of the "asynchronous" process * to actually be synchronous with the model. To achieve this, you @@ -79,18 +185,43 @@ int opal_read(struct node *n, struct msg *m) * NEED_REPLY_BEFORE_NEXT_SEND or NEED_REPLY_NOW. This will force * the model to wait for this process to call this * OpalAsyncSendRequestDone function before continuing. */ - OpalAsyncSendRequestDone(SendID); + if (o->reply) + OpalAsyncSendRequestDone(o->send_id); /* Before continuing, we make sure that the real-time model * has not been stopped. If it has, we quit. */ - ModelState = OpalGetAsyncModelState(); - if ((ModelState == STATE_RESET) || (ModelState == STATE_STOP)) - return -1; // TODO: fixme + state = OpalGetAsyncModelState(); + if ((state == STATE_RESET) || (state == STATE_STOP)) + error("OpalGetAsyncModelState(): Model stopped or resetted!"); // TODO: fixme return 0; } int opal_write(struct node *n, struct msg *m) { + struct opal *o = n->opal; + + int state; + int len; + + double data[MSG_VALUES] = { NAN }; + + state = OpalGetAsyncModelState(); + if ((state == STATE_RESET) || (state == STATE_STOP)) + return -1; + OpalSetAsyncRecvIconStatus(m->sequence, o->recv_id); /* Set the Status to the message ID */ + OpalSetAsyncRecvIconError(0, o->recv_id); /* Set the Error to 0 */ + + /* Get the number of signals to send back to the model */ + OpalGetAsyncRecvIconDataLength(&len, o->recv_id); + if (len > sizeof(data)) + error("Receive Block of OPAL node '%s' is expecting more signals than"); + + for (int i = 0; i < m->length; i++) + data[i] = (double) m->data[i].f; + + OpalSetAsyncRecvIconData(data, len, o->recv_id); + + return 0; } diff --git a/server/src/server.c b/server/src/server.c index 9556f8fe8..ae0e0b201 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -22,6 +22,10 @@ #include "path.h" #include "node.h" +#ifdef ENABLE_OPAL_ASYNC +#include "opal.h" +#endif + /** Linked list of nodes */ extern struct node *nodes; /** Linked list of paths */ @@ -103,6 +107,11 @@ void usage(const char *name) { printf("Usage: %s CONFIG\n", name); printf(" CONFIG is a required path to a configuration file\n\n"); +#ifdef ENABLE_OPAL_ASYNC + printf("Usage: %s OPAL_ASYNC_SHMEM_NAME OPAL_ASYNC_SHMEM_SIZE OPAL_PRINT_SHMEM_NAME\n", name); + printf(" This type of invocation is used by OPAL-RT Asynchronous processes.\n"); + printf(" See in the RT-LAB User Guide for more information.\n\n"); +#endif printf("Simulator2Simulator Server %s (built on %s, %s)\n", BLU(VERSION), MAG(__DATE__), MAG(__TIME__)); @@ -116,10 +125,10 @@ int main(int argc, char *argv[]) BLD(YEL(VERSION)), BLD(MAG(__DATE__)), BLD(MAG(__TIME__))); /* Check arguments */ -#ifndef ENABLE_OPAL_ASYNC - if (argc != 2) +#ifdef ENABLE_OPAL_ASYNC + if (argc != 2 && argc != 4) #else - if (argc != 2 || argc != 4) + if (argc != 2) #endif usage(argv[0]); @@ -139,22 +148,7 @@ int main(int argc, char *argv[]) #ifdef ENABLE_OPAL_ASYNC /* Check if called as asynchronous process from RT-LAB */ - if (argc == 4) { - /* Allocate memory */ - struct node *n = (struct node *) malloc(sizeof(struct node)); - if (!n) - error("Failed to allocate memory for node"); - - memset(n, 0, sizeof(struct node)); - - config_parse_node_opal(argc, argv, n); - - configfile = n->opal->icon_ctrl.StringParam[0]; - if (configfile && strlen(configfile)) - info("Found config file supplied by Opal Async process: '%s'", configfile); - - list_add(*nodes, n); - } + opal_init(argc, argv); #endif /* Parse configuration and create nodes/paths */ From c081729cb55c70dd9fa9ce947cb7f101cc304527 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 22:47:43 +0100 Subject: [PATCH 04/23] revert changes for node_lookup_vtable() --- server/include/node.h | 8 ++------ server/src/node.c | 12 +++--------- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/server/include/node.h b/server/include/node.h index b862a9a3b..533ac724e 100644 --- a/server/include/node.h +++ b/server/include/node.h @@ -118,14 +118,10 @@ int node_stop(struct node *n); /** Lookup string representation of socket type * - * You can either provide a node type in string or enum representation. - * Set str to NULL, to use the enum type. - * - * @param str A string describing the socket type. This must be one of: tcp, tcpd, udp, ip, ieee802.3 - * @param enu The enum type of the socket. + * @param str A string describing the socket type. This must be one of: tcp, tcpd, udp, ip, ieee802.3 or opal * @return A pointer to the vtable, or NULL if there is no socket type / vtable with this id. */ -struct node_vtable const * node_lookup_vtable(const char *str, struct node_type enu); +struct node_vtable const * node_lookup_vtable(const char *str); /** Search list of nodes for a name. * diff --git a/server/src/node.c b/server/src/node.c index 0cb830343..21c4e8048 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -47,17 +47,11 @@ struct node * node_lookup_name(const char *str, struct node *nodes) return NULL; } -struct node_vtable const * node_lookup_vtable(const char *str, struct node_type enu) +struct node_vtable const * node_lookup_vtable(const char *str) { for (int i = 0; i < ARRAY_LEN(vtables); i++) { - if (str) { - if (!strcmp(vtables[i].name, str)) - return &vtables[i]; - } - else { - if (vtables[i].type == enu) - return &vtables[i]; - } + if (!strcmp(vtables[i].name, str)) + return &vtables[i]; } return NULL; From d75dee0e58560c78553666578a8f5c2214c78a8d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 22:54:16 +0100 Subject: [PATCH 05/23] updated Makefile to link 32bit OPAL-RT libraries --- server/Makefile | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/server/Makefile b/server/Makefile index ecd198e1a..d1bb2637b 100644 --- a/server/Makefile +++ b/server/Makefile @@ -1,18 +1,8 @@ TARGETS = server send random receive test SRCS = server.c send.c receive.c random.c node.c path.c utils.c socket.c msg.c cfg.c if.c tc.c hist.c -# Default target: build everything -all: $(TARGETS) - COMMON = socket.o if.o utils.o msg.o node.o cfg.o tc.o hooks.o -# Dependencies for individual binaries -server: $(COMMON) path.o hist.o -send: $(COMMON) -receive: $(COMMON) -random: utils.o msg.o -test: $(COMMON) hist.o - VPATH = src # Default debug level @@ -21,7 +11,7 @@ V ?= 2 # Some details about the compiled version # Compiler and linker flags -LDFLAGS = -pthread -lrt -lm -lconfig +LDLIBS = -pthread -lrt -lm -lconfig CFLAGS = -std=c99 -Iinclude/ -MMD -Wall CFLAGS += -D_XOPEN_SOURCE=500 -D_GNU_SOURCE -DV=$(V) CFLAGS += -D__GIT_REV__='"-$(shell git rev-parse --short HEAD)"' @@ -33,8 +23,28 @@ else CFLAGS += -O3 endif +# Enable OPAL-RT Asynchronous Process support +#OPALDIR = /usr/opalrt/common +OPALDIR = ../opal +ifneq (,$(wildcard $(OPALDIR)/include_target/AsyncApi.h)) + CFLAGS += -m32 -DENABLE_OPAL_ASYNC -I$(OPALDIR)/include_target + LDFLAGS += -m32 + LDLIBS += $(addprefix $(OPALDIR)/lib/redhawk/, libOpalAsyncApiCore.a libOpalCore.a libOpalUtils.a libirc.a) + COMMON += opal.o +endif + .PHONY: all clean +# Default target: build everything +all: $(TARGETS) + +# Specific dependencies +server: $(COMMON) server.o path.o hist.o +send: $(COMMON) send.o +receive: $(COMMON) receive.o +random: random.o utils.o msg.o +test: $(COMMON) hist.o + clean: $(RM) *~ *.o *.d $(RM) $(TARGETS) From 14d6d0d6fea4f279eca073fc621032f1493cef4e Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 22:59:43 +0100 Subject: [PATCH 06/23] fixed typos --- server/src/opal.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/opal.c b/server/src/opal.c index a14a13730..44fb41a4a 100644 --- a/server/src/opal.c +++ b/server/src/opal.c @@ -48,7 +48,7 @@ int opal_init(int argc, char *argv[]) /* Get list of Send and RecvIDs */ if ((err = OpalGetNbAsyncSendIcon(&g->send_icons)) != EOK) error("Failed to get number of send blocks (%d)", err); - if ((err = OpalGetNbAsyncRecvIcon(&g->recv_icons) != EOK); + if ((err = OpalGetNbAsyncRecvIcon(&g->recv_icons)) != EOK); error("Failed to get number of recv blocks (%d)", err); g->send_ids = (int *) malloc(g->send_icons * sizeof(int)); @@ -71,6 +71,8 @@ int opal_init(int argc, char *argv[]) int opal_deinit() { + int err; + if (og) { if ((err = OpalCloseAsyncMem(og->async_shmem_size, og->async_shmem_name)) != EOK) error("Failed to close shared memory area (%d)", err); From ff010ba291b30ac9c73b34a2966b20110b60d4e0 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 23:11:28 +0100 Subject: [PATCH 07/23] small cleanups --- server/Makefile | 7 +++---- server/src/server.c | 8 ++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/server/Makefile b/server/Makefile index ecd198e1a..b4607380b 100644 --- a/server/Makefile +++ b/server/Makefile @@ -1,9 +1,9 @@ TARGETS = server send random receive test -SRCS = server.c send.c receive.c random.c node.c path.c utils.c socket.c msg.c cfg.c if.c tc.c hist.c # Default target: build everything all: $(TARGETS) +# Common dependencies for all binaries COMMON = socket.o if.o utils.o msg.o node.o cfg.o tc.o hooks.o # Dependencies for individual binaries @@ -13,13 +13,12 @@ receive: $(COMMON) random: utils.o msg.o test: $(COMMON) hist.o +# Search path for source files VPATH = src # Default debug level V ?= 2 -# Some details about the compiled version - # Compiler and linker flags LDFLAGS = -pthread -lrt -lm -lconfig CFLAGS = -std=c99 -Iinclude/ -MMD -Wall @@ -28,7 +27,7 @@ CFLAGS += -D__GIT_REV__='"-$(shell git rev-parse --short HEAD)"' # Conditional flags ifdef DEBUG - CFLAGS += -g + CFLAGS += -O0 -g else CFLAGS += -O3 endif diff --git a/server/src/server.c b/server/src/server.c index 2eb7962e7..e6235f24e 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -111,13 +111,13 @@ void usage(const char *name) int main(int argc, char *argv[]) { - epoch_reset(); - info("This is Simulator2Simulator Server (S2SS) %s (built on %s, %s)", - BLD(YEL(VERSION)), BLD(MAG(__DATE__)), BLD(MAG(__TIME__))); - /* Check arguments */ if (argc != 2) usage(argv[0]); + + epoch_reset(); + info("This is Simulator2Simulator Server (S2SS) %s (built on %s, %s)", + BLD(YEL(VERSION)), BLD(MAG(__DATE__)), BLD(MAG(__TIME__))); /* Check priviledges */ if (getuid() != 0) From 85d23eae40d1ae37b8652784f2d92d6403082cbe Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 23:20:47 +0100 Subject: [PATCH 08/23] added new helper function to allocate and initialize memory --- server/include/utils.h | 3 +++ server/src/utils.c | 11 +++++++++++ 2 files changed, 14 insertions(+) diff --git a/server/include/utils.h b/server/include/utils.h index 8b5f72582..4e822af04 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -75,6 +75,9 @@ void print(enum log_level lvl, const char *fmt, ...); */ cpu_set_t to_cpu_set(int set); +/** Allocate and initialize memory. */ +void * alloc(size_t bytes); + /** Get delta between two timespec structs */ double timespec_delta(struct timespec *start, struct timespec *end); diff --git a/server/src/utils.c b/server/src/utils.c index 7464ad8f1..c332e7799 100644 --- a/server/src/utils.c +++ b/server/src/utils.c @@ -80,6 +80,17 @@ cpu_set_t to_cpu_set(int set) return cset; } +void * alloc(size_t bytes) +{ + void *p = malloc(bytes); + if (!p) + error("Failed to allocate memory"); + + memset(p, 0, bytes); + + return p; +} + double timespec_delta(struct timespec *start, struct timespec *end) { double sec = end->tv_sec - start->tv_sec; From 61b3e45bca12002a71c9338b488b86340f909e71 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 23:21:31 +0100 Subject: [PATCH 09/23] make use of new helper function: alloc() --- server/src/cfg.c | 31 +++++-------------------------- server/src/hist.c | 2 +- server/src/if.c | 6 +----- server/src/path.c | 2 +- server/src/test.c | 2 +- 5 files changed, 9 insertions(+), 34 deletions(-) diff --git a/server/src/cfg.c b/server/src/cfg.c index 0ead1d2a3..2999f140d 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -97,11 +97,7 @@ int config_parse_path(config_setting_t *cfg, int enabled = 1; int reverse = 0; - struct path *p = (struct path *) malloc(sizeof(struct path)); - if (!p) - error("Failed to allocate memory for path"); - - memset(p, 0, sizeof(struct path)); + struct path *p = alloc(sizeof(struct path)); /* Required settings */ if (!config_setting_lookup_string(cfg, "in", &in)) @@ -139,11 +135,7 @@ int config_parse_path(config_setting_t *cfg, list_add(*paths, p); if (reverse) { - struct path *rev = (struct path *) malloc(sizeof(struct path)); - if (!rev) - error("Failed to allocate memory for path"); - - memcpy(rev, p, sizeof(struct path)); + struct path *rev = alloc(sizeof(struct path)); rev->in = p->out; /* Swap in/out */ rev->out = p->in; @@ -167,12 +159,7 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes) const char *type; int ret; - /* Allocate memory */ - struct node *n = (struct node *) malloc(sizeof(struct node)); - if (!n) - error("Failed to allocate memory for node"); - - memset(n, 0, sizeof(struct node)); + struct node *n = alloc(sizeof(struct node)); /* Required settings */ n->cfg = cfg; @@ -212,11 +199,7 @@ int config_parse_socket(config_setting_t *cfg, struct node *n) const char *local, *remote; int ret; - struct socket *s = (struct socket *) malloc(sizeof(struct socket)); - if (!s) - serror("Failed to allocate memory for socket"); - - memset(s, 0, sizeof(struct socket)); + struct socket *s = (struct socket *) alloc(sizeof(struct socket)); if (!config_setting_lookup_string(cfg, "remote", &remote)) cerror(cfg, "Missing remote address for node '%s'", n->name); @@ -237,11 +220,7 @@ int config_parse_socket(config_setting_t *cfg, struct node *n) /** @todo Netem settings are not usable AF_UNIX */ config_setting_t *cfg_netem = config_setting_get_member(cfg, "netem"); if (cfg_netem) { - s->netem = (struct netem *) malloc(sizeof(struct netem)); - if (!s->netem) - error("Failed to allocate memory for netem"); - - memset(s->netem, 0, sizeof(struct netem)); + s->netem = (struct netem *) alloc(sizeof(struct netem)); config_parse_netem(cfg_netem, s->netem); } diff --git a/server/src/hist.c b/server/src/hist.c index 742e11be2..cb20c0940 100644 --- a/server/src/hist.c +++ b/server/src/hist.c @@ -23,7 +23,7 @@ void hist_init(struct hist *h, double low, double high, double resolution) h->high = high; h->resolution = resolution; h->length = (high - low) / resolution; - h->data = malloc(h->length * sizeof(unsigned)); + h->data = alloc(h->length * sizeof(unsigned)); hist_reset(h); } diff --git a/server/src/if.c b/server/src/if.c index a1e5d7bcc..a8c99207d 100644 --- a/server/src/if.c +++ b/server/src/if.c @@ -26,11 +26,7 @@ struct interface *interfaces; struct interface * if_create(int index) { - struct interface *i = malloc(sizeof(struct interface)); - if (!i) - error("Failed to allocate memory for interface"); - else - memset(i, 0, sizeof(struct interface)); + struct interface *i = alloc(sizeof(struct interface)); i->index = index; if_indextoname(index, i->name); diff --git a/server/src/path.c b/server/src/path.c index 1a4c5793f..8d0e28516 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -67,7 +67,7 @@ static void * path_send(void *arg) static void * path_run(void *arg) { struct path *p = (struct path *) arg; - struct msg *m = malloc(sizeof(struct msg)); + struct msg *m = alloc(sizeof(struct msg)); if (!m) error("Failed to allocate memory for message!"); diff --git a/server/src/test.c b/server/src/test.c index e06901b22..594732f36 100644 --- a/server/src/test.c +++ b/server/src/test.c @@ -147,7 +147,7 @@ check: void test_rtt() { struct msg m = MSG_INIT(sizeof(struct timespec) / sizeof(float)); struct timespec *ts1 = (struct timespec *) &m.data; - struct timespec *ts2 = malloc(sizeof(struct timespec)); + struct timespec *ts2 = alloc(sizeof(struct timespec)); double rtt; double rtt_max = LLONG_MIN; From 9a3d19ce6c7f5af9fd84462ca6886cdbe84a2ade Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 17 Mar 2015 23:23:13 +0100 Subject: [PATCH 10/23] removed superfluous casts from void * --- server/src/cfg.c | 2 +- server/src/path.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/cfg.c b/server/src/cfg.c index 2999f140d..2b03dd23b 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -199,7 +199,7 @@ int config_parse_socket(config_setting_t *cfg, struct node *n) const char *local, *remote; int ret; - struct socket *s = (struct socket *) alloc(sizeof(struct socket)); + struct socket *s = alloc(sizeof(struct socket)); if (!config_setting_lookup_string(cfg, "remote", &remote)) cerror(cfg, "Missing remote address for node '%s'", n->name); diff --git a/server/src/path.c b/server/src/path.c index 8d0e28516..d2c55df3d 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -66,7 +66,7 @@ static void * path_send(void *arg) /** Receive messages */ static void * path_run(void *arg) { - struct path *p = (struct path *) arg; + struct path *p = arg; struct msg *m = alloc(sizeof(struct msg)); if (!m) error("Failed to allocate memory for message!"); From 39e6a92b6193a131de27e4b46ebdb26ebf329d92 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 00:30:38 +0100 Subject: [PATCH 11/23] small cleanups --- server/include/node.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/include/node.h b/server/include/node.h index fe31fdf2e..78dba6987 100644 --- a/server/include/node.h +++ b/server/include/node.h @@ -39,12 +39,12 @@ enum node_type { UDP, /* BSD socket: AF_INET SOCK_DGRAM */ TCPD, /* BSD socket: AF_INET SOCK_STREAM bind + listen + accept */ TCP, /* BSD socket: AF_INET SOCK_STREAM bind + connect */ -// OPAL_ASYNC, /* OPAL-RT AsyncApi */ + OPAL_ASYNC, /* OPAL-RT Asynchronous Process Api */ // GTFPGA, /* Xilinx ML507 GTFPGA card */ INVALID }; -/** C++ like vtable construct for socket_types */ +/** C++ like vtable construct for node_types */ struct node_vtable { enum node_type type; const char *name; @@ -75,7 +75,7 @@ struct node /** Virtual data (used by vtable functions) */ union { struct socket *socket; - struct opal *opal; + struct opal *opal; struct gtfpga *gtfpga; }; From 84686ffacbefbaae8cccae80523f43881f512424 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 00:31:48 +0100 Subject: [PATCH 12/23] added new doubly-linked list implementation (this one supersedes the old Linux style lists) --- server/include/list.h | 51 ++++++++++++++++++++++++++++++++++++ server/src/list.c | 60 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 server/include/list.h create mode 100644 server/src/list.c diff --git a/server/include/list.h b/server/include/list.h new file mode 100644 index 000000000..1defacaf4 --- /dev/null +++ b/server/include/list.h @@ -0,0 +1,51 @@ +/** A generic linked list + * + * Linked lists a used for several data structures in the code. + * + * @author Steffen Vogel + * @copyright 2015, Institute for Automation of Complex Power Systems, EONERC + * @file + */ + +#ifndef _LIST_H_ +#define _LIST_H_ + +#include + +/* Forward declarations */ +struct list_elm; + +/** Static list initialization */ +#define LIST_INIT { \ + .head = NULL, \ + .tail = NULL, \ + .count = 0, \ + .lock = PTHREAD_MUTEX_INITIALIZER \ +} + +#define FOREACH(list, elm) \ + for ( struct list_elm *elm = (list)->head, ; \ + elm != (list)->tail; elm = elm->next ) + +struct list { + struct list_elm *head, tail; + int count; + + pthread_mutex_lock_t lock; +}; + +struct list_elm { + void *data; + + struct list_elm *prev, next; +}; + +void list_init(struct list *l); + +void list_destroy(struct list *l); + +void list_push(struct list *l, void *d); + +struct list_elm * list_search(struct list *l, int (*cmp)(void *)); + +#endif /* _LIST_H_ */ \ No newline at end of file diff --git a/server/src/list.c b/server/src/list.c new file mode 100644 index 000000000..247d8af61 --- /dev/null +++ b/server/src/list.c @@ -0,0 +1,60 @@ +/** A generic linked list + * + * Linked lists a used for several data structures in the code. + * + * @author Steffen Vogel + * @copyright 2015, Institute for Automation of Complex Power Systems, EONERC + * @file + */ + +#include "list.h" + +void list_init(struct list *l) +{ + pthread_mutex_init(&l->lock, NULL); + + l->count = 0; + l->head = NULL; + l->tail = NULL; +} + +void list_destroy(struct list *l) +{ + pthread_mutex_lock(&l->lock); + + struct list_elm *elm = l->head; + while (elm) { + struct list_elm *tmp = elm; + free(tmp); + + elm = elm->next; + } + + pthread_mutex_destroy(&l->lock); +} + +void list_push(struct list *l, void *d) +{ + struct list_elm *e = alloc(sizeof(struct list_elm)); + + pthread_mutex_lock(&l->lock); + + e->data = d; + e->prev = l->tail; + e->next = NULL; + + l->tail->next = e; + l->tail = e; + + pthread_mutex_unlock(&l->lock); +} + +struct list_elm * list_search(struct list *l, int (*cmp)(void *)) +{ + foreach(l, it) { + if (!cmp(it->data)) + return it; + } + + return NULL; +} From 51a1b42f46741771e78b0e9d1de27592edc878b2 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 15:38:06 +0100 Subject: [PATCH 13/23] cherry picked (v)strap functions from opal-async branch --- server/include/utils.h | 9 +++++++++ server/src/utils.c | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/server/include/utils.h b/server/include/utils.h index 4e822af04..1f9eebdcd 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -68,6 +68,15 @@ void epoch_reset(); */ void print(enum log_level lvl, const char *fmt, ...); +/** Safely append a format string to an existing string. + * + * This function is similar to strlcat() from BSD. + */ +int strap(char *dest, size_t size, const char *fmt, ...); + +/** Variable arguments (stdarg) version of strap() */ +int vstrap(char *dest, size_t size, const char *fmt, va_list va); + /** Convert integer to cpu_set_t. * * @param set A cpu bitmask diff --git a/server/src/utils.c b/server/src/utils.c index c332e7799..1cd554765 100644 --- a/server/src/utils.c +++ b/server/src/utils.c @@ -35,6 +35,25 @@ void epoch_reset() clock_gettime(CLOCK_REALTIME, &epoch); } +int strap(char *dest, size_t size, const char *fmt, ...) +{ + int ret; + + va_list ap; + va_start(ap, fmt); + ret = vstrap(dest, size, fmt, ap); + va_end(ap); + + return ret; +} + +int vstrap(char *dest, size_t size, const char *fmt, va_list ap) +{ + int len = strlen(dest); + + return vsnprintf(dest + len, size - len, fmt, ap); +} + void print(enum log_level lvl, const char *fmt, ...) { struct timespec ts; From dab130545cd7bfeee273e6998fc78d1ca3011eb1 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 15:45:06 +0100 Subject: [PATCH 14/23] added path_print() for variable number of destination nodes --- server/include/path.h | 2 ++ server/src/cfg.c | 5 ++++- server/src/path.c | 42 +++++++++++++++++++++++++++++++++--------- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/server/include/path.h b/server/include/path.h index 70f1533df..44167e986 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -89,4 +89,6 @@ int path_stop(struct path *p); */ void path_stats(struct path *p); +int path_print(struct path *p, char *buf, int len); + #endif /* _PATH_H_ */ diff --git a/server/src/cfg.c b/server/src/cfg.c index 2b03dd23b..08d9bc060 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -147,8 +147,11 @@ int config_parse_path(config_setting_t *cfg, } } else { - warn("Path '%s' => '%s' is not enabled", p->in->name, p->out->name); free(p); + char buf[33]; + path_print(p, buf, sizeof(buf)); + + warn("Path %s is not enabled", buf); } return 0; diff --git a/server/src/path.c b/server/src/path.c index d2c55df3d..faca5b5b1 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -97,11 +97,11 @@ static void * path_run(void *arg) /* Handle simulation restart */ if (m->sequence == 0 && abs(dist) >= 1) { + path_print(p, buf, sizeof(buf)); path_stats(p); - warn("Simulation for path %s " MAG("=>") " %s " - "restarted (p->seq=%u, m->seq=%u, dist=%d)", - p->in->name, p->out->name, - p->sequence, m->sequence, dist); + + warn("Simulation for path %s restarted (p->seq=%u, m->seq=%u, dist=%d)", + buf, p->sequence, m->sequence, dist); /* Reset counters */ p->sent = 0; @@ -142,7 +142,10 @@ static void * path_run(void *arg) int path_start(struct path *p) { INDENT - info("Starting path: %12s " GRN("=>") " %-12s", p->in->name, p->out->name); + char buf[33]; + path_print(p, buf, sizeof(buf)); + + info("Starting path: %s", buf); hist_init(&p->histogram, -HIST_SEQ, +HIST_SEQ, 1); @@ -155,7 +158,10 @@ int path_start(struct path *p) int path_stop(struct path *p) { INDENT - info("Stopping path: %12s " RED("=>") " %-12s", p->in->name, p->out->name); + char buf[33]; + path_print(p, buf, sizeof(buf)); + + info("Stopping path: %s", buf); pthread_cancel(p->recv_tid); pthread_join(p->recv_tid, NULL); @@ -176,8 +182,26 @@ int path_stop(struct path *p) void path_stats(struct path *p) { - info("%12s " MAG("=>") " %-12s: %-8u %-8u %-8u %-8u %-8u", - p->in->name, p->out->name, - p->sent, p->received, p->dropped, p->skipped, p->invalid + char buf[33]; + path_print(p, buf, sizeof(buf)); + + info("%-32s : %-8u %-8u %-8u %-8u %-8u", + buf, p->sent, p->received, p->dropped, p->skipped, p->invalid ); } + +int path_print(struct path *p, char *buf, int len) +{ + *buf = 0; + + if (list_length(&p->destinations) > 1) { + strap(buf, len, "%s " MAG("=>") " [", p->in->name); + FOREACH(&p->destinations, it) + strap(buf, len, " %s", it->node->name); + strap(buf, len, " ]"); + } + else + strap(buf, len, "%s " MAG("=>") " %s", p->in->name, p->out->name); + + return 0; +} From e30e9292f2b5ac05065a00ba8408420727f84166 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 15:47:18 +0100 Subject: [PATCH 15/23] proper release of path structure (path_destroy() vs free()) --- server/include/path.h | 2 ++ server/src/cfg.c | 2 +- server/src/path.c | 9 +++++++++ server/src/server.c | 7 ++++--- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/server/include/path.h b/server/include/path.h index 44167e986..5867c6cc9 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -91,4 +91,6 @@ void path_stats(struct path *p); int path_print(struct path *p, char *buf, int len); +int path_destroy(struct path *p); + #endif /* _PATH_H_ */ diff --git a/server/src/cfg.c b/server/src/cfg.c index 08d9bc060..205bfe5e0 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -147,11 +147,11 @@ int config_parse_path(config_setting_t *cfg, } } else { - free(p); char buf[33]; path_print(p, buf, sizeof(buf)); warn("Path %s is not enabled", buf); + path_destroy(p); } return 0; diff --git a/server/src/path.c b/server/src/path.c index faca5b5b1..347154fa9 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -66,6 +66,7 @@ static void * path_send(void *arg) /** Receive messages */ static void * path_run(void *arg) { + char buf[33]; struct path *p = arg; struct msg *m = alloc(sizeof(struct msg)); if (!m) @@ -205,3 +206,11 @@ int path_print(struct path *p, char *buf, int len) return 0; } + +int path_destroy(struct path *p) +{ + list_destroy(&p->destinations); + list_destroy(&p->hooks); + + return 0; +} diff --git a/server/src/server.c b/server/src/server.c index e6235f24e..f9ba46162 100644 --- a/server/src/server.c +++ b/server/src/server.c @@ -38,6 +38,7 @@ static void quit() info("Stopping paths:"); for (struct path *p = paths; p; p = p->next) { INDENT path_stop(p); + path_destroy(p); } info("Stopping nodes:"); @@ -50,7 +51,7 @@ static void quit() if_stop(i); } - /** @todo Free nodes and paths */ + /** @todo Free nodes */ config_destroy(&config); @@ -155,8 +156,8 @@ int main(int argc, char *argv[]) struct path *p = paths; info("Runtime Statistics:"); - info("%12s " MAG("=>") " %-12s: %-8s %-8s %-8s %-8s %-8s", - "Source", "Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval"); + info("%-32s : %-8s %-8s %-8s %-8s %-8s", + "Source " MAG("=>") " Destination", "#Sent", "#Recv", "#Drop", "#Skip", "#Inval"); info("---------------------------------------------------------------------------"); while (1) { From eebc1edf9e0745bc81f1f948dbaebb604ffc6f99 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 15:47:49 +0100 Subject: [PATCH 16/23] fixed and improved linked list --- server/include/list.h | 33 ++++++++++++++++++++++++++------- server/src/list.c | 19 ++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/server/include/list.h b/server/include/list.h index 1defacaf4..999365833 100644 --- a/server/include/list.h +++ b/server/include/list.h @@ -12,8 +12,13 @@ #include +#include "hooks.h" + /* Forward declarations */ struct list_elm; +struct node; +struct path; +struct interface; /** Static list initialization */ #define LIST_INIT { \ @@ -24,27 +29,41 @@ struct list_elm; } #define FOREACH(list, elm) \ - for ( struct list_elm *elm = (list)->head, ; \ - elm != (list)->tail; elm = elm->next ) + for ( struct list_elm *elm = (list)->head; \ + elm; elm = elm->next ) + +#define FOREACH_R(list, elm) \ + for ( struct list_elm *elm = (list)->tail; \ + elm; elm = elm->prev ) + +#define list_first(list) ((list)->head) +#define list_last(list) ((list)->head) +#define list_length(list) ((list)->count) struct list { - struct list_elm *head, tail; + struct list_elm *head, *tail; int count; - pthread_mutex_lock_t lock; + pthread_mutex_t lock; }; struct list_elm { - void *data; + union { + void *ptr; + struct node *node; + struct path *path; + struct interface *interface; + hook_cb_t hook; + }; - struct list_elm *prev, next; + struct list_elm *prev, *next; }; void list_init(struct list *l); void list_destroy(struct list *l); -void list_push(struct list *l, void *d); +void list_push(struct list *l, void *p); struct list_elm * list_search(struct list *l, int (*cmp)(void *)); diff --git a/server/src/list.c b/server/src/list.c index 247d8af61..d34c8beef 100644 --- a/server/src/list.c +++ b/server/src/list.c @@ -7,6 +7,7 @@ * @file */ +#include "utils.h" #include "list.h" void list_init(struct list *l) @@ -33,26 +34,34 @@ void list_destroy(struct list *l) pthread_mutex_destroy(&l->lock); } -void list_push(struct list *l, void *d) +void list_push(struct list *l, void *p) { struct list_elm *e = alloc(sizeof(struct list_elm)); pthread_mutex_lock(&l->lock); - e->data = d; + e->ptr = p; e->prev = l->tail; e->next = NULL; - l->tail->next = e; + if (l->tail) + l->tail->next = e; + if (l->head) + l->head->prev = e; + else + l->head = e; + l->tail = e; + l->count++; + pthread_mutex_unlock(&l->lock); } struct list_elm * list_search(struct list *l, int (*cmp)(void *)) { - foreach(l, it) { - if (!cmp(it->data)) + FOREACH(l, it) { + if (!cmp(it->ptr)) return it; } From 980f1d12fae85493d2e12d2a856d0b5bdd71252d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 15:50:02 +0100 Subject: [PATCH 17/23] updated config parser to handle multiple destination nodes --- server/include/cfg.h | 5 +++ server/src/cfg.c | 93 ++++++++++++++++++++++++++++++++------------ 2 files changed, 74 insertions(+), 24 deletions(-) diff --git a/server/include/cfg.h b/server/include/cfg.h index 5e93c42aa..6e336bb08 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -14,6 +14,8 @@ #include +/* Forward declarations */ +struct list; struct node; struct path; struct interface; @@ -70,6 +72,9 @@ int config_parse_global(config_setting_t *cfg, struct settings *set); */ int config_parse_path(config_setting_t *cfg, struct path **paths, struct node **nodes); + +int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all); + /** Parse a single node and add it to the global configuration. * diff --git a/server/src/cfg.c b/server/src/cfg.c index 205bfe5e0..5a954e341 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -9,12 +9,13 @@ #include #include +#include "utils.h" +#include "list.h" #include "if.h" #include "tc.h" #include "cfg.h" #include "node.h" #include "path.h" -#include "utils.h" #include "hooks.h" #include "socket.h" @@ -93,26 +94,31 @@ int config_parse_global(config_setting_t *cfg, struct settings *set) int config_parse_path(config_setting_t *cfg, struct path **paths, struct node **nodes) { - const char *in, *out, *hook; + const char *in; int enabled = 1; int reverse = 0; - + struct path *p = alloc(sizeof(struct path)); - /* Required settings */ - if (!config_setting_lookup_string(cfg, "in", &in)) - cerror(cfg, "Missing input node for path"); - - if (!config_setting_lookup_string(cfg, "out", &out)) - cerror(cfg, "Missing output node for path"); - + /* Input node */ + struct config_setting_t *cfg_in = config_setting_get_member(cfg, "in"); + if (!cfg_in || config_setting_type(cfg_in) != CONFIG_TYPE_STRING) + cerror(cfg, "Invalid input node for path"); + + in = config_setting_get_string(cfg_in); p->in = node_lookup_name(in, *nodes); if (!p->in) - cerror(cfg, "Invalid input node '%s'", in); + cerror(cfg_in, "Invalid input node '%s", in); - p->out = node_lookup_name(out, *nodes); - if (!p->out) - cerror(cfg, "Invalid output node '%s'", out); + /* Output node(s) */ + struct config_setting_t *cfg_out = config_setting_get_member(cfg, "out"); + if (cfg_out) + config_parse_nodelist(cfg_out, &p->destinations, nodes); + + if (list_length(&p->destinations) >= 1) + p->out = list_first(&p->destinations)->node; + else + cerror(cfg, "Missing output node for path"); /* Optional settings */ if (config_setting_lookup_string(cfg, "hook", &hook)) { @@ -130,21 +136,28 @@ int config_parse_path(config_setting_t *cfg, if (enabled) { p->in->refcnt++; - p->out->refcnt++; - - list_add(*paths, p); - + FOREACH(&p->destinations, it) + it->node->refcnt++; + if (reverse) { - struct path *rev = alloc(sizeof(struct path)); + if (list_length(&p->destinations) > 1) + warn("Using first destination '%s' as source for reverse path. " + "Ignoring remaining nodes", p->out->name); - rev->in = p->out; /* Swap in/out */ - rev->out = p->in; + struct path *r = alloc(sizeof(struct path)); - rev->in->refcnt++; - rev->out->refcnt++; + r->in = p->out; /* Swap in/out */ + r->out = p->in; + + list_push(&r->destinations, r->out); - list_add(*paths, rev); + r->in->refcnt++; + r->out->refcnt++; + + list_add(*paths, r); } + + list_add(*paths, p); } else { char buf[33]; @@ -157,6 +170,38 @@ int config_parse_path(config_setting_t *cfg, return 0; } +int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all) { + const char *str; + struct node *node; + + switch (config_setting_type(cfg)) { + case CONFIG_TYPE_STRING: + str = config_setting_get_string(cfg); + node = node_lookup_name(str, *all); + if (!node) + cerror(cfg, "Invalid outgoing node '%s'", str); + + list_push(nodes, node); + break; + + case CONFIG_TYPE_ARRAY: + for (int i=0; i Date: Wed, 18 Mar 2015 15:53:01 +0100 Subject: [PATCH 18/23] finished support for multiple destination nodes --- server/include/path.h | 14 +++++++++----- server/src/path.c | 18 ++++++++++++------ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/include/path.h b/server/include/path.h index 5867c6cc9..78bc4432f 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -11,6 +11,7 @@ #include #include +#include "list.h" #include "config.h" #include "hist.h" #include "node.h" @@ -25,11 +26,14 @@ struct path { /** Pointer to the incoming node */ struct node *in; - /** Pointer to the outgoing node */ + /** Pointer to the first outgoing node. + * Usually this is only a pointer to the first list element of path::destinations. */ struct node *out; - - /** Function pointer of the hook */ - hook_cb_t hook; + + /** List of all outgoing nodes */ + struct list destinations; + /** List of function pointers to hooks */ + struct list hooks; /** Send messages with a fixed rate over this path */ double rate; @@ -43,7 +47,7 @@ struct path /** Last known message number */ unsigned int sequence; - /** Counter for sent messages to all outgoing nodes*/ + /** Counter for sent messages to all outgoing nodes */ unsigned int sent; /** Counter for received messages from all incoming nodes */ unsigned int received; diff --git a/server/src/path.c b/server/src/path.c index 347154fa9..264d005b1 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -21,7 +21,7 @@ /** Linked list of paths */ struct path *paths; -/** Send messages */ +/** Send messages asynchronously */ static void * path_send(void *arg) { int sig; @@ -53,9 +53,12 @@ static void * path_send(void *arg) while (1) { sigwait(&set, &sig); /* blocking wait for next timer tick */ - if (p->last) { - node_write(p->out, p->last); - p->last = NULL; + + if (p->received) { + FOREACH(&p->destinations, it) { + node_write(it->node, p->last); + } + p->sent++; } } @@ -74,7 +77,7 @@ static void * path_run(void *arg) /* Open deferred TCP connection */ node_start_defer(p->in); - node_start_defer(p->out); + // FIXME: node_start_defer(p->out); /* Main thread loop */ while (1) { @@ -131,7 +134,10 @@ static void * path_run(void *arg) /* At fixed rate mode, messages are send by another thread */ if (!p->rate) { - node_write(p->out, m); /* Send message */ + FOREACH(&p->destinations, it) { + node_write(it->node, m); + } + p->sent++; } } From 9c57c54abedbdfb86c5cbfe3d6c8685ebff1ab07 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 16:13:18 +0100 Subject: [PATCH 19/23] improved hook user callback system and added new examples (untested) --- server/include/cfg.h | 5 +- server/include/hooks.h | 23 +++++++-- server/src/cfg.c | 41 +++++++++++++--- server/src/hooks.c | 105 ++++++++++++++++++++++++++++++++++------- server/src/path.c | 8 ++-- 5 files changed, 151 insertions(+), 31 deletions(-) diff --git a/server/include/cfg.h b/server/include/cfg.h index 6e336bb08..a60c493bf 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -76,6 +76,8 @@ int config_parse_path(config_setting_t *cfg, int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node **all); +int config_parse_hooks(config_setting_t *cfg, struct list *hooks); + /** Parse a single node and add it to the global configuration. * * @param cfg A libconfig object pointing to the node @@ -83,8 +85,7 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int config_parse_node(config_setting_t *cfg, - struct node **nodes); +int config_parse_node(config_setting_t *cfg, struct node **nodes); /** Parse node connection details for OPAL type * diff --git a/server/include/hooks.h b/server/include/hooks.h index ca949c4fc..b245cc108 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -48,13 +48,28 @@ hook_cb_t hook_lookup(const char *name); /** Example hook: Print the message. */ int hook_print(struct msg *m, struct path *p); -/** Example hook: Filter the message on some criteria. */ -int hook_filter(struct msg *m, struct path *p); +/** Example hook: Log messages to a logfile in /tmp */ +int hook_log(struct msg *m, struct path *p); + +#define HOOK_LOG_MODE "w+" +#define HOOK_LOG_TEMPLATE "logs/s2ss-%Y_%m_%d-%H_%M_%S.log" + +/** Example hook: Drop messages. */ +int hook_decimate(struct msg *m, struct path *p); + +#define HOOK_DECIMATE_RATIO 10 /** Example hook: Convert the message values to fixed precision. */ int hook_tofixed(struct msg *m, struct path *p); -/** Example hook: Chain multiple hooks */ -int hook_multiple(struct msg *m, struct path *p); +/** Example hook: add timestamp to message. */ +int hook_ts(struct msg *m, struct path *p); + +#define HOOK_TS_INDEX -1 // last message + +/** Example hook: Finite-Impulse-Response (FIR) filter. */ +int hook_fir(struct msg *m, struct path *p); + +#define HOOK_FIR_INDEX 1 #endif /* _HOOKS_H_ */ diff --git a/server/src/cfg.c b/server/src/cfg.c index 5a954e341..6e01e5139 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -121,12 +121,9 @@ int config_parse_path(config_setting_t *cfg, cerror(cfg, "Missing output node for path"); /* Optional settings */ - if (config_setting_lookup_string(cfg, "hook", &hook)) { - p->hook = hook_lookup(hook); - - if (!p->hook) - cerror(cfg, "Failed to lookup hook function. Not registred?"); - } + struct config_setting_t *cfg_hook = config_setting_get_member(cfg, "hook"); + if (cfg_hook) + config_parse_hooks(cfg_hook, &p->hooks); config_setting_lookup_bool(cfg, "enabled", &enabled); config_setting_lookup_bool(cfg, "reverse", &reverse); @@ -202,6 +199,38 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *nodes, struct node return 0; } +int config_parse_hooks(config_setting_t *cfg, struct list *hooks) { + const char *str; + hook_cb_t hook; + + switch (config_setting_type(cfg)) { + case CONFIG_TYPE_STRING: + str = config_setting_get_string(cfg); + hook = hook_lookup(str); + if (!hook) + cerror(cfg, "Invalid hook function '%s'", str); + + list_push(hooks, hook); + break; + + case CONFIG_TYPE_ARRAY: + for (int i=0; i * @copyright 2014, Institute for Automation of Complex Power Systems, EONERC */ - + +#include +#include #include +#include +#include #include "msg.h" #include "hooks.h" +#include "path.h" +#include "utils.h" /** @todo Make const */ static struct hook_id hook_list[] = { { hook_print, "print" }, - { hook_filter, "filter" }, + { hook_log, "log" }, + { hook_decimate, "decimate" }, { hook_tofixed, "tofixed" }, - { hook_multiple, "multiple" }, + { hook_ts, "ts" }, + { hook_fir, "fir" }, { NULL } }; @@ -43,13 +51,39 @@ int hook_print(struct msg *m, struct path *p) return 0; } -int hook_filter(struct msg *m, struct path *p) +int hook_log(struct msg *m, struct path *p) { - /* Drop every 10th message */ - if (m->sequence % 10 == 0) - return -1; - else - return 0; + static pthread_key_t pkey; + FILE *file = pthread_getspecific(pkey); + + if (!file) { + char fstr[64], pstr[33]; + path_print(p, pstr, sizeof(pstr)); + + struct tm tm; + time_t ts = time(NULL); + localtime_r(&ts, &tm); + strftime(fstr, sizeof(fstr), HOOK_LOG_TEMPLATE, &tm); + + + + file = fopen(fstr, HOOK_LOG_MODE); + if (file) + debug(5, "Opened log file for path %s: %s", pstr, fstr); + + pthread_key_create(&pkey, (void (*)(void *)) fclose); + pthread_setspecific(pkey, file); + } + + msg_fprint(file, m); + + return 0; +} + +int hook_decimate(struct msg *m, struct path *p) +{ + /* Drop every HOOK_DECIMATE_RATIO'th message */ + return (m->sequence % HOOK_DECIMATE_RATIO == 0) ? -1 : 0; } int hook_tofixed(struct msg *m, struct path *p) @@ -61,12 +95,51 @@ int hook_tofixed(struct msg *m, struct path *p) return 0; } -int hook_multiple(struct msg *m, struct path *p) +int hook_ts(struct msg *m, struct path *p) { - if (hook_print(m, p)) - return -1; - else if (hook_tofixed(m, p)) - return -1; - else - return 0; + struct timespec *ts = (struct timespec *) &m->data[HOOK_TS_INDEX]; + + clock_gettime(CLOCK_REALTIME, ts); + + return 0; +} + +/** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300 + * Tip: Use MATLAB's filter design tool and export coefficients + * with the integrated C-Header export */ +static const double hook_fir_coeffs[] = { -0.003658148158728, -0.008882653268281, 0.008001024183003, + 0.08090485991761, 0.2035239551043, 0.3040703593515, + 0.3040703593515, 0.2035239551043, 0.08090485991761, + 0.008001024183003, -0.008882653268281,-0.003658148158728 }; + +/** @todo: test */ +int hook_fir(struct msg *m, struct path *p) +{ + static pthread_key_t pkey; + float *history = pthread_getspecific(pkey); + + /** Length of impulse response */ + int len = ARRAY_LEN(hook_fir_coeffs); + /** Current index in circular history buffer */ + int cur = m->sequence % len; + /* Accumulator */ + double sum = 0; + + /* Create thread local storage for circular history buffer */ + if (!history) { + history = malloc(len * sizeof(float)); + + pthread_key_create(&pkey, free); + pthread_setspecific(pkey, history); + } + + /* Update circular buffer */ + history[cur] = m->data[HOOK_FIR_INDEX].f; + + for (int i=0; idata[HOOK_FIR_INDEX].f = sum; + + return 0; } diff --git a/server/src/path.c b/server/src/path.c index 264d005b1..a697ae2a0 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -123,9 +123,11 @@ static void * path_run(void *arg) } /* Call hook callbacks */ - if (p->hook && p->hook(m, p)) { - p->skipped++; - continue; + FOREACH(&p->hooks, it) { + if (it->hook(m, p)) { + p->skipped++; + continue; + } } /* Update last known sequence number */ From a4305d5c4a009a5ca5f067addc9f26055dc0a650 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 16:15:11 +0100 Subject: [PATCH 20/23] added logging directory to ignore list (used by new hook_log example) --- server/.gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/.gitignore b/server/.gitignore index f929d8dbc..5772b93b7 100644 --- a/server/.gitignore +++ b/server/.gitignore @@ -1,3 +1,5 @@ +logs/ + *.d *.o *~ From a9e1137a62a82cb1a35b2628c9fce1fcb21b93cb Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 16:16:44 +0100 Subject: [PATCH 21/23] small cleanups --- server/src/node.c | 4 +++- server/src/socket.c | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/node.c b/server/src/node.c index 003ebf66f..0b722f85e 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -58,8 +58,10 @@ struct node_vtable const * node_lookup_vtable(const char *str) int node_start(struct node *n) { - if (!n->refcnt) + if (!n->refcnt) { + warn("Node '%s' is unused. Skipping...", n->name); return -1; + } char str[256]; node_print(n, str, sizeof(str)); diff --git a/server/src/socket.c b/server/src/socket.c index 89fef9c8d..bfab7155e 100644 --- a/server/src/socket.c +++ b/server/src/socket.c @@ -46,9 +46,7 @@ int socket_open(struct node *n) struct sockaddr_in *sin = (struct sockaddr_in *) &s->local; struct sockaddr_ll *sll = (struct sockaddr_ll *) &s->local; int ret; - - s->sd = s->sd2 = -1; - + /* Create socket */ switch (node_type(n)) { case TCPD: @@ -166,7 +164,7 @@ int socket_write(struct node *n, struct msg *m) ret = send(s->sd, m, MSG_LEN(m->length), 0); if (ret < 0) - serror("Failed sendto"); + serror("Failed send(to)"); debug(10, "Message sent to node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u", n->name, m->version, m->type, m->endian, m->length, ntohs(m->sequence)); From 7883a762cd80bcffd5db89b6f80d1eef06cf28e3 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 16:18:10 +0100 Subject: [PATCH 22/23] added getopt() parser to toolchain and added -r (aka --reverse) as parameter --- server/include/node.h | 6 ++++++ server/include/utils.h | 6 ++++++ server/src/node.c | 14 +++++++++++++ server/src/receive.c | 46 +++++++++++++++++++++++++++++------------ server/src/send.c | 47 ++++++++++++++++++++++++++++++------------ 5 files changed, 93 insertions(+), 26 deletions(-) diff --git a/server/include/node.h b/server/include/node.h index 78dba6987..2b97294be 100644 --- a/server/include/node.h +++ b/server/include/node.h @@ -131,4 +131,10 @@ struct node_vtable const * node_lookup_vtable(const char *str); */ struct node* node_lookup_name(const char *str, struct node *nodes); +/** Reverse local and remote socket address. + * This is usefull for the helper programs: send, receive, test + * because they usually use the same configuration file as the + * server and therefore the direction needs to be swapped. */ +int node_reverse(struct node *n); + #endif /* _NODE_H_ */ diff --git a/server/include/utils.h b/server/include/utils.h index 4e822af04..b4eb8a028 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -39,6 +39,12 @@ #define ARRAY_LEN(a) ( sizeof a / sizeof a[0] ) +#define SWAP(a, b) do { \ + __typeof__(a) tmp = a; \ + a = b; \ + b = tmp; \ + } while(0) + /** The log level which is passed as first argument to print() */ enum log_level { DEBUG, INFO, WARN, ERROR }; diff --git a/server/src/node.c b/server/src/node.c index 0b722f85e..216e200e2 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -105,3 +105,17 @@ int node_stop(struct node *n) return ret; } + +int node_reverse(struct node *n) +{ + switch (n->vt->type) { + case IEEE_802_3: + case IP: + case UDP: + case TCP: + SWAP(n->socket->remote, n->socket->local); + break; + default: { } + } + return n->vt->open == socket_open; +} diff --git a/server/src/receive.c b/server/src/receive.c index 16738b864..18f131c0e 100644 --- a/server/src/receive.c +++ b/server/src/receive.c @@ -33,21 +33,36 @@ void quit(int sig, siginfo_t *si, void *ptr) exit(EXIT_SUCCESS); } +void usage(char *name) +{ + printf("Usage: %s CONFIG NODE\n", name); + printf(" CONFIG path to a configuration file\n"); + printf(" NODE name of the node which shoud be used\n\n"); + printf("Simulator2Simulator Server %s (built on %s %s)\n", + BLU(VERSION), MAG(__DATE__), MAG(__TIME__)); + printf(" Copyright 2014, Institute for Automation of Complex Power Systems, EONERC\n"); + printf(" Steffen Vogel \n"); + exit(EXIT_FAILURE); +} + int main(int argc, char *argv[]) { + char c; + int reverse = 0; + struct config_t config; - - if (argc != 3) { - printf("Usage: %s CONFIG NODE\n", argv[0]); - printf(" CONFIG path to a configuration file\n"); - printf(" NODE name of the node which shoud be used\n\n"); - printf("Simulator2Simulator Server %s (built on %s %s)\n", - BLU(VERSION), MAG(__DATE__), MAG(__TIME__)); - printf(" Copyright 2014, Institute for Automation of Complex Power Systems, EONERC\n"); - printf(" Steffen Vogel \n"); - exit(EXIT_FAILURE); + + while ((c = getopt(argc, argv, "hr")) != -1) { + switch (c) { + case 'r': reverse = 1; break; + case 'h': + case '?': usage(argv[0]); + } } + if (argc - optind != 2) + usage(argv[0]); + /* Setup signals */ struct sigaction sa_quit = { .sa_flags = SA_SIGINFO, @@ -59,11 +74,16 @@ int main(int argc, char *argv[]) sigaction(SIGINT, &sa_quit, NULL); config_init(&config); - config_parse(argv[1], &config, &set, &nodes, NULL); + config_parse(argv[optind], &config, &set, &nodes, NULL); - node = node_lookup_name(argv[2], nodes); + node = node_lookup_name(argv[optind+1], nodes); if (!node) - error("There's no node with the name '%s'", argv[2]); + error("There's no node with the name '%s'", argv[optind+1]); + + node->refcnt++; + + if (reverse) + node_reverse(node); node_start(node); node_start_defer(node); diff --git a/server/src/send.c b/server/src/send.c index 51db9cc99..2dd7bae2a 100644 --- a/server/src/send.c +++ b/server/src/send.c @@ -35,20 +35,36 @@ void quit(int sig, siginfo_t *si, void *ptr) exit(EXIT_SUCCESS); } +void usage(char *name) +{ + printf("Usage: %s [-r] CONFIG NODE\n", name); + printf(" -r swap local / remote address of socket based nodes)\n"); + printf(" CONFIG path to a configuration file\n"); + printf(" NODE name of the node which shoud be used\n"); + printf("Simulator2Simulator Server %s (built on %s %s)\n", + BLU(VERSION), MAG(__DATE__), MAG(__TIME__)); + printf(" Copyright 2014, Institute for Automation of Complex Power Systems, EONERC\n"); + printf(" Steffen Vogel \n"); + exit(EXIT_FAILURE); +} + int main(int argc, char *argv[]) { + char c; + int reverse = 0; + struct config_t config; - - if (argc != 3) { - printf("Usage: %s CONFIG NODE\n", argv[0]); - printf(" CONFIG path to a configuration file\n"); - printf(" NODE name of the node which shoud be used\n\n"); - printf("Simulator2Simulator Server %s (built on %s %s)\n", - BLU(VERSION), MAG(__DATE__), MAG(__TIME__)); - printf(" Copyright 2014, Institute for Automation of Complex Power Systems, EONERC\n"); - printf(" Steffen Vogel \n"); - exit(EXIT_FAILURE); + + while ((c = getopt(argc, argv, "hr")) != -1) { + switch (c) { + case 'r': reverse = 1; break; + case 'h': + case '?': usage(argv[0]); + } } + + if (argc - optind != 2) + usage(argv[0]); /* Setup signals */ struct sigaction sa_quit = { @@ -61,11 +77,16 @@ int main(int argc, char *argv[]) sigaction(SIGINT, &sa_quit, NULL); config_init(&config); - config_parse(argv[1], &config, &set, &nodes, NULL); + config_parse(argv[optind], &config, &set, &nodes, NULL); - node = node_lookup_name(argv[2], nodes); + node = node_lookup_name(argv[optind+1], nodes); if (!node) - error("There's no node with the name '%s'", argv[2]); + error("There's no node with the name '%s'", argv[optind+1]); + + node->refcnt++; + + if (reverse) + node_reverse(node); node_start(node); node_start_defer(node); From 695ab7d8d60b3beac449f3803204efb71c92f90b Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 18 Mar 2015 16:20:38 +0100 Subject: [PATCH 23/23] simplified Makefile --- server/Makefile | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/Makefile b/server/Makefile index b4607380b..bf9752125 100644 --- a/server/Makefile +++ b/server/Makefile @@ -4,14 +4,14 @@ TARGETS = server send random receive test all: $(TARGETS) # Common dependencies for all binaries -COMMON = socket.o if.o utils.o msg.o node.o cfg.o tc.o hooks.o +OBJS = socket.o if.o utils.o msg.o node.o cfg.o tc.o hooks.o list.o path.o hist.o # Dependencies for individual binaries -server: $(COMMON) path.o hist.o -send: $(COMMON) -receive: $(COMMON) -random: utils.o msg.o -test: $(COMMON) hist.o +server: server.o $(OBJS) +send: send.o $(OBJS) +receive: receive.o $(OBJS) +random: random.o $(OBJS) +test: test.o $(OBJS) # Search path for source files VPATH = src