1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

introducing new unified node types with scatter/gather (multiplexing) support

This commit is contained in:
Steffen Vogel 2015-05-06 11:48:30 +02:00
parent 17a3ff8869
commit d1d5f521b7
8 changed files with 229 additions and 118 deletions

View file

@ -30,6 +30,12 @@ struct file {
};
/** @see node_vtable::init */
int file_init(int argc, char *argv[], struct settings *set);
/** @see node_vtable::deinit */
int file_deinit();
/** @see node_vtable::print */
int file_print(struct node *n, char *buf, int len);
/** @see node_vtable::parse */
@ -41,10 +47,10 @@ int file_open(struct node *n);
/** @see node_vtable::close */
int file_close(struct node *n);
int file_read(struct node *n, struct msg *m);
/** @see node_vtable::read */
int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int file_write(struct node *n, struct msg *m);
/** @see node_vtable::write */
int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
#endif /** _FILE_H_ @} */

View file

@ -32,8 +32,8 @@ struct gtfpga {
struct pci_dev *dev;
};
int gtfpga_init(int argc, char * argv[]);
/** @see node_vtable::init */
int gtfpga_init(int argc, char * argv[], struct settings *set);
/** @see node_vtable::deinit */
int gtfpga_deinit();

View file

@ -28,22 +28,27 @@
}
/* Helper macros for virtual node type */
#define node_type(n) ((n)->vt->type)
#define node_print(n, b, l) ((n)->vt->print(n, b, l))
#define node_read(n, m) ((n)->vt->read(n, m))
#define node_write(n, m) ((n)->vt->write(n, m))
#define node_type(n) ((n)->vt->type)
#define node_print(n, b, l) ((n)->vt->print(n, b, l))
#define node_read(n, p, ps, f, c) ((n)->vt->read(n, p, ps, f, c))
#define node_write(n, p, ps, f, c) ((n)->vt->write(n, p, ps, f, c))
#define node_read_single(n, m) ((n)->vt->read(n, m, 1, 0, 1))
#define node_write_single(n, m) ((n)->vt->write(n, m, 1, 0, 1))
/** Node type: layer, protocol, listen/connect */
enum node_type {
LOG_FILE, /* File IO */
IEEE_802_3, /* BSD socket: AF_PACKET SOCK_DGRAM */
IP, /* BSD socket: AF_INET SOCK_RAW */
UDP, /* BSD socket: AF_INET SOCK_DGRAM */
TCPD, /* BSD socket: AF_INET SOCK_STREAM bind + listen + accept */
TCP, /* BSD socket: AF_INET SOCK_STREAM bind + connect */
OPAL_ASYNC, /* OPAL-RT Asynchronous Process Api */
GTFPGA, /* Xilinx ML507 GTFPGA card */
INVALID
LOG_FILE, /**< File IO */
OPAL_ASYNC, /**< OPAL-RT Asynchronous Process Api */
GTFPGA, /**< Xilinx ML507 GTFPGA card */
IEEE_802_3 = 0x10, /**< BSD socket: AF_PACKET SOCK_DGRAM */
IP, /**< BSD socket: AF_INET SOCK_RAW */
UDP, /**< BSD socket: AF_INET SOCK_DGRAM */
TCPD, /**< BSD socket: AF_INET SOCK_STREAM bind + listen + accept */
TCP, /**< BSD socket: AF_INET SOCK_STREAM bind + connect */
SOCKET = 0xF0 /**< Mask for BSD socket types */
};
/** C++ like vtable construct for node_types
@ -86,10 +91,40 @@ struct node_vtable {
* @retval <0 Error. Something went wrong.
*/
int (*close)(struct node *n);
int (*read)(struct node *n, struct msg *m);
int (*write)(struct node *n, struct msg *m);
int (*init)(int argc, char *argv[]);
/** Receive multiple messages from single datagram / packet.
*
* Messages are received with a single recvmsg() syscall by
* using gathering techniques (struct iovec).
* The messages will be stored in a circular buffer / array @p m.
* Indexes used to address @p m will wrap around after len messages.
*
* @param n A pointer to the node where the messages should be sent to.
* @param pool A pointer to an array of messages which should be sent.
* @param poolsize The length of the message array.
* @param first The index of the first message which should be sent.
* @param cnt The number of messages which should be sent.
* @return The number of messages actually received.
*/
int (*read) (struct node *n, struct msg *pool, int poolsize, int first, int cnt);
/** Send multiple messages in a single datagram / packet.
*
* Messages are sent with a single sendmsg() syscall by
* using gathering techniques (struct iovec).
* The messages have to be stored in a circular buffer / array m.
* So the indexes will wrap around after len.
*
* @param n A pointer to the node where the messages should be sent to.
* @param pool A pointer to an array of messages which should be sent.
* @param poolsize The length of the message array.
* @param first The index of the first message which should be sent.
* @param cnt The number of messages which should be sent.
* @return The number of messages actually sent.
*/
int (*write)(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int (*init)(int argc, char *argv[], struct settings *set);
int (*deinit)();
int refcnt;
@ -104,6 +139,8 @@ struct node
{
/** How many paths are sending / receiving from this node? */
int refcnt;
/** Number of messages to send / recv at once (scatter / gather) */
int combine;
/** A short identifier of the node, only used for configuration and logging */
const char *name;

View file

@ -51,8 +51,6 @@ struct opal {
int recv_id;
int seq_no;
struct opal_global *global;
Opal_SendAsyncParam send_params;
Opal_RecvAsyncParam recv_params;
@ -62,7 +60,7 @@ struct opal {
*
* @see node_vtable::init
*/
int opal_init(int argc, char *argv[]);
int opal_init(int argc, char *argv[], struct settings *set);
/** Free global OPAL settings and unmaps shared memory regions.
*

View file

@ -36,19 +36,24 @@ struct socket {
struct socket *next;
};
/** Create new socket and connect(), bind(), accept().
*
* @param n A pointer to the node.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
/** @see node_vtable::init */
int socket_init(int argc, char *argv[], struct settings *set);
/** @see node_vtable::deinit */
int socket_deinit();
/** @see node_vtable::open */
int socket_open(struct node *n);
/** @see node_vtable::close */
int socket_close(struct node *n);
/** @see node_vtable::write */
int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
/** @see node_vtable::read */
int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
/** @see node_vtable::parse */
int socket_parse(config_setting_t *cfg, struct node *n);

View file

@ -19,11 +19,6 @@
#endif
#define VTABLE(type, name, fnc) { type, name, \
fnc ## _parse, fnc ## _print, \
fnc ## _open, fnc ## _close, \
fnc ## _read, fnc ## _write }
#define VTABLE2(type, name, fnc) { type, name, \
fnc ## _parse, fnc ## _print, \
fnc ## _open, fnc ## _close, \
fnc ## _read, fnc ## _write, \
@ -32,10 +27,10 @@
/** Vtable for virtual node sub types */
struct node_vtable vtables[] = {
#ifdef ENABLE_OPAL_ASYNC
VTABLE2(OPAL_ASYNC, "opal", opal),
VTABLE(OPAL_ASYNC, "opal", opal),
#endif
#ifdef ENABLE_GTFPGA
VTABLE2(GTFPGA, "gtfpga", gtfpga),
VTABLE(GTFPGA, "gtfpga", gtfpga),
#endif
VTABLE(LOG_FILE, "file", file),
VTABLE(IEEE_802_3, "ieee802.3", socket),
@ -48,15 +43,13 @@ struct node_vtable vtables[] = {
/** Linked list of nodes. */
struct list nodes;
int node_init(int argc, char *argv[])
int node_init(int argc, char *argv[], struct settings *set)
{ INDENT
for (int i=0; i<ARRAY_LEN(vtables); i++) {
const struct node_vtable *vt = &vtables[i];
if (vt->refcnt && vt->init) {
if (vt->init(argc, argv))
error("Failed to initialize '%s' node type", vt->name);
else
info("Initializing '%s' node type", vt->name);
if (vt->refcnt) {
info("Initializing '%s' node type", vt->name);
vt->init(argc, argv, set);
}
}
@ -68,12 +61,9 @@ int node_deinit()
/* De-initialize node types */
for (int i=0; i<ARRAY_LEN(vtables); i++) {
struct node_vtable *vt = &vtables[i];
if (vt->refcnt && vt->deinit) {
if (vt->deinit())
error("Failed to de-initialize '%s' node type", vt->name);
else
info("De-initializing '%s' node type", vt->name);
if (vt->refcnt) {
info("De-initializing '%s' node type", vt->name);
vt->deinit();
}
}
return 0;

View file

@ -13,15 +13,16 @@
#include "utils.h"
static struct opal_global *og = NULL;
static struct list opals;
int opal_init(int argc, char *argv[])
{
int opal_init(int argc, char *argv[], struct settings *set)
{ INDENT
int err;
if (argc != 4)
return -1;
struct opal_global *g = alloc(sizeof(struct opal_global));
og = alloc(sizeof(struct opal_global));
pthread_mutex_init(&g->lock, NULL);
@ -57,15 +58,13 @@ int opal_init(int argc, char *argv[])
info("Started as OPAL Asynchronous process");
info("This is Simulator2Simulator Server (S2SS) %s (built on %s, %s, debug=%d)",
VERSION, __DATE__, __TIME__, _debug);
opal_print_global(g);
og = g;
opal_print_global(og);
return 0;
}
int opal_deinit()
{
{ INDENT
int err;
if (!og)
@ -112,35 +111,17 @@ int opal_print_global(struct opal_global *g)
}
int opal_parse(config_setting_t *cfg, struct node *n)
{
if (!og) {
warn("Skipping node '%s', because this server is not running as an OPAL Async process!", n->name);
return -1;
}
{
struct opal *o = alloc(sizeof(struct opal));
config_setting_lookup_int(cfg, "send_id", &o->send_id);
config_setting_lookup_int(cfg, "recv_id", &o->recv_id);
config_setting_lookup_bool(cfg, "reply", &o->reply);
/* Search for valid send and recv ids */
int sfound = 0, rfound = 0;
for (int i=0; i<og->send_icons; i++)
sfound += og->send_ids[i] == o->send_id;
for (int i=0; i<og->send_icons; i++)
rfound += og->send_ids[i] == o->send_id;
if (!sfound)
cerror(config_setting_get_member(cfg, "send_id"),
"Invalid send_id '%u' for node '%s'", o->send_id, n->name);
if (!rfound)
cerror(config_setting_get_member(cfg, "recv_id"),
"Invalid recv_id '%u' for node '%s'", o->recv_id, n->name);
n->opal = o;
n->opal->global = og;
n->cfg = cfg;
list_push(&opals, o);
return 0;
}
@ -159,6 +140,19 @@ int opal_open(struct node *n)
{
struct opal *o = n->opal;
/* Search for valid send and recv ids */
int sfound = 0, rfound = 0;
for (int i=0; i<og->send_icons; i++)
sfound += og->send_ids[i] == o->send_id;
for (int i=0; i<og->send_icons; i++)
rfound += og->send_ids[i] == o->send_id;
if (!sfound)
error("Invalid send_id '%u' for node '%s'", o->send_id, n->name);
if (!rfound)
error("Invalid recv_id '%u' for node '%s'", o->recv_id, n->name);
/* Get some more informations and paramters from OPAL-RT */
OpalGetAsyncSendIconMode(&o->mode, o->send_id);
OpalGetAsyncSendParameters(&o->send_params, sizeof(Opal_SendAsyncParam), o->send_id);
OpalGetAsyncRecvParameters(&o->recv_params, sizeof(Opal_RecvAsyncParam), o->recv_id);

View file

@ -11,6 +11,7 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <linux/if_packet.h>
#include <net/if.h>
@ -20,6 +21,7 @@
#include <netinet/ether.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netdb.h>
#include "config.h"
@ -27,6 +29,52 @@
#include "socket.h"
#include "if.h"
/** Linked list of interfaces */
extern struct list interfaces;
/** Linked list of sockets */
struct list sockets;
int socket_init(int argc, char * argv[], struct settings *set)
{ INDENT
list_init(&interfaces, (dtor_cb_t) if_destroy);
/* Gather list of used network interfaces */
FOREACH(&sockets, it) {
struct socket *s = it->socket;
/* Determine outgoing interface */
int index = if_getegress((struct sockaddr *) &s->remote);
if (index < 0) {
char buf[128];
socket_print_addr(buf, sizeof(buf), (struct sockaddr *) &s->remote);
error("Failed to get interface for socket address '%s'", buf);
}
struct interface *i = if_lookup_index(index);
if (!i)
i = if_create(index);
list_push(&i->sockets, s);
i->refcnt++;
}
FOREACH(&interfaces, it)
if_start(it->interface, set->affinity);
return 0;
}
int socket_deinit()
{ INDENT
FOREACH(&interfaces, it)
if_stop(it->interface);
list_destroy(&interfaces);
return 0;
}
int socket_print(struct node *n, char *buf, int len)
{
struct socket *s = n->socket;
@ -74,18 +122,6 @@ int socket_open(struct node *n)
serror("Failed to connect socket");
}
/* Determine outgoing interface */
int index = if_getegress((struct sockaddr *) &s->remote);
if (index < 0)
error("Failed to get egress interface for node '%s'", n->name);
struct interface *i = if_lookup_index(index);
if (!i)
i = if_create(index);
list_push(&i->sockets, s);
i->refcnt++;
/* Set socket priority, QoS or TOS IP options */
int prio;
switch (node_type(n)) {
@ -125,56 +161,99 @@ int socket_close(struct node *n)
return 0;
}
int socket_read(struct node *n, struct msg *m)
int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
{
struct socket *s = n->socket;
int bytes;
struct iovec iov[cnt];
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov)
};
/* Wait until next packet received */
poll(&(struct pollfd) { .fd = s->sd, .events = POLLIN }, 1, -1);
/* Get size of received packet in bytes */
ioctl(s->sd, FIONREAD, &bytes);
/* Check packet integrity */
if (bytes % (cnt * 4) != 0)
error("Packet length not dividable by 4!");
if (bytes / cnt > sizeof(struct msg))
error("Packet length is too large!");
for (int i = 0; i < cnt; i++) {
/* All messages of a packet must have equal length! */
iov[i].iov_base = &pool[(first+poolsize-i) % poolsize];
iov[i].iov_len = bytes / cnt;
}
/* Receive message from socket */
int ret = recv(n->socket->sd, m, sizeof(struct msg), 0);
int ret = recvmsg(s->sd, &mhdr, 0);
if (ret == 0)
error("Remote node '%s' closed the connection", n->name);
else if (ret < 0)
serror("Failed recv");
/* Convert headers to host byte order */
m->sequence = ntohs(m->sequence);
for (int i = 0; i < cnt; i++) {
struct msg *n = &pool[(first+poolsize-i) % poolsize];
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST)
msg_swap(m);
/* Check integrity of packet */
bytes -= MSG_LEN(n->length);
/* Convert headers to host byte order */
n->sequence = ntohs(n->sequence);
/* Convert message to host endianess */
if (n->endian != MSG_ENDIAN_HOST)
msg_swap(n);
}
debug(10, "Message received from node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u",
n->name, m->version, m->type, m->endian, m->length, m->sequence);
/* Check packet integrity */
if (bytes != 0)
error("Packet length does not match message header length!");
return 0;
}
int socket_write(struct node *n, struct msg *m)
int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
{
struct socket *s = n->socket;
int ret = -1;
/* Convert headers to network byte order */
m->sequence = htons(m->sequence);
switch (node_type(n)) {
case IEEE_802_3:/* Connection-less protocols */
case IP:
case UDP:
ret = sendto(s->sd, m, MSG_LEN(m->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
break;
case TCP: /* Connection-oriented protocols */
case TCPD:
ret = send(s->sd, m, MSG_LEN(m->length), 0);
break;
default: { }
struct iovec iov[cnt];
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov)
};
for (int i = 0; i < cnt; i++) {
struct msg *n = &pool[(first+poolsize+i) % poolsize];
/* Convert headers to host byte order */
n->sequence = htons(n->sequence);
iov[i].iov_base = n;
iov[i].iov_len = MSG_LEN(n->length);
}
/* Specify destination address for connection-less procotols */
switch (node_type(n)) {
case IEEE_802_3:
case IP:
case UDP:
mhdr.msg_name = (struct sockaddr *) &s->remote;
mhdr.msg_namelen = sizeof(s->remote);
break;
default:
break;
}
ret = sendmsg(s->sd, &mhdr, 0);
if (ret < 0)
serror("Failed send");
debug(10, "Message sent to node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u",
n->name, m->version, m->type, m->endian, m->length, ntohs(m->sequence));
return 0;
}
@ -210,6 +289,8 @@ int socket_parse(config_setting_t *cfg, struct node *n)
}
n->socket = s;
list_push(&sockets, s);
return 0;
}
@ -234,7 +315,7 @@ int socket_print_addr(char *buf, int len, struct sockaddr *sa)
}
default:
error("Unsupported address family: %u", sa->sa_family);
return snprintf(buf, len, "address family: %u", sa->sa_family);
}
return 0;