diff --git a/server/include/file.h b/server/include/file.h index 130401b01..97a12b1b2 100644 --- a/server/include/file.h +++ b/server/include/file.h @@ -30,6 +30,12 @@ struct file { }; /** @see node_vtable::init */ +int file_init(int argc, char *argv[], struct settings *set); + +/** @see node_vtable::deinit */ +int file_deinit(); + +/** @see node_vtable::print */ int file_print(struct node *n, char *buf, int len); /** @see node_vtable::parse */ @@ -41,10 +47,10 @@ int file_open(struct node *n); /** @see node_vtable::close */ int file_close(struct node *n); -int file_read(struct node *n, struct msg *m); /** @see node_vtable::read */ +int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt); -int file_write(struct node *n, struct msg *m); /** @see node_vtable::write */ +int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt); #endif /** _FILE_H_ @} */ \ No newline at end of file diff --git a/server/include/gtfpga.h b/server/include/gtfpga.h index 432e4f6f5..7b3f60ea9 100644 --- a/server/include/gtfpga.h +++ b/server/include/gtfpga.h @@ -32,8 +32,8 @@ struct gtfpga { struct pci_dev *dev; }; -int gtfpga_init(int argc, char * argv[]); /** @see node_vtable::init */ +int gtfpga_init(int argc, char * argv[], struct settings *set); /** @see node_vtable::deinit */ int gtfpga_deinit(); diff --git a/server/include/node.h b/server/include/node.h index 1cde79656..391d446f2 100644 --- a/server/include/node.h +++ b/server/include/node.h @@ -28,22 +28,27 @@ } /* Helper macros for virtual node type */ -#define node_type(n) ((n)->vt->type) -#define node_print(n, b, l) ((n)->vt->print(n, b, l)) -#define node_read(n, m) ((n)->vt->read(n, m)) -#define node_write(n, m) ((n)->vt->write(n, m)) +#define node_type(n) ((n)->vt->type) +#define node_print(n, b, l) ((n)->vt->print(n, b, l)) + +#define node_read(n, p, ps, f, c) ((n)->vt->read(n, p, ps, f, c)) +#define node_write(n, p, ps, f, c) ((n)->vt->write(n, p, ps, f, c)) + +#define node_read_single(n, m) ((n)->vt->read(n, m, 1, 0, 1)) +#define node_write_single(n, m) ((n)->vt->write(n, m, 1, 0, 1)) /** Node type: layer, protocol, listen/connect */ enum node_type { - LOG_FILE, /* File IO */ - IEEE_802_3, /* BSD socket: AF_PACKET SOCK_DGRAM */ - IP, /* BSD socket: AF_INET SOCK_RAW */ - UDP, /* BSD socket: AF_INET SOCK_DGRAM */ - TCPD, /* BSD socket: AF_INET SOCK_STREAM bind + listen + accept */ - TCP, /* BSD socket: AF_INET SOCK_STREAM bind + connect */ - OPAL_ASYNC, /* OPAL-RT Asynchronous Process Api */ - GTFPGA, /* Xilinx ML507 GTFPGA card */ - INVALID + LOG_FILE, /**< File IO */ + OPAL_ASYNC, /**< OPAL-RT Asynchronous Process Api */ + GTFPGA, /**< Xilinx ML507 GTFPGA card */ + IEEE_802_3 = 0x10, /**< BSD socket: AF_PACKET SOCK_DGRAM */ + IP, /**< BSD socket: AF_INET SOCK_RAW */ + UDP, /**< BSD socket: AF_INET SOCK_DGRAM */ + TCPD, /**< BSD socket: AF_INET SOCK_STREAM bind + listen + accept */ + TCP, /**< BSD socket: AF_INET SOCK_STREAM bind + connect */ + + SOCKET = 0xF0 /**< Mask for BSD socket types */ }; /** C++ like vtable construct for node_types @@ -86,10 +91,40 @@ struct node_vtable { * @retval <0 Error. Something went wrong. */ int (*close)(struct node *n); - int (*read)(struct node *n, struct msg *m); - int (*write)(struct node *n, struct msg *m); - int (*init)(int argc, char *argv[]); + /** Receive multiple messages from single datagram / packet. + * + * Messages are received with a single recvmsg() syscall by + * using gathering techniques (struct iovec). + * The messages will be stored in a circular buffer / array @p m. + * Indexes used to address @p m will wrap around after len messages. + * + * @param n A pointer to the node where the messages should be sent to. + * @param pool A pointer to an array of messages which should be sent. + * @param poolsize The length of the message array. + * @param first The index of the first message which should be sent. + * @param cnt The number of messages which should be sent. + * @return The number of messages actually received. + */ + int (*read) (struct node *n, struct msg *pool, int poolsize, int first, int cnt); + + /** Send multiple messages in a single datagram / packet. + * + * Messages are sent with a single sendmsg() syscall by + * using gathering techniques (struct iovec). + * The messages have to be stored in a circular buffer / array m. + * So the indexes will wrap around after len. + * + * @param n A pointer to the node where the messages should be sent to. + * @param pool A pointer to an array of messages which should be sent. + * @param poolsize The length of the message array. + * @param first The index of the first message which should be sent. + * @param cnt The number of messages which should be sent. + * @return The number of messages actually sent. + */ + int (*write)(struct node *n, struct msg *pool, int poolsize, int first, int cnt); + + int (*init)(int argc, char *argv[], struct settings *set); int (*deinit)(); int refcnt; @@ -104,6 +139,8 @@ struct node { /** How many paths are sending / receiving from this node? */ int refcnt; + /** Number of messages to send / recv at once (scatter / gather) */ + int combine; /** A short identifier of the node, only used for configuration and logging */ const char *name; diff --git a/server/include/opal.h b/server/include/opal.h index 3e58f6c05..e4c86b740 100644 --- a/server/include/opal.h +++ b/server/include/opal.h @@ -51,8 +51,6 @@ struct opal { int recv_id; int seq_no; - - struct opal_global *global; Opal_SendAsyncParam send_params; Opal_RecvAsyncParam recv_params; @@ -62,7 +60,7 @@ struct opal { * * @see node_vtable::init */ -int opal_init(int argc, char *argv[]); +int opal_init(int argc, char *argv[], struct settings *set); /** Free global OPAL settings and unmaps shared memory regions. * diff --git a/server/include/socket.h b/server/include/socket.h index c9afbad82..107559641 100644 --- a/server/include/socket.h +++ b/server/include/socket.h @@ -36,19 +36,24 @@ struct socket { struct socket *next; }; -/** Create new socket and connect(), bind(), accept(). - * - * @param n A pointer to the node. - * @retval 0 Success. Everything went well. - * @retval <0 Error. Something went wrong. - */ + +/** @see node_vtable::init */ +int socket_init(int argc, char *argv[], struct settings *set); + +/** @see node_vtable::deinit */ +int socket_deinit(); + +/** @see node_vtable::open */ int socket_open(struct node *n); /** @see node_vtable::close */ +int socket_close(struct node *n); /** @see node_vtable::write */ +int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt); /** @see node_vtable::read */ +int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt); /** @see node_vtable::parse */ int socket_parse(config_setting_t *cfg, struct node *n); diff --git a/server/src/node.c b/server/src/node.c index 90baab818..7ac9b60e0 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -19,11 +19,6 @@ #endif #define VTABLE(type, name, fnc) { type, name, \ - fnc ## _parse, fnc ## _print, \ - fnc ## _open, fnc ## _close, \ - fnc ## _read, fnc ## _write } - -#define VTABLE2(type, name, fnc) { type, name, \ fnc ## _parse, fnc ## _print, \ fnc ## _open, fnc ## _close, \ fnc ## _read, fnc ## _write, \ @@ -32,10 +27,10 @@ /** Vtable for virtual node sub types */ struct node_vtable vtables[] = { #ifdef ENABLE_OPAL_ASYNC - VTABLE2(OPAL_ASYNC, "opal", opal), + VTABLE(OPAL_ASYNC, "opal", opal), #endif #ifdef ENABLE_GTFPGA - VTABLE2(GTFPGA, "gtfpga", gtfpga), + VTABLE(GTFPGA, "gtfpga", gtfpga), #endif VTABLE(LOG_FILE, "file", file), VTABLE(IEEE_802_3, "ieee802.3", socket), @@ -48,15 +43,13 @@ struct node_vtable vtables[] = { /** Linked list of nodes. */ struct list nodes; -int node_init(int argc, char *argv[]) +int node_init(int argc, char *argv[], struct settings *set) { INDENT for (int i=0; irefcnt && vt->init) { - if (vt->init(argc, argv)) - error("Failed to initialize '%s' node type", vt->name); - else - info("Initializing '%s' node type", vt->name); + if (vt->refcnt) { + info("Initializing '%s' node type", vt->name); + vt->init(argc, argv, set); } } @@ -68,12 +61,9 @@ int node_deinit() /* De-initialize node types */ for (int i=0; irefcnt && vt->deinit) { - if (vt->deinit()) - error("Failed to de-initialize '%s' node type", vt->name); - else - info("De-initializing '%s' node type", vt->name); - + if (vt->refcnt) { + info("De-initializing '%s' node type", vt->name); + vt->deinit(); } } return 0; diff --git a/server/src/opal.c b/server/src/opal.c index 382ad1643..c9b08ec98 100644 --- a/server/src/opal.c +++ b/server/src/opal.c @@ -13,15 +13,16 @@ #include "utils.h" static struct opal_global *og = NULL; +static struct list opals; -int opal_init(int argc, char *argv[]) -{ +int opal_init(int argc, char *argv[], struct settings *set) +{ INDENT int err; if (argc != 4) return -1; - struct opal_global *g = alloc(sizeof(struct opal_global)); + og = alloc(sizeof(struct opal_global)); pthread_mutex_init(&g->lock, NULL); @@ -57,15 +58,13 @@ int opal_init(int argc, char *argv[]) info("Started as OPAL Asynchronous process"); info("This is Simulator2Simulator Server (S2SS) %s (built on %s, %s, debug=%d)", VERSION, __DATE__, __TIME__, _debug); - opal_print_global(g); - - og = g; - + opal_print_global(og); + return 0; } int opal_deinit() -{ +{ INDENT int err; if (!og) @@ -112,35 +111,17 @@ int opal_print_global(struct opal_global *g) } int opal_parse(config_setting_t *cfg, struct node *n) -{ - if (!og) { - warn("Skipping node '%s', because this server is not running as an OPAL Async process!", n->name); - return -1; - } - +{ struct opal *o = alloc(sizeof(struct opal)); config_setting_lookup_int(cfg, "send_id", &o->send_id); config_setting_lookup_int(cfg, "recv_id", &o->recv_id); config_setting_lookup_bool(cfg, "reply", &o->reply); - - /* Search for valid send and recv ids */ - int sfound = 0, rfound = 0; - for (int i=0; isend_icons; i++) - sfound += og->send_ids[i] == o->send_id; - for (int i=0; isend_icons; i++) - rfound += og->send_ids[i] == o->send_id; - - if (!sfound) - cerror(config_setting_get_member(cfg, "send_id"), - "Invalid send_id '%u' for node '%s'", o->send_id, n->name); - if (!rfound) - cerror(config_setting_get_member(cfg, "recv_id"), - "Invalid recv_id '%u' for node '%s'", o->recv_id, n->name); n->opal = o; - n->opal->global = og; n->cfg = cfg; + + list_push(&opals, o); return 0; } @@ -159,6 +140,19 @@ int opal_open(struct node *n) { struct opal *o = n->opal; + /* Search for valid send and recv ids */ + int sfound = 0, rfound = 0; + for (int i=0; isend_icons; i++) + sfound += og->send_ids[i] == o->send_id; + for (int i=0; isend_icons; i++) + rfound += og->send_ids[i] == o->send_id; + + if (!sfound) + error("Invalid send_id '%u' for node '%s'", o->send_id, n->name); + if (!rfound) + error("Invalid recv_id '%u' for node '%s'", o->recv_id, n->name); + + /* Get some more informations and paramters from OPAL-RT */ OpalGetAsyncSendIconMode(&o->mode, o->send_id); OpalGetAsyncSendParameters(&o->send_params, sizeof(Opal_SendAsyncParam), o->send_id); OpalGetAsyncRecvParameters(&o->recv_params, sizeof(Opal_RecvAsyncParam), o->recv_id); diff --git a/server/src/socket.c b/server/src/socket.c index 9a16c781f..fec5fcd8f 100644 --- a/server/src/socket.c +++ b/server/src/socket.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -20,6 +21,7 @@ #include #include #include +#include #include #include "config.h" @@ -27,6 +29,52 @@ #include "socket.h" #include "if.h" +/** Linked list of interfaces */ +extern struct list interfaces; + +/** Linked list of sockets */ +struct list sockets; + +int socket_init(int argc, char * argv[], struct settings *set) +{ INDENT + list_init(&interfaces, (dtor_cb_t) if_destroy); + + /* Gather list of used network interfaces */ + FOREACH(&sockets, it) { + struct socket *s = it->socket; + + /* Determine outgoing interface */ + int index = if_getegress((struct sockaddr *) &s->remote); + if (index < 0) { + char buf[128]; + socket_print_addr(buf, sizeof(buf), (struct sockaddr *) &s->remote); + error("Failed to get interface for socket address '%s'", buf); + } + + struct interface *i = if_lookup_index(index); + if (!i) + i = if_create(index); + + list_push(&i->sockets, s); + i->refcnt++; + } + + FOREACH(&interfaces, it) + if_start(it->interface, set->affinity); + + return 0; +} + +int socket_deinit() +{ INDENT + FOREACH(&interfaces, it) + if_stop(it->interface); + + list_destroy(&interfaces); + + return 0; +} + int socket_print(struct node *n, char *buf, int len) { struct socket *s = n->socket; @@ -74,18 +122,6 @@ int socket_open(struct node *n) serror("Failed to connect socket"); } - /* Determine outgoing interface */ - int index = if_getegress((struct sockaddr *) &s->remote); - if (index < 0) - error("Failed to get egress interface for node '%s'", n->name); - - struct interface *i = if_lookup_index(index); - if (!i) - i = if_create(index); - - list_push(&i->sockets, s); - i->refcnt++; - /* Set socket priority, QoS or TOS IP options */ int prio; switch (node_type(n)) { @@ -125,56 +161,99 @@ int socket_close(struct node *n) return 0; } -int socket_read(struct node *n, struct msg *m) +int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { + struct socket *s = n->socket; + + int bytes; + struct iovec iov[cnt]; + struct msghdr mhdr = { + .msg_iov = iov, + .msg_iovlen = ARRAY_LEN(iov) + }; + + /* Wait until next packet received */ + poll(&(struct pollfd) { .fd = s->sd, .events = POLLIN }, 1, -1); + /* Get size of received packet in bytes */ + ioctl(s->sd, FIONREAD, &bytes); + + /* Check packet integrity */ + if (bytes % (cnt * 4) != 0) + error("Packet length not dividable by 4!"); + if (bytes / cnt > sizeof(struct msg)) + error("Packet length is too large!"); + + for (int i = 0; i < cnt; i++) { + /* All messages of a packet must have equal length! */ + iov[i].iov_base = &pool[(first+poolsize-i) % poolsize]; + iov[i].iov_len = bytes / cnt; + } + /* Receive message from socket */ - int ret = recv(n->socket->sd, m, sizeof(struct msg), 0); + int ret = recvmsg(s->sd, &mhdr, 0); if (ret == 0) error("Remote node '%s' closed the connection", n->name); else if (ret < 0) serror("Failed recv"); - /* Convert headers to host byte order */ - m->sequence = ntohs(m->sequence); + for (int i = 0; i < cnt; i++) { + struct msg *n = &pool[(first+poolsize-i) % poolsize]; - /* Convert message to host endianess */ - if (m->endian != MSG_ENDIAN_HOST) - msg_swap(m); + /* Check integrity of packet */ + bytes -= MSG_LEN(n->length); + + /* Convert headers to host byte order */ + n->sequence = ntohs(n->sequence); + + /* Convert message to host endianess */ + if (n->endian != MSG_ENDIAN_HOST) + msg_swap(n); + } - debug(10, "Message received from node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u", - n->name, m->version, m->type, m->endian, m->length, m->sequence); + /* Check packet integrity */ + if (bytes != 0) + error("Packet length does not match message header length!"); return 0; } -int socket_write(struct node *n, struct msg *m) +int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { struct socket *s = n->socket; int ret = -1; - - /* Convert headers to network byte order */ - m->sequence = htons(m->sequence); - - switch (node_type(n)) { - case IEEE_802_3:/* Connection-less protocols */ - case IP: - case UDP: - ret = sendto(s->sd, m, MSG_LEN(m->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); - break; - - case TCP: /* Connection-oriented protocols */ - case TCPD: - ret = send(s->sd, m, MSG_LEN(m->length), 0); - break; - default: { } + + struct iovec iov[cnt]; + struct msghdr mhdr = { + .msg_iov = iov, + .msg_iovlen = ARRAY_LEN(iov) + }; + + for (int i = 0; i < cnt; i++) { + struct msg *n = &pool[(first+poolsize+i) % poolsize]; + + /* Convert headers to host byte order */ + n->sequence = htons(n->sequence); + + iov[i].iov_base = n; + iov[i].iov_len = MSG_LEN(n->length); } + /* Specify destination address for connection-less procotols */ + switch (node_type(n)) { + case IEEE_802_3: + case IP: + case UDP: + mhdr.msg_name = (struct sockaddr *) &s->remote; + mhdr.msg_namelen = sizeof(s->remote); + break; + default: + break; + } + + ret = sendmsg(s->sd, &mhdr, 0); if (ret < 0) serror("Failed send"); - debug(10, "Message sent to node '%s': version=%u, type=%u, endian=%u, length=%u, sequence=%u", - n->name, m->version, m->type, m->endian, m->length, ntohs(m->sequence)); - return 0; } @@ -210,6 +289,8 @@ int socket_parse(config_setting_t *cfg, struct node *n) } n->socket = s; + + list_push(&sockets, s); return 0; } @@ -234,7 +315,7 @@ int socket_print_addr(char *buf, int len, struct sockaddr *sa) } default: - error("Unsupported address family: %u", sa->sa_family); + return snprintf(buf, len, "address family: %u", sa->sa_family); } return 0;