From 4e911be070077e42db3669c894136db7f13af8c2 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 5 Dec 2014 12:39:52 +0100 Subject: [PATCH] final commit for vtable based nodes --- server/include/cfg.h | 27 ++++++++ server/include/if.h | 49 ++++++++++++-- server/include/node.h | 95 ++++++++++++++++++++------- server/include/utils.h | 6 ++ server/src/cfg.c | 67 ++++++++++++------- server/src/if.c | 143 ++++++++++++++++++++++++++++++++--------- server/src/node.c | 121 ++++++++++++++++++++++++---------- server/src/path.c | 26 +++++--- 8 files changed, 409 insertions(+), 125 deletions(-) diff --git a/server/include/cfg.h b/server/include/cfg.h index 94aca7f22..5e93c42aa 100644 --- a/server/include/cfg.h +++ b/server/include/cfg.h @@ -18,6 +18,9 @@ struct node; struct path; struct interface; +struct socket; +struct opal; +struct gtfpga; struct netem; /** Global configuration */ @@ -78,6 +81,30 @@ int config_parse_path(config_setting_t *cfg, int config_parse_node(config_setting_t *cfg, struct node **nodes); +/** Parse node connection details for OPAL type + * + * @param cfg A libconfig object pointing to the node + * @retval 0 Success. Everything went well. + * @retval <0 Error. Something went wrong. + */ +int config_parse_opal(config_setting_t *cfg, struct node *n); + +/** Parse node connection details for GTFPGA type + * + * @param cfg A libconfig object pointing to the node + * @retval 0 Success. Everything went well. + * @retval <0 Error. Something went wrong. + */ +int config_parse_gtfpga(config_setting_t *cfg, struct node *n); + +/** Parse node connection details for SOCKET type + * + * @param cfg A libconfig object pointing to the node + * @retval 0 Success. Everything went well. + * @retval <0 Error. Something went wrong. + */ +int config_parse_socket(config_setting_t *cfg, struct node *n); + /** Parse network emulator (netem) settings. * * @param cfg A libconfig object containing the settings diff --git a/server/include/if.h b/server/include/if.h index 9e8162de0..a236e6bd3 100644 --- a/server/include/if.h +++ b/server/include/if.h @@ -17,6 +17,8 @@ #define IF_NAME_MAX IFNAMSIZ /**< Maximum length of an interface name */ #define IF_IRQ_MAX 3 /**< Maxmimal number of IRQs of an interface */ +struct socket; + /** Interface data structure */ struct interface { /** The index used by the kernel to reference this interface */ @@ -28,20 +30,57 @@ struct interface { /** List of IRQs of the NIC */ char irqs[IF_IRQ_MAX]; + /** Linked list of associated sockets */ + struct socket *sockets; /** Linked list pointer */ struct interface *next; }; +/** Add a new interface to the global list and lookup name, irqs... + * + * @param index The interface index of the OS + * @retval >0 Success. A pointer to the new interface. + * @retval 0 Error. The creation failed. + */ +struct interface * if_create(int index); + +/** Start interface. + * + * This setups traffic controls queue discs, network emulation and + * maps interface IRQs according to affinity. + * + * @param i A pointer to the interface structure. + * @param affinity Set the IRQ affinity of this interface. + * @retval 0 Success. Everything went well. + * @retval <0 Error. Something went wrong. + */ +int if_start(struct interface *i, int affinity); + +/** Stop interface + * + * This resets traffic qdiscs ant network emulation + * and maps interface IRQs to all CPUs. + * + * @param i A pointer to the interface structure. + * @retval 0 Success. Everything went well. + * @retval <0 Error. Something went wrong. + */ +int if_stop(struct interface *i); + /** Get outgoing interface. * - * Does a lookup in the kernel routing table to determine - * the interface which sends the data to a certain socket - * address. + * Depending on the address family of the socker address, + * this function tries to determine outgoing interface + * which is used to send packages to a remote host with the specified + * socket address. + * + * For AF_INET the fnuction performs a lookup in the kernel routing table. + * For AF_PACKET the function uses the existing sll_ifindex field of the socket address. * * @param sa A destination address for outgoing packets. * @return The interface index. */ -int if_getegress(struct sockaddr_in *sa); +int if_getegress(struct sockaddr *sa); /** Get all IRQs for this interface. * @@ -70,6 +109,6 @@ int if_setaffinity(struct interface *i, int affinity); * @retval NULL if no interface with index was found. * @retval >0 Success. A pointer to the interface. */ -struct interface* if_lookup_index(int index, struct interface *interfaces); +struct interface * if_lookup_index(int index); #endif /* _IF_H_ */ diff --git a/server/include/node.h b/server/include/node.h index 7ec67ce7d..fe31fdf2e 100644 --- a/server/include/node.h +++ b/server/include/node.h @@ -23,35 +23,61 @@ /** Static node initialization */ #define NODE_INIT(n) { \ - .sd = -1, \ .name = n \ } -/** The datastructure for a node. +/* Helper macros for virtual node type */ +#define node_type(n) ((n)->vt->type) +#define node_print(n, b, l) ((n)->vt->print(n, b, l)) +#define node_read(n, m) ((n)->vt->read(n, m)) +#define node_write(n, m) ((n)->vt->write(n, m)) + +/** Node type: layer, protocol, listen/connect */ +enum node_type { + IEEE_802_3, /* BSD socket: AF_PACKET SOCK_DGRAM */ + IP, /* BSD socket: AF_INET SOCK_RAW */ + UDP, /* BSD socket: AF_INET SOCK_DGRAM */ + TCPD, /* BSD socket: AF_INET SOCK_STREAM bind + listen + accept */ + TCP, /* BSD socket: AF_INET SOCK_STREAM bind + connect */ +// OPAL_ASYNC, /* OPAL-RT AsyncApi */ +// GTFPGA, /* Xilinx ML507 GTFPGA card */ + INVALID +}; + +/** C++ like vtable construct for socket_types */ +struct node_vtable { + enum node_type type; + const char *name; + + int (*parse)(config_setting_t *cfg, struct node *n); + int (*print)(struct node *n, char *buf, int len); + + int (*open)(struct node *n); + int (*close)(struct node *n); + int (*read)(struct node *n, struct msg *m); + int (*write)(struct node *n, struct msg *m); +}; + +/** The data structure for a node. * * Every entity which exchanges messages is represented by a node. + * Nodes can be remote machines and simulators or locally running processes. */ struct node { - /** The socket descriptor */ - int sd; /** How many paths are sending / receiving from this node? */ int refcnt; - /** Socket mark for netem, routing and filtering */ - int mark; - /** A short identifier of the node, only used for configuration and logging */ const char *name; - /** Local address of the socket */ - struct sockaddr_in local; - /** Remote address of the socket */ - struct sockaddr_in remote; - - /** The egress interface */ - struct interface *interface; - /** Network emulator settings */ - struct netem *netem; + /** C++ like virtual function call table */ + struct node_vtable const *vt; + /** Virtual data (used by vtable functions) */ + union { + struct socket *socket; + struct opal *opal; + struct gtfpga *gtfpga; + }; /** A pointer to the libconfig object which instantiated this node */ config_setting_t *cfg; @@ -62,19 +88,40 @@ struct node /** Connect and bind the UDP socket of this node. * - * @param n A pointer to the node structure - * @retval 0 Success. Everything went well. - * @retval <0 Error. Something went wrong. - */ -int node_connect(struct node *n); - -/** Disconnect the UDP socket of this node. + * Depending on the type (vtable) of this node, + * a socket is opened, shmem region registred... * * @param n A pointer to the node structure * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int node_disconnect(struct node *n); +int node_start(struct node *n); + +/** Deferred TCP connection setup + * + * @todo Dirty hack! + * We should check the address of the connecting node! + * We should preserve the original socket for proper shutdown. + */ +int node_start_defer(struct node *n); + +/** Stops a node. + * + * Depending on the type (vtable) of this node, + * a socket is closed, shmem region released... + * + * @param n A pointer to the node structure + * @retval 0 Success. Everything went well. + * @retval <0 Error. Something went wrong. + */ +int node_stop(struct node *n); + +/** Lookup string representation of socket type + * + * @param type A string describing the socket type. This must be one of: tcp, tcpd, udp, ip, ieee802.3 + * @return An enumeration value or INVALID (0) + */ +struct node_vtable const * node_lookup_vtable(const char *str); /** Search list of nodes for a name. * diff --git a/server/include/utils.h b/server/include/utils.h index 9820a4bc7..5763086e4 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -9,6 +9,7 @@ #define _UTILS_H_ #include +#include #include #include @@ -35,6 +36,8 @@ #define RIGHT(n) "\e[" ## n ## "C" #define LEFT(n) "\e[" ## n ## "D" +#define ARRAY_LEN(a) ( sizeof a / sizeof a[0] ) + /** The log level which is passed as first argument to print() */ enum log_level { DEBUG, INFO, WARN, ERROR }; @@ -83,6 +86,9 @@ void hist_plot(unsigned *hist, int length); /** Dump histogram data in Matlab format */ void hist_dump(unsigned *hist, int length); +/** A system(2) emulator with popen/pclose(2) and proper output handling */ +int system2(const char* cmd, ...); + /** Append an element to a single linked list */ #define list_add(list, elm) do { \ elm->next = list; \ diff --git a/server/src/cfg.c b/server/src/cfg.c index 47fc3afe2..de173b4a4 100644 --- a/server/src/cfg.c +++ b/server/src/cfg.c @@ -25,39 +25,44 @@ int config_parse(const char *filename, config_t *cfg, struct settings *set, { config_set_auto_convert(cfg, 1); - if (!config_read_file(cfg, filename)) { + if (!config_read_file(cfg, filename)) error("Failed to parse configuration: %s in %s:%d", config_error_text(cfg), filename, config_error_line(cfg) ); - } - /* Get and check config sections */ config_setting_t *cfg_root = config_root_setting(cfg); - if (!cfg_root || !config_setting_is_group(cfg_root)) - error("Missing global section in config file: %s", filename); - - config_setting_t *cfg_nodes = config_setting_get_member(cfg_root, "nodes"); - if (!cfg_nodes || !config_setting_is_group(cfg_nodes)) - error("Missing node section in config file: %s", filename); - - config_setting_t *cfg_paths = config_setting_get_member(cfg_root, "paths"); - if (!cfg_paths || !config_setting_is_list(cfg_paths)) - error("Missing path section in config file: %s", filename); /* Parse global settings */ - config_parse_global(cfg_root, set); + if (set) { + if (!cfg_root || !config_setting_is_group(cfg_root)) + error("Missing global section in config file: %s", filename); + config_parse_global(cfg_root, set); + } + /* Parse nodes */ - for (int i = 0; i < config_setting_length(cfg_nodes); i++) { - config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i); - config_parse_node(cfg_node, nodes); + if (nodes) { + config_setting_t *cfg_nodes = config_setting_get_member(cfg_root, "nodes"); + if (!cfg_nodes || !config_setting_is_group(cfg_nodes)) + error("Missing node section in config file: %s", filename); + + for (int i = 0; i < config_setting_length(cfg_nodes); i++) { + config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i); + config_parse_node(cfg_node, nodes); + } } /* Parse paths */ - for (int i = 0; i < config_setting_length(cfg_paths); i++) { - config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i); - config_parse_path(cfg_path, paths, nodes); + if (paths) { + config_setting_t *cfg_paths = config_setting_get_member(cfg_root, "paths"); + if (!cfg_paths || !config_setting_is_list(cfg_paths)) + error("Missing path section in config file: %s", filename); + + for (int i = 0; i < config_setting_length(cfg_paths); i++) { + config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i); + config_parse_path(cfg_path, paths, nodes); + } } return CONFIG_TRUE; @@ -70,7 +75,7 @@ int config_parse_global(config_setting_t *cfg, struct settings *set) config_setting_lookup_int(cfg, "debug", &set->debug); config_setting_lookup_float(cfg, "stats", &set->stats); - debug = set->debug; + _debug = set->debug; set->cfg = cfg; @@ -142,8 +147,8 @@ int config_parse_path(config_setting_t *cfg, } } else { + warn("Path '%s' => '%s' is not enabled", p->in->name, p->out->name); free(p); - warn(" Path is not enabled"); } return 0; @@ -151,7 +156,7 @@ int config_parse_path(config_setting_t *cfg, int config_parse_node(config_setting_t *cfg, struct node **nodes) { - const char *remote, *local; + const char *type; int ret; /* Allocate memory */ @@ -162,10 +167,24 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes) memset(n, 0, sizeof(struct node)); /* Required settings */ + n->cfg = cfg; n->name = config_setting_name(cfg); if (!n->name) cerror(cfg, "Missing node name"); + if (!config_setting_lookup_string(cfg, "type", &type)) + cerror(cfg, "Missing node type"); + + n->vt = node_lookup_vtable(type); + if (!n->vt) + cerror(cfg, "Invalid node type"); + + ret = n->vt->parse(cfg, n); + + list_add(*nodes, n); + + return ret; +} /** @todo Implement */ int config_parse_opal(config_setting_t *cfg, struct node *n) @@ -235,7 +254,7 @@ int config_parse_netem(config_setting_t *cfg, struct netem *em) if (config_setting_lookup_int(cfg, "corrupt", &em->corrupt)) em->valid |= TC_NETEM_CORRUPT; - /** @todo Check netem config values */ + /** @todo Validate netem config values */ return CONFIG_TRUE; } diff --git a/server/src/if.c b/server/src/if.c index f9909c375..aaba33ea7 100644 --- a/server/src/if.c +++ b/server/src/if.c @@ -15,56 +15,138 @@ #include #include #include +#include #include "if.h" +#include "tc.h" +#include "socket.h" #include "utils.h" -int if_getegress(struct sockaddr_in *sa) -{ - char cmd[128]; - char token[32]; +/** Linked list of interfaces */ +struct interface *interfaces; - snprintf(cmd, sizeof(cmd), "ip route get %s", inet_ntoa(sa->sin_addr)); +struct interface * if_create(int index) { + struct interface *i = malloc(sizeof(struct interface)); + if (!i) + error("Failed to allocate memory for interface"); + else + memset(i, 0, sizeof(struct interface)); - debug(8, "System: %s", cmd); + i->index = index; + if_indextoname(index, i->name); - FILE *p = popen(cmd, "r"); - if (!p) + debug(3, "Created interface '%s'", i->name, i->index, i->refcnt); + + list_add(interfaces, i); + + return i; +} + +int if_start(struct interface *i, int affinity) +{ INDENT + if (!i->refcnt) { + warn("Interface '%s' is not used by an active node", i->name); return -1; - - while (!feof(p) && fscanf(p, "%31s", token)) { - if (!strcmp(token, "dev")) { - fscanf(p, "%31s", token); - break; + } + else + info("Starting interface '%s'", i->name); + + { INDENT + int mark = 0; + for (struct socket *s = i->sockets; s; s = s->next) { + if (s->netem) { + s->mark = 1 + mark++; + + /* Set fwmark for outgoing packets */ + if (setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark))) + perror("Failed to set fwmark for outgoing packets"); + else + debug(4, "Set fwmark for socket->sd = %u to %u", s->sd, s->mark); + + tc_mark(i, TC_HDL(4000, s->mark), s->mark); + tc_netem(i, TC_HDL(4000, s->mark), s->netem); + } } + + /* Create priority qdisc */ + if (mark) + tc_prio(i, TC_HDL(4000, 0), mark); + + /* Set affinity for network interfaces (skip _loopback_ dev) */ + if_getirqs(i); + if_setaffinity(i, affinity); } - return (WEXITSTATUS(fclose(p))) ? -1 : if_nametoindex(token); + return 0; +} + +int if_stop(struct interface *i) +{ INDENT + info("Stopping interface '%s'", i->name); + + { INDENT + if_setaffinity(i, -1L); + tc_reset(i); + } + + return 0; +} + +int if_getegress(struct sockaddr *sa) +{ + switch (sa->sa_family) { + case AF_INET: { + struct sockaddr_in *sin = (struct sockaddr_in *) sa; + char cmd[128]; + char token[32]; + + snprintf(cmd, sizeof(cmd), "ip route get %s", inet_ntoa(sin->sin_addr)); + + debug(8, "System: %s", cmd); + + FILE *p = popen(cmd, "r"); + if (!p) + return -1; + + while (!feof(p) && fscanf(p, "%31s", token)) { + if (!strcmp(token, "dev")) { + fscanf(p, "%31s", token); + break; + } + } + + return (WEXITSTATUS(fclose(p))) ? -1 : if_nametoindex(token); + } + + case AF_PACKET: { + struct sockaddr_ll *sll = (struct sockaddr_ll *) sa; + return sll->sll_ifindex; + } + + default: + return -1; + } } int if_getirqs(struct interface *i) { char dirname[NAME_MAX]; + int irq, n = 0; snprintf(dirname, sizeof(dirname), "/sys/class/net/%s/device/msi_irqs/", i->name); DIR *dir = opendir(dirname); - if (!dir) { - warn("Cannot open IRQs for interface '%s'", i->name); - return -ENOENT; + if (dir) { + memset(&i->irqs, 0, sizeof(char) * IF_IRQ_MAX); + + struct dirent *entry; + while ((entry = readdir(dir)) && n < IF_IRQ_MAX) { + if ((irq = atoi(entry->d_name))) + i->irqs[n++] = irq; + } + + closedir(dir); } - memset(&i->irqs, 0, sizeof(char) * IF_IRQ_MAX); - - int irq, n = 0; - struct dirent *entry; - while ((entry = readdir(dir)) && n < IF_IRQ_MAX) { - if ((irq = atoi(entry->d_name))) - i->irqs[n++] = irq; - } - - debug(7, "Found %u interrupts for interface '%s'", n, i->name); - - closedir(dir); return 0; } @@ -91,7 +173,7 @@ int if_setaffinity(struct interface *i, int affinity) return 0; } -struct interface* if_lookup_index(int index, struct interface *interfaces) +struct interface * if_lookup_index(int index) { for (struct interface *i = interfaces; i; i = i->next) { if (i->index == index) { @@ -101,3 +183,4 @@ struct interface* if_lookup_index(int index, struct interface *interfaces) return NULL; } + diff --git a/server/src/node.c b/server/src/node.c index b9b95461f..e3aa6789b 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -4,53 +4,106 @@ * @copyright 2014, Institute for Automation of Complex Power Systems, EONERC */ -#include #include -#include -#include -#include "config.h" +#include "node.h" #include "cfg.h" #include "utils.h" -#include "msg.h" -#include "node.h" -#include "if.h" -int node_connect(struct node *n) -{ - /* Create socket */ - n->sd = socket(AF_INET, SOCK_DGRAM, 0); - if (n->sd < 0) - perror("Failed to create socket"); +/* Node types */ +#include "socket.h" +#include "gtfpga.h" +#include "opal.h" - /* Bind socket for receiving */ - if (bind(n->sd, (struct sockaddr *) &n->local, sizeof(struct sockaddr_in))) - perror("Failed to bind to socket"); +#define VTABLE(type, name, fnc) { type, name, config_parse_ ## fnc, \ + fnc ## _print, \ + fnc ## _open, \ + fnc ## _close, \ + fnc ## _read, \ + fnc ## _write } - debug(1, " We listen for node '%s' at %s:%u", - n->name, inet_ntoa(n->local.sin_addr), - ntohs(n->local.sin_port)); - debug(1, " We sent to node '%s' at %s:%u", - n->name, inet_ntoa(n->remote.sin_addr), - ntohs(n->remote.sin_port)); +/** Vtable for virtual node sub types */ +static const struct node_vtable vtables[] = { + VTABLE(IEEE_802_3, "ieee802.3", socket), + VTABLE(IP, "ip", socket), + VTABLE(UDP, "udp", socket), + VTABLE(TCP, "tcp", socket), + VTABLE(TCPD, "tcpd", socket), + //VTABLE(OPAL, "opal", opal ), + //VTABLE(GTFPGA, "gtfpga", gtfpga), +}; - return 0; -} +/** Linked list of nodes */ +struct node *nodes; -int node_disconnect(struct node *n) -{ - close(n->sd); - - return 0; -} - -struct node* node_lookup_name(const char *str, struct node *nodes) +struct node * node_lookup_name(const char *str, struct node *nodes) { for (struct node *n = nodes; n; n = n->next) { - if (!strcmp(str, n->name)) { + if (!strcmp(str, n->name)) return n; - } } return NULL; } + +struct node_vtable const * node_lookup_vtable(const char *str) +{ + for (int i = 0; i < ARRAY_LEN(vtables); i++) { + if (!strcmp(vtables[i].name, str)) + return &vtables[i]; + } + + return NULL; +} + +int node_start(struct node *n) +{ + int ret; + + char str[256]; + node_print(n, str, sizeof(str)); + + debug(1, "Starting node '%s' of type '%s' (%s)", n->name, n->vt->name, str); + + { INDENT + if (!n->refcnt) + warn("Node '%s' is not used by an active path", n->name); + + ret = n->vt->open(n); + } + + return ret; +} + +int node_start_defer(struct node *n) +{ + switch (node_type(n)) { + case TCPD: + info("Wait for incoming TCP connection from node '%s'...", n->name); + listen(n->socket->sd, 1); + n->socket->sd = accept(n->socket->sd, NULL, NULL); + break; + + case TCP: + info("Connect with TCP to remote node '%s'", n->name); + connect(n->socket->sd, (struct sockaddr *) &n->socket->remote, sizeof(n->socket->remote)); + break; + + default: + break; + } + + return 0; +} + +int node_stop(struct node *n) +{ + int ret; + info("Stopping node '%s'", n->name); + + { INDENT + ret = n->vt->close(n); + } + + return ret; +} diff --git a/server/src/path.c b/server/src/path.c index df530be73..e5ee226eb 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -13,12 +13,14 @@ #include -#include "cfg.h" #include "utils.h" #include "path.h" #define sigev_notify_thread_id _sigev_un._tid +/** Linked list of paths */ +struct path *paths; + /** Send messages */ static void * path_send(void *arg) { @@ -50,9 +52,9 @@ static void * path_send(void *arg) perror("Failed to start timer"); while (1) { - sigwait(&set, &sig); + sigwait(&set, &sig); /* blocking wait for next timer tick */ if (p->last) { - msg_send(p->last, p->out); + node_write(p->out, p->last); p->last = NULL; p->sent++; } @@ -66,13 +68,17 @@ static void * path_run(void *arg) { struct path *p = (struct path *) arg; struct msg *m = malloc(sizeof(struct msg)); - if (!m) error("Failed to allocate memory for message!"); + + /* Open deferred TCP connection */ + node_start_defer(p->in); + node_start_defer(p->out); /* Main thread loop */ while (1) { - msg_recv(m, p->in); /* Receive message */ + node_read(p->in, m); /* Receive message */ + p->received++; /* Check header fields */ @@ -126,7 +132,7 @@ static void * path_run(void *arg) /* At fixed rate mode, messages are send by another thread */ if (!p->rate) { - msg_send(m, p->out); + node_write(p->out, m); /* Send message */ p->sent++; } } @@ -137,7 +143,9 @@ static void * path_run(void *arg) } int path_start(struct path *p) -{ +{ INDENT + info("Starting path: %12s " GRN("=>") " %-12s", p->in->name, p->out->name); + /* At fixed rate mode, we start another thread for sending */ if (p->rate) pthread_create(&p->sent_tid, NULL, &path_send, (void *) p); @@ -146,7 +154,9 @@ int path_start(struct path *p) } int path_stop(struct path *p) -{ +{ INDENT + info("Stopping path: %12s " RED("=>") " %-12s", p->in->name, p->out->name); + pthread_cancel(p->recv_tid); pthread_join(p->recv_tid, NULL);