1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

final commit for vtable based nodes

This commit is contained in:
Steffen Vogel 2014-12-05 12:39:52 +01:00
parent c552ad9caa
commit 4e911be070
8 changed files with 409 additions and 125 deletions

View file

@ -18,6 +18,9 @@ struct node;
struct path;
struct interface;
struct socket;
struct opal;
struct gtfpga;
struct netem;
/** Global configuration */
@ -78,6 +81,30 @@ int config_parse_path(config_setting_t *cfg,
int config_parse_node(config_setting_t *cfg,
struct node **nodes);
/** Parse node connection details for OPAL type
*
* @param cfg A libconfig object pointing to the node
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int config_parse_opal(config_setting_t *cfg, struct node *n);
/** Parse node connection details for GTFPGA type
*
* @param cfg A libconfig object pointing to the node
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int config_parse_gtfpga(config_setting_t *cfg, struct node *n);
/** Parse node connection details for SOCKET type
*
* @param cfg A libconfig object pointing to the node
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int config_parse_socket(config_setting_t *cfg, struct node *n);
/** Parse network emulator (netem) settings.
*
* @param cfg A libconfig object containing the settings

View file

@ -17,6 +17,8 @@
#define IF_NAME_MAX IFNAMSIZ /**< Maximum length of an interface name */
#define IF_IRQ_MAX 3 /**< Maxmimal number of IRQs of an interface */
struct socket;
/** Interface data structure */
struct interface {
/** The index used by the kernel to reference this interface */
@ -28,20 +30,57 @@ struct interface {
/** List of IRQs of the NIC */
char irqs[IF_IRQ_MAX];
/** Linked list of associated sockets */
struct socket *sockets;
/** Linked list pointer */
struct interface *next;
};
/** Add a new interface to the global list and lookup name, irqs...
*
* @param index The interface index of the OS
* @retval >0 Success. A pointer to the new interface.
* @retval 0 Error. The creation failed.
*/
struct interface * if_create(int index);
/** Start interface.
*
* This setups traffic controls queue discs, network emulation and
* maps interface IRQs according to affinity.
*
* @param i A pointer to the interface structure.
* @param affinity Set the IRQ affinity of this interface.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int if_start(struct interface *i, int affinity);
/** Stop interface
*
* This resets traffic qdiscs ant network emulation
* and maps interface IRQs to all CPUs.
*
* @param i A pointer to the interface structure.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int if_stop(struct interface *i);
/** Get outgoing interface.
*
* Does a lookup in the kernel routing table to determine
* the interface which sends the data to a certain socket
* address.
* Depending on the address family of the socker address,
* this function tries to determine outgoing interface
* which is used to send packages to a remote host with the specified
* socket address.
*
* For AF_INET the fnuction performs a lookup in the kernel routing table.
* For AF_PACKET the function uses the existing sll_ifindex field of the socket address.
*
* @param sa A destination address for outgoing packets.
* @return The interface index.
*/
int if_getegress(struct sockaddr_in *sa);
int if_getegress(struct sockaddr *sa);
/** Get all IRQs for this interface.
*
@ -70,6 +109,6 @@ int if_setaffinity(struct interface *i, int affinity);
* @retval NULL if no interface with index was found.
* @retval >0 Success. A pointer to the interface.
*/
struct interface* if_lookup_index(int index, struct interface *interfaces);
struct interface * if_lookup_index(int index);
#endif /* _IF_H_ */

View file

@ -23,35 +23,61 @@
/** Static node initialization */
#define NODE_INIT(n) { \
.sd = -1, \
.name = n \
}
/** The datastructure for a node.
/* Helper macros for virtual node type */
#define node_type(n) ((n)->vt->type)
#define node_print(n, b, l) ((n)->vt->print(n, b, l))
#define node_read(n, m) ((n)->vt->read(n, m))
#define node_write(n, m) ((n)->vt->write(n, m))
/** Node type: layer, protocol, listen/connect */
enum node_type {
IEEE_802_3, /* BSD socket: AF_PACKET SOCK_DGRAM */
IP, /* BSD socket: AF_INET SOCK_RAW */
UDP, /* BSD socket: AF_INET SOCK_DGRAM */
TCPD, /* BSD socket: AF_INET SOCK_STREAM bind + listen + accept */
TCP, /* BSD socket: AF_INET SOCK_STREAM bind + connect */
// OPAL_ASYNC, /* OPAL-RT AsyncApi */
// GTFPGA, /* Xilinx ML507 GTFPGA card */
INVALID
};
/** C++ like vtable construct for socket_types */
struct node_vtable {
enum node_type type;
const char *name;
int (*parse)(config_setting_t *cfg, struct node *n);
int (*print)(struct node *n, char *buf, int len);
int (*open)(struct node *n);
int (*close)(struct node *n);
int (*read)(struct node *n, struct msg *m);
int (*write)(struct node *n, struct msg *m);
};
/** The data structure for a node.
*
* Every entity which exchanges messages is represented by a node.
* Nodes can be remote machines and simulators or locally running processes.
*/
struct node
{
/** The socket descriptor */
int sd;
/** How many paths are sending / receiving from this node? */
int refcnt;
/** Socket mark for netem, routing and filtering */
int mark;
/** A short identifier of the node, only used for configuration and logging */
const char *name;
/** Local address of the socket */
struct sockaddr_in local;
/** Remote address of the socket */
struct sockaddr_in remote;
/** The egress interface */
struct interface *interface;
/** Network emulator settings */
struct netem *netem;
/** C++ like virtual function call table */
struct node_vtable const *vt;
/** Virtual data (used by vtable functions) */
union {
struct socket *socket;
struct opal *opal;
struct gtfpga *gtfpga;
};
/** A pointer to the libconfig object which instantiated this node */
config_setting_t *cfg;
@ -62,19 +88,40 @@ struct node
/** Connect and bind the UDP socket of this node.
*
* @param n A pointer to the node structure
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int node_connect(struct node *n);
/** Disconnect the UDP socket of this node.
* Depending on the type (vtable) of this node,
* a socket is opened, shmem region registred...
*
* @param n A pointer to the node structure
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int node_disconnect(struct node *n);
int node_start(struct node *n);
/** Deferred TCP connection setup
*
* @todo Dirty hack!
* We should check the address of the connecting node!
* We should preserve the original socket for proper shutdown.
*/
int node_start_defer(struct node *n);
/** Stops a node.
*
* Depending on the type (vtable) of this node,
* a socket is closed, shmem region released...
*
* @param n A pointer to the node structure
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int node_stop(struct node *n);
/** Lookup string representation of socket type
*
* @param type A string describing the socket type. This must be one of: tcp, tcpd, udp, ip, ieee802.3
* @return An enumeration value or INVALID (0)
*/
struct node_vtable const * node_lookup_vtable(const char *str);
/** Search list of nodes for a name.
*

View file

@ -9,6 +9,7 @@
#define _UTILS_H_
#include <stdlib.h>
#include <stdarg.h>
#include <errno.h>
#include <sched.h>
@ -35,6 +36,8 @@
#define RIGHT(n) "\e[" ## n ## "C"
#define LEFT(n) "\e[" ## n ## "D"
#define ARRAY_LEN(a) ( sizeof a / sizeof a[0] )
/** The log level which is passed as first argument to print() */
enum log_level { DEBUG, INFO, WARN, ERROR };
@ -83,6 +86,9 @@ void hist_plot(unsigned *hist, int length);
/** Dump histogram data in Matlab format */
void hist_dump(unsigned *hist, int length);
/** A system(2) emulator with popen/pclose(2) and proper output handling */
int system2(const char* cmd, ...);
/** Append an element to a single linked list */
#define list_add(list, elm) do { \
elm->next = list; \

View file

@ -25,39 +25,44 @@ int config_parse(const char *filename, config_t *cfg, struct settings *set,
{
config_set_auto_convert(cfg, 1);
if (!config_read_file(cfg, filename)) {
if (!config_read_file(cfg, filename))
error("Failed to parse configuration: %s in %s:%d",
config_error_text(cfg), filename,
config_error_line(cfg)
);
}
/* Get and check config sections */
config_setting_t *cfg_root = config_root_setting(cfg);
if (!cfg_root || !config_setting_is_group(cfg_root))
error("Missing global section in config file: %s", filename);
config_setting_t *cfg_nodes = config_setting_get_member(cfg_root, "nodes");
if (!cfg_nodes || !config_setting_is_group(cfg_nodes))
error("Missing node section in config file: %s", filename);
config_setting_t *cfg_paths = config_setting_get_member(cfg_root, "paths");
if (!cfg_paths || !config_setting_is_list(cfg_paths))
error("Missing path section in config file: %s", filename);
/* Parse global settings */
config_parse_global(cfg_root, set);
if (set) {
if (!cfg_root || !config_setting_is_group(cfg_root))
error("Missing global section in config file: %s", filename);
config_parse_global(cfg_root, set);
}
/* Parse nodes */
for (int i = 0; i < config_setting_length(cfg_nodes); i++) {
config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i);
config_parse_node(cfg_node, nodes);
if (nodes) {
config_setting_t *cfg_nodes = config_setting_get_member(cfg_root, "nodes");
if (!cfg_nodes || !config_setting_is_group(cfg_nodes))
error("Missing node section in config file: %s", filename);
for (int i = 0; i < config_setting_length(cfg_nodes); i++) {
config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i);
config_parse_node(cfg_node, nodes);
}
}
/* Parse paths */
for (int i = 0; i < config_setting_length(cfg_paths); i++) {
config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i);
config_parse_path(cfg_path, paths, nodes);
if (paths) {
config_setting_t *cfg_paths = config_setting_get_member(cfg_root, "paths");
if (!cfg_paths || !config_setting_is_list(cfg_paths))
error("Missing path section in config file: %s", filename);
for (int i = 0; i < config_setting_length(cfg_paths); i++) {
config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i);
config_parse_path(cfg_path, paths, nodes);
}
}
return CONFIG_TRUE;
@ -70,7 +75,7 @@ int config_parse_global(config_setting_t *cfg, struct settings *set)
config_setting_lookup_int(cfg, "debug", &set->debug);
config_setting_lookup_float(cfg, "stats", &set->stats);
debug = set->debug;
_debug = set->debug;
set->cfg = cfg;
@ -142,8 +147,8 @@ int config_parse_path(config_setting_t *cfg,
}
}
else {
warn("Path '%s' => '%s' is not enabled", p->in->name, p->out->name);
free(p);
warn(" Path is not enabled");
}
return 0;
@ -151,7 +156,7 @@ int config_parse_path(config_setting_t *cfg,
int config_parse_node(config_setting_t *cfg, struct node **nodes)
{
const char *remote, *local;
const char *type;
int ret;
/* Allocate memory */
@ -162,10 +167,24 @@ int config_parse_node(config_setting_t *cfg, struct node **nodes)
memset(n, 0, sizeof(struct node));
/* Required settings */
n->cfg = cfg;
n->name = config_setting_name(cfg);
if (!n->name)
cerror(cfg, "Missing node name");
if (!config_setting_lookup_string(cfg, "type", &type))
cerror(cfg, "Missing node type");
n->vt = node_lookup_vtable(type);
if (!n->vt)
cerror(cfg, "Invalid node type");
ret = n->vt->parse(cfg, n);
list_add(*nodes, n);
return ret;
}
/** @todo Implement */
int config_parse_opal(config_setting_t *cfg, struct node *n)
@ -235,7 +254,7 @@ int config_parse_netem(config_setting_t *cfg, struct netem *em)
if (config_setting_lookup_int(cfg, "corrupt", &em->corrupt))
em->valid |= TC_NETEM_CORRUPT;
/** @todo Check netem config values */
/** @todo Validate netem config values */
return CONFIG_TRUE;
}

View file

@ -15,56 +15,138 @@
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <linux/if_packet.h>
#include "if.h"
#include "tc.h"
#include "socket.h"
#include "utils.h"
int if_getegress(struct sockaddr_in *sa)
{
char cmd[128];
char token[32];
/** Linked list of interfaces */
struct interface *interfaces;
snprintf(cmd, sizeof(cmd), "ip route get %s", inet_ntoa(sa->sin_addr));
struct interface * if_create(int index) {
struct interface *i = malloc(sizeof(struct interface));
if (!i)
error("Failed to allocate memory for interface");
else
memset(i, 0, sizeof(struct interface));
debug(8, "System: %s", cmd);
i->index = index;
if_indextoname(index, i->name);
FILE *p = popen(cmd, "r");
if (!p)
debug(3, "Created interface '%s'", i->name, i->index, i->refcnt);
list_add(interfaces, i);
return i;
}
int if_start(struct interface *i, int affinity)
{ INDENT
if (!i->refcnt) {
warn("Interface '%s' is not used by an active node", i->name);
return -1;
while (!feof(p) && fscanf(p, "%31s", token)) {
if (!strcmp(token, "dev")) {
fscanf(p, "%31s", token);
break;
}
else
info("Starting interface '%s'", i->name);
{ INDENT
int mark = 0;
for (struct socket *s = i->sockets; s; s = s->next) {
if (s->netem) {
s->mark = 1 + mark++;
/* Set fwmark for outgoing packets */
if (setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark)))
perror("Failed to set fwmark for outgoing packets");
else
debug(4, "Set fwmark for socket->sd = %u to %u", s->sd, s->mark);
tc_mark(i, TC_HDL(4000, s->mark), s->mark);
tc_netem(i, TC_HDL(4000, s->mark), s->netem);
}
}
/* Create priority qdisc */
if (mark)
tc_prio(i, TC_HDL(4000, 0), mark);
/* Set affinity for network interfaces (skip _loopback_ dev) */
if_getirqs(i);
if_setaffinity(i, affinity);
}
return (WEXITSTATUS(fclose(p))) ? -1 : if_nametoindex(token);
return 0;
}
int if_stop(struct interface *i)
{ INDENT
info("Stopping interface '%s'", i->name);
{ INDENT
if_setaffinity(i, -1L);
tc_reset(i);
}
return 0;
}
int if_getegress(struct sockaddr *sa)
{
switch (sa->sa_family) {
case AF_INET: {
struct sockaddr_in *sin = (struct sockaddr_in *) sa;
char cmd[128];
char token[32];
snprintf(cmd, sizeof(cmd), "ip route get %s", inet_ntoa(sin->sin_addr));
debug(8, "System: %s", cmd);
FILE *p = popen(cmd, "r");
if (!p)
return -1;
while (!feof(p) && fscanf(p, "%31s", token)) {
if (!strcmp(token, "dev")) {
fscanf(p, "%31s", token);
break;
}
}
return (WEXITSTATUS(fclose(p))) ? -1 : if_nametoindex(token);
}
case AF_PACKET: {
struct sockaddr_ll *sll = (struct sockaddr_ll *) sa;
return sll->sll_ifindex;
}
default:
return -1;
}
}
int if_getirqs(struct interface *i)
{
char dirname[NAME_MAX];
int irq, n = 0;
snprintf(dirname, sizeof(dirname), "/sys/class/net/%s/device/msi_irqs/", i->name);
DIR *dir = opendir(dirname);
if (!dir) {
warn("Cannot open IRQs for interface '%s'", i->name);
return -ENOENT;
if (dir) {
memset(&i->irqs, 0, sizeof(char) * IF_IRQ_MAX);
struct dirent *entry;
while ((entry = readdir(dir)) && n < IF_IRQ_MAX) {
if ((irq = atoi(entry->d_name)))
i->irqs[n++] = irq;
}
closedir(dir);
}
memset(&i->irqs, 0, sizeof(char) * IF_IRQ_MAX);
int irq, n = 0;
struct dirent *entry;
while ((entry = readdir(dir)) && n < IF_IRQ_MAX) {
if ((irq = atoi(entry->d_name)))
i->irqs[n++] = irq;
}
debug(7, "Found %u interrupts for interface '%s'", n, i->name);
closedir(dir);
return 0;
}
@ -91,7 +173,7 @@ int if_setaffinity(struct interface *i, int affinity)
return 0;
}
struct interface* if_lookup_index(int index, struct interface *interfaces)
struct interface * if_lookup_index(int index)
{
for (struct interface *i = interfaces; i; i = i->next) {
if (i->index == index) {
@ -101,3 +183,4 @@ struct interface* if_lookup_index(int index, struct interface *interfaces)
return NULL;
}

View file

@ -4,53 +4,106 @@
* @copyright 2014, Institute for Automation of Complex Power Systems, EONERC
*/
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "config.h"
#include "node.h"
#include "cfg.h"
#include "utils.h"
#include "msg.h"
#include "node.h"
#include "if.h"
int node_connect(struct node *n)
{
/* Create socket */
n->sd = socket(AF_INET, SOCK_DGRAM, 0);
if (n->sd < 0)
perror("Failed to create socket");
/* Node types */
#include "socket.h"
#include "gtfpga.h"
#include "opal.h"
/* Bind socket for receiving */
if (bind(n->sd, (struct sockaddr *) &n->local, sizeof(struct sockaddr_in)))
perror("Failed to bind to socket");
#define VTABLE(type, name, fnc) { type, name, config_parse_ ## fnc, \
fnc ## _print, \
fnc ## _open, \
fnc ## _close, \
fnc ## _read, \
fnc ## _write }
debug(1, " We listen for node '%s' at %s:%u",
n->name, inet_ntoa(n->local.sin_addr),
ntohs(n->local.sin_port));
debug(1, " We sent to node '%s' at %s:%u",
n->name, inet_ntoa(n->remote.sin_addr),
ntohs(n->remote.sin_port));
/** Vtable for virtual node sub types */
static const struct node_vtable vtables[] = {
VTABLE(IEEE_802_3, "ieee802.3", socket),
VTABLE(IP, "ip", socket),
VTABLE(UDP, "udp", socket),
VTABLE(TCP, "tcp", socket),
VTABLE(TCPD, "tcpd", socket),
//VTABLE(OPAL, "opal", opal ),
//VTABLE(GTFPGA, "gtfpga", gtfpga),
};
return 0;
}
/** Linked list of nodes */
struct node *nodes;
int node_disconnect(struct node *n)
{
close(n->sd);
return 0;
}
struct node* node_lookup_name(const char *str, struct node *nodes)
struct node * node_lookup_name(const char *str, struct node *nodes)
{
for (struct node *n = nodes; n; n = n->next) {
if (!strcmp(str, n->name)) {
if (!strcmp(str, n->name))
return n;
}
}
return NULL;
}
struct node_vtable const * node_lookup_vtable(const char *str)
{
for (int i = 0; i < ARRAY_LEN(vtables); i++) {
if (!strcmp(vtables[i].name, str))
return &vtables[i];
}
return NULL;
}
int node_start(struct node *n)
{
int ret;
char str[256];
node_print(n, str, sizeof(str));
debug(1, "Starting node '%s' of type '%s' (%s)", n->name, n->vt->name, str);
{ INDENT
if (!n->refcnt)
warn("Node '%s' is not used by an active path", n->name);
ret = n->vt->open(n);
}
return ret;
}
int node_start_defer(struct node *n)
{
switch (node_type(n)) {
case TCPD:
info("Wait for incoming TCP connection from node '%s'...", n->name);
listen(n->socket->sd, 1);
n->socket->sd = accept(n->socket->sd, NULL, NULL);
break;
case TCP:
info("Connect with TCP to remote node '%s'", n->name);
connect(n->socket->sd, (struct sockaddr *) &n->socket->remote, sizeof(n->socket->remote));
break;
default:
break;
}
return 0;
}
int node_stop(struct node *n)
{
int ret;
info("Stopping node '%s'", n->name);
{ INDENT
ret = n->vt->close(n);
}
return ret;
}

View file

@ -13,12 +13,14 @@
#include <sys/syscall.h>
#include "cfg.h"
#include "utils.h"
#include "path.h"
#define sigev_notify_thread_id _sigev_un._tid
/** Linked list of paths */
struct path *paths;
/** Send messages */
static void * path_send(void *arg)
{
@ -50,9 +52,9 @@ static void * path_send(void *arg)
perror("Failed to start timer");
while (1) {
sigwait(&set, &sig);
sigwait(&set, &sig); /* blocking wait for next timer tick */
if (p->last) {
msg_send(p->last, p->out);
node_write(p->out, p->last);
p->last = NULL;
p->sent++;
}
@ -66,13 +68,17 @@ static void * path_run(void *arg)
{
struct path *p = (struct path *) arg;
struct msg *m = malloc(sizeof(struct msg));
if (!m)
error("Failed to allocate memory for message!");
/* Open deferred TCP connection */
node_start_defer(p->in);
node_start_defer(p->out);
/* Main thread loop */
while (1) {
msg_recv(m, p->in); /* Receive message */
node_read(p->in, m); /* Receive message */
p->received++;
/* Check header fields */
@ -126,7 +132,7 @@ static void * path_run(void *arg)
/* At fixed rate mode, messages are send by another thread */
if (!p->rate) {
msg_send(m, p->out);
node_write(p->out, m); /* Send message */
p->sent++;
}
}
@ -137,7 +143,9 @@ static void * path_run(void *arg)
}
int path_start(struct path *p)
{
{ INDENT
info("Starting path: %12s " GRN("=>") " %-12s", p->in->name, p->out->name);
/* At fixed rate mode, we start another thread for sending */
if (p->rate)
pthread_create(&p->sent_tid, NULL, &path_send, (void *) p);
@ -146,7 +154,9 @@ int path_start(struct path *p)
}
int path_stop(struct path *p)
{
{ INDENT
info("Stopping path: %12s " RED("=>") " %-12s", p->in->name, p->out->name);
pthread_cancel(p->recv_tid);
pthread_join(p->recv_tid, NULL);