1
0
Fork 0
mirror of https://github.com/hermitcore/libhermit.git synced 2025-03-09 00:00:03 +01:00

Added first version of ethernet connection manager for infiniband.

This commit is contained in:
Annika Wierichs 2018-02-20 17:36:12 +01:00
parent 4de2db0ccc
commit a839229573
3 changed files with 310 additions and 185 deletions

View file

@ -0,0 +1,73 @@
/*
* Copyright (c) 2018, Annika Wierichs, RWTH Aachen University.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __IBV_ETH_CM__
#define __IBV_ETH_CM__
/*
* Connection parameters describing one end node. An instance is filled in locally and a second
* instance receives the values reported by the remote side via eth_client_exch_dest() /
* eth_server_exch_dest().
*
* The text message built by eth_write() carries lid, out_reads, qpn, psn, rkey, vaddr and srqn (in
* that order); gid and gid_index are not part of that message and are left untouched by eth_read().
*
* NOTE(review): the field descriptions below are inferred from the usual InfiniBand naming only -
* confirm against the code that fills this struct.
*/
struct pingpong_dest {
int lid; /* presumably the port's local identifier (LID) */
int out_reads; /* presumably the number of outstanding RDMA reads supported */
int qpn; /* presumably the queue pair number */
int psn; /* presumably the initial packet sequence number */
unsigned rkey; /* presumably the remote key of the registered memory region */
unsigned long long vaddr; /* presumably the virtual address of the registered buffer */
union ibv_gid gid; /* not transmitted by eth_write()/eth_read() */
unsigned srqn; /* presumably the shared receive queue number */
int gid_index; /* not transmitted by eth_write()/eth_read() */
};
/*
* Connect to a remote end node via ethernet sockets, given its IP address and a port number. This
* function creates a socket, connects to the server and returns the socket file descriptor.
*
* The address lookup is restricted to IPv4 (AF_INET) stream sockets. Every address returned for
* server_ip is tried in turn until one connect() succeeds.
*
* server_ip: Server IP address given as string.
* port: Port number.
*
* Returns: Socket file descriptor on success and -1 on failure. The descriptor can be released
* with eth_close().
*/
int eth_client_connect(const char *server_ip, int port);
/*
* Accept a connection from a remote end node via ethernet sockets, given a port number. This
* function creates an IPv4 stream socket, binds it to the port and listens on it, then blocks in
* accept() until a client connects.
*
* Only a single client is accepted: the listening socket is closed again before the function
* returns, and the returned descriptor refers to the accepted connection.
*
* port: Port number.
*
* Returns: Socket file descriptor of the accepted connection on success and -1 on failure. The
* descriptor can be released with eth_close().
*/
int eth_server_connect(int port);
/*
* Client side of the destination exchange. The local destination information is written to the
* socket first, then the remote destination information is read back.
*
* sockfd: Connected socket file descriptor.
* local_dest: Destination information of this node, sent to the peer.
* rem_dest: Receives the destination information reported by the peer.
*
* Returns: 0 on success and 1 on failure.
*/
int eth_client_exch_dest(int sockfd, struct pingpong_dest *local_dest,
struct pingpong_dest *rem_dest);
/*
* Server side of the destination exchange. The remote destination information is read from the
* socket first, then the local destination information is written back - the mirror image of
* eth_client_exch_dest().
*
* sockfd: Connected socket file descriptor.
* local_dest: Destination information of this node, sent to the peer.
* rem_dest: Receives the destination information reported by the peer.
*
* Returns: 0 on success and 1 on failure.
*/
int eth_server_exch_dest(int sockfd, struct pingpong_dest *local_dest,
struct pingpong_dest *rem_dest);
/*
* Close a socket previously obtained from eth_client_connect() or eth_server_connect().
*
* sockfd: Socket file descriptor.
*
* Returns: 0 on success and -1 on failure.
*/
int eth_close(int sockfd);
#endif // __IBV_ETH_CM__

231
kernel/ibv_eth_cm.c Normal file
View file

@ -0,0 +1,231 @@
/*
* Copyright (c) 2018, Annika Wierichs, RWTH Aachen University.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/socket.h>
#include <netinet/in.h>
#include <hermit/ibv_hermit_cm.h>
#define KEY_MSG_SIZE (59)
/*
* Helper functions:
*/
/*
 * Resolve server_ip:port into an address list by means of getaddrinfo().
 *
 * service:   Out-parameter kept for interface compatibility. The port string is only needed for
 *            the duration of the lookup and lives on the stack, so *service is always set to NULL
 *            and the caller has nothing to free.
 * port:      Port number, must lie in the range [0, 65535].
 * server_ip: Host to resolve, or NULL (used by the server side together with AI_PASSIVE).
 * hints:     Lookup hints, passed on to getaddrinfo() unchanged.
 * res:       Receives the address list on success and NULL on failure. On success the caller
 *            releases the list with freeaddrinfo().
 *
 * Returns: 0 on success and 1 on failure.
 */
int check_add_port(char **service, int port, const char *server_ip, struct addrinfo *hints,
		   struct addrinfo **res)
{
	char port_str[6]; /* "65535" plus the terminating NUL */
	int len;

	*service = NULL;
	*res = NULL;

	if (port < 0 || port > 65535) {
		fprintf(stderr, "Invalid port number %d\n", port);
		return 1;
	}

	len = snprintf(port_str, sizeof port_str, "%05d", port);
	if (len < 0 || (size_t) len >= sizeof port_str) {
		return 1;
	}

	/*
	 * getaddrinfo() signals failure with ANY non-zero value; whether the error codes are
	 * negative or positive differs between C libraries, so do not test for "< 0".
	 */
	if (getaddrinfo(server_ip, port_str, hints, res) != 0) {
		fprintf(stderr, "Error for %s:%d\n", server_ip ? server_ip : "*", port);
		*res = NULL;
		return 1;
	}

	return 0;
}
/*
 * Receive one destination message of KEY_MSG_SIZE bytes from the socket and parse it into
 * rem_dest according to KEY_PRINT_FMT.
 *
 * sockfd:   Connected socket file descriptor.
 * rem_dest: Receives lid, out_reads, qpn, psn, rkey, vaddr and srqn of the peer. The remaining
 *           fields are left untouched.
 *
 * Returns: 0 on success and 1 on failure.
 */
int eth_read(int sockfd, struct pingpong_dest *rem_dest)
{
	const size_t msg_len = KEY_MSG_SIZE;
	char msg[KEY_MSG_SIZE + 1]; /* one spare byte for a guaranteed terminator */
	size_t received = 0;
	int parsed;

	/*
	 * A stream socket may hand over the message in several pieces, so keep reading until the
	 * whole message has arrived. End-of-stream (0) and errors (< 0) both abort the exchange.
	 */
	while (received < msg_len) {
		ssize_t n = read(sockfd, msg + received, msg_len - received);

		if (n <= 0) {
			fprintf(stderr, "eth_read: Couldn't read remote address.\n");
			return 1;
		}
		received += (size_t) n;
	}

	/* Never let sscanf() run past the buffer, whatever the peer has sent. */
	msg[msg_len] = '\0';

	parsed = sscanf(msg, KEY_PRINT_FMT,
			(unsigned int *) &rem_dest->lid, &rem_dest->out_reads, &rem_dest->qpn,
			&rem_dest->psn, &rem_dest->rkey, &rem_dest->vaddr, &rem_dest->srqn);
	if (parsed != 7) {
		fprintf(stderr, "Couldn't parse line <%.*s>\n", (int) msg_len, msg);
		return 1;
	}

	return 0;
}
/*
 * Format the local destination information according to KEY_PRINT_FMT and send it to the peer as
 * one message of exactly KEY_MSG_SIZE bytes.
 *
 * sockfd:     Connected socket file descriptor.
 * local_dest: Destination information of this node; lid, out_reads, qpn, psn, rkey, vaddr and
 *             srqn are transmitted.
 *
 * Returns: 0 on success and 1 on failure.
 */
int eth_write(int sockfd, struct pingpong_dest *local_dest)
{
	/* Zero-filled so that no uninitialised stack bytes are sent behind the terminator. */
	char msg[KEY_MSG_SIZE] = {0};
	size_t sent = 0;
	int len;

	/* Bounded formatting: values wider than the format expects must not overrun msg. */
	len = snprintf(msg, sizeof msg, KEY_PRINT_FMT,
		       local_dest->lid, local_dest->out_reads, local_dest->qpn, local_dest->psn,
		       local_dest->rkey, local_dest->vaddr, local_dest->srqn);
	if (len < 0 || (size_t) len >= sizeof msg) {
		fprintf(stderr, "Couldn't format local address.\n");
		return 1;
	}

	/* write() may accept fewer bytes than requested; continue until everything is out. */
	while (sent < sizeof msg) {
		ssize_t n = write(sockfd, msg + sent, sizeof msg - sent);

		if (n <= 0) {
			perror("Client Write");
			fprintf(stderr, "Couldn't send local address.\n");
			return 1;
		}
		sent += (size_t) n;
	}

	return 0;
}
/*
* Exposed functions:
*/
/*
 * Connect to the server at server_ip:port using an IPv4 stream socket.
 *
 * Every address returned by the lookup is tried in turn until one connect() succeeds.
 *
 * server_ip: Server IP address given as string.
 * port:      Port number.
 *
 * Returns: Connected socket file descriptor on success and -1 on failure.
 */
int eth_client_connect(const char *server_ip, int port)
{
	struct addrinfo *res = NULL, *t;
	struct addrinfo hints;
	char *service = NULL;
	int sockfd = -1;
	int rc;

	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;

	rc = check_add_port(&service, port, server_ip, &hints, &res);
	/* The port string is not needed beyond the lookup; free(NULL) is a no-op. */
	free(service);
	if (rc) {
		fprintf(stderr, "Problem in resolving basic address and port\n");
		return -1;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			if (!connect(sockfd, t->ai_addr, t->ai_addrlen))
				break; // Success.
			close(sockfd);
			sockfd = -1;
		}
	}

	/* Not every C library tolerates freeaddrinfo(NULL). */
	if (res)
		freeaddrinfo(res);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't connect to %s:%d\n",
			server_ip ? server_ip : "(null)", port);
		return -1;
	}

	return sockfd;
}
/*
 * Wait for a single client on the given port.
 *
 * An IPv4 stream socket is bound to the port and put into listening state; the function then
 * blocks in accept() until a client connects. The listening socket is closed before returning,
 * so only the accepted connection stays open.
 *
 * port: Port number.
 *
 * Returns: Socket file descriptor of the accepted connection on success and -1 on failure.
 */
int eth_server_connect(int port)
{
	struct addrinfo *res = NULL, *t;
	struct addrinfo hints;
	char *service = NULL;
	int n;
	int rc;
	int sockfd = -1, connfd;

	memset(&hints, 0, sizeof hints);
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;

	rc = check_add_port(&service, port, NULL, &hints, &res);
	/* The port string is not needed beyond the lookup; free(NULL) is a no-op. */
	free(service);
	if (rc) {
		fprintf(stderr, "Problem in resolving basic address and port\n");
		return -1;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			n = 1;
			/* Best effort only: binding is still attempted if the option cannot be set. */
			(void) setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n);
			if (!bind(sockfd, t->ai_addr, t->ai_addrlen))
				break; // Success
			close(sockfd);
			sockfd = -1;
		}
	}

	/* Not every C library tolerates freeaddrinfo(NULL). */
	if (res)
		freeaddrinfo(res);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't listen to port %d\n", port);
		return -1;
	}

	if (listen(sockfd, 1) < 0) {
		perror("Server Listen");
		fprintf(stderr, "Couldn't listen to port %d\n", port);
		close(sockfd);
		return -1;
	}

	connfd = accept(sockfd, NULL, NULL);
	if (connfd < 0) {
		/* Report before close(), which may overwrite errno. */
		perror("Server Accept");
		fprintf(stderr, "accept() failed\n");
	}

	/* Exactly one client is served, so the listening socket is closed in either case. */
	close(sockfd);

	return connfd < 0 ? -1 : connfd;
}
/*
 * Client half of the destination exchange: send the local destination information first, then
 * wait for the peer's information.
 *
 * sockfd:     Connected socket file descriptor.
 * local_dest: Destination information of this node.
 * rem_dest:   Receives the destination information of the peer.
 *
 * Returns: 0 on success and 1 on failure.
 */
int eth_client_exch_dest(int sockfd, struct pingpong_dest *local_dest,
			 struct pingpong_dest *rem_dest)
{
	int status = 1;

	/* The read is only attempted once the write has gone through. */
	if (eth_write(sockfd, local_dest) != 0)
		fprintf(stderr, " Unable to write local destination information to socket.\n");
	else if (eth_read(sockfd, rem_dest) != 0)
		fprintf(stderr, " Unable to read remote destination information from socket.\n");
	else
		status = 0;

	return status;
}
/*
 * Server half of the destination exchange: wait for the peer's destination information first,
 * then answer with the local information.
 *
 * sockfd:     Connected socket file descriptor.
 * local_dest: Destination information of this node.
 * rem_dest:   Receives the destination information of the peer.
 *
 * Returns: 0 on success and 1 on failure.
 */
int eth_server_exch_dest(int sockfd, struct pingpong_dest *local_dest,
			 struct pingpong_dest *rem_dest)
{
	int status = 1;

	/* The write is only attempted once the read has gone through. */
	if (eth_read(sockfd, rem_dest) != 0)
		fprintf(stderr, " Unable to read remote destination information from socket.\n");
	else if (eth_write(sockfd, local_dest) != 0)
		fprintf(stderr, " Unable to write local destination information to socket.\n");
	else
		status = 0;

	return status;
}
/*
 * Close the given socket.
 *
 * sockfd: Socket file descriptor.
 *
 * Returns: 0 on success and -1 if close() reports an error.
 */
int eth_close(int sockfd)
{
	int rc = close(sockfd);

	if (rc == 0)
		return 0;

	fprintf(stderr, "Couldn't close socket.\n");
	return -1;
}

View file

@ -39,12 +39,12 @@
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
/* #include <sys/socket.h> */
#include <sys/time.h>
#include <netdb.h>
#include <malloc.h>
#include <getopt.h>
#include <netinet/in.h>
/* #include <netinet/in.h> */
#include <time.h>
#include <inttypes.h>
@ -83,17 +83,9 @@ struct pingpong_context {
/*
* Return the completion queue of ctx as a plain struct ibv_cq pointer: if use_ts is set, the
* extended completion queue (cq_s.cq_ex) is converted with ibv_cq_ex_to_cq(), otherwise cq_s.cq
* is returned directly.
*
* NOTE(review): the two return statements below are the same expression formatted differently,
* so the second one can never be reached. This looks like the old and the new line of a
* reformatting change shown together - confirm and keep only one.
*/
static struct ibv_cq *pp_cq(struct pingpong_context *ctx)
{
return use_ts ? ibv_cq_ex_to_cq(ctx->cq_s.cq_ex) :
ctx->cq_s.cq;
return use_ts ? ibv_cq_ex_to_cq(ctx->cq_s.cq_ex) : ctx->cq_s.cq;
}
/*
* Address information of one end node as exchanged over the TCP side channel by
* pp_client_exch_dest() and pp_server_exch_dest(): lid, qpn and psn travel as hexadecimal
* numbers, gid in the textual form produced by gid_to_wire_gid().
*
* NOTE(review): the field descriptions are inferred from the usual InfiniBand naming - confirm.
*/
struct pingpong_dest {
int lid; /* presumably the port's local identifier (LID) */
int qpn; /* presumably the queue pair number */
int psn; /* presumably the initial packet sequence number */
union ibv_gid gid; /* converted with gid_to_wire_gid() / wire_gid_to_gid() for transport */
};
static int pp_connect_ctx(struct pingpong_context *ctx, int port, int my_psn, enum ibv_mtu mtu,
int sl, struct pingpong_dest *dest, int sgid_idx)
{
@ -151,175 +143,6 @@ static int pp_connect_ctx(struct pingpong_context *ctx, int port, int my_psn, en
return 0;
}
/*
 * Client side of the address exchange over TCP.
 *
 * Connects to servername:port, sends the local address (lid, qpn, psn, gid) as one fixed-size
 * text message, reads the server's address in the same format and acknowledges it with "done".
 *
 * servername: Host name or address of the server.
 * port:       TCP port of the server.
 * my_dest:    Local address information to send.
 *
 * Returns: A malloc()ed struct pingpong_dest holding the server's address, to be released by the
 * caller with free(), or NULL on failure.
 */
static struct pingpong_dest *pp_client_exch_dest(const char *servername, int port,
						 const struct pingpong_dest *my_dest)
{
	struct addrinfo *res = NULL, *t;
	struct addrinfo hints = {
		.ai_family   = AF_UNSPEC,
		.ai_socktype = SOCK_STREAM
	};
	char *service;
	char msg[sizeof "0000:000000:000000:00000000000000000000000000000000"];
	int n;
	int sockfd = -1;
	struct pingpong_dest *rem_dest = NULL;
	char gid[33];

	if (asprintf(&service, "%d", port) < 0)
		return NULL;

	n = getaddrinfo(servername, service, &hints, &res);
	free(service);

	/* getaddrinfo() reports failure with any non-zero value, not only negative ones. */
	if (n != 0) {
		fprintf(stderr, "Error for %s:%d\n", servername, port);
		return NULL;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			if (!connect(sockfd, t->ai_addr, t->ai_addrlen))
				break;
			close(sockfd);
			sockfd = -1;
		}
	}

	freeaddrinfo(res);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't connect to %s:%d\n", servername, port);
		return NULL;
	}

	gid_to_wire_gid(&my_dest->gid, gid);
	/* Bounded formatting: values wider than the field widths must not overrun msg. */
	n = snprintf(msg, sizeof msg, "%04x:%06x:%06x:%s", my_dest->lid, my_dest->qpn,
		     my_dest->psn, gid);
	if (n < 0 || (size_t) n >= sizeof msg) {
		fprintf(stderr, "Couldn't format local address\n");
		goto out;
	}

	if (write(sockfd, msg, sizeof msg) != sizeof msg) {
		fprintf(stderr, "Couldn't send local address\n");
		goto out;
	}

	if (read(sockfd, msg, sizeof msg) != sizeof msg ||
	    write(sockfd, "done", sizeof "done") != sizeof "done") {
		perror("client read/write");
		fprintf(stderr, "Couldn't read/write remote address\n");
		goto out;
	}

	/* The message comes from the network; make sure it is terminated before parsing. */
	msg[sizeof msg - 1] = '\0';

	rem_dest = malloc(sizeof *rem_dest);
	if (!rem_dest)
		goto out;

	/* "%32s" keeps the GID text within gid[33]; all four fields must be present. */
	if (sscanf(msg, "%x:%x:%x:%32s", &rem_dest->lid, &rem_dest->qpn, &rem_dest->psn,
		   gid) != 4) {
		fprintf(stderr, "Couldn't parse remote address\n");
		free(rem_dest);
		rem_dest = NULL;
		goto out;
	}
	wire_gid_to_gid(gid, &rem_dest->gid);

out:
	close(sockfd);
	return rem_dest;
}
/*
 * Server side of the address exchange over TCP.
 *
 * Listens on the given port, accepts a single client, reads the client's address (lid, qpn, psn,
 * gid) as one fixed-size text message, connects the local queue pair to it via pp_connect_ctx(),
 * sends the local address back and waits for the client's "done" acknowledgement.
 *
 * ctx:      Pingpong context handed on to pp_connect_ctx().
 * ib_port:  Handed on to pp_connect_ctx().
 * mtu:      Handed on to pp_connect_ctx().
 * port:     TCP port to listen on.
 * sl:       Handed on to pp_connect_ctx().
 * my_dest:  Local address information to send.
 * sgid_idx: Handed on to pp_connect_ctx().
 *
 * Returns: A malloc()ed struct pingpong_dest holding the client's address, to be released by the
 * caller with free(), or NULL on failure.
 */
static struct pingpong_dest *pp_server_exch_dest(struct pingpong_context *ctx, int ib_port,
		enum ibv_mtu mtu, int port, int sl, const struct pingpong_dest *my_dest, int sgid_idx)
{
	struct addrinfo *res = NULL, *t;
	struct addrinfo hints = {
		.ai_flags    = AI_PASSIVE,
		.ai_family   = AF_UNSPEC,
		.ai_socktype = SOCK_STREAM
	};
	char *service;
	char msg[sizeof "0000:000000:000000:00000000000000000000000000000000"];
	int n;
	int sockfd = -1, connfd;
	struct pingpong_dest *rem_dest = NULL;
	char gid[33];

	if (asprintf(&service, "%d", port) < 0)
		return NULL;

	n = getaddrinfo(NULL, service, &hints, &res);
	free(service);

	/* getaddrinfo() reports failure with any non-zero value, not only negative ones. */
	if (n != 0) {
		fprintf(stderr, "Error for port %d\n", port);
		return NULL;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			n = 1;

			/* Best effort only: binding is still attempted if the option cannot be set. */
			(void) setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n);

			if (!bind(sockfd, t->ai_addr, t->ai_addrlen))
				break;
			close(sockfd);
			sockfd = -1;
		}
	}

	freeaddrinfo(res);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't listen to port %d\n", port);
		return NULL;
	}

	if (listen(sockfd, 1) < 0) {
		perror("server listen");
		fprintf(stderr, "Couldn't listen to port %d\n", port);
		close(sockfd);
		return NULL;
	}

	connfd = accept(sockfd, NULL, NULL);
	close(sockfd);
	if (connfd < 0) {
		fprintf(stderr, "accept() failed\n");
		return NULL;
	}

	n = read(connfd, msg, sizeof msg);
	if (n != (int) sizeof msg) {
		perror("server read");
		fprintf(stderr, "%d/%d: Couldn't read remote address\n", n, (int) sizeof msg);
		goto fail;
	}

	/* The message comes from the network; make sure it is terminated before parsing. */
	msg[sizeof msg - 1] = '\0';

	rem_dest = malloc(sizeof *rem_dest);
	if (!rem_dest)
		goto fail;

	/* "%32s" keeps the GID text within gid[33]; all four fields must be present. */
	if (sscanf(msg, "%x:%x:%x:%32s", &rem_dest->lid, &rem_dest->qpn,
		   &rem_dest->psn, gid) != 4) {
		fprintf(stderr, "Couldn't parse remote address\n");
		goto fail;
	}
	wire_gid_to_gid(gid, &rem_dest->gid);

	if (pp_connect_ctx(ctx, ib_port, my_dest->psn, mtu, sl, rem_dest,
			   sgid_idx)) {
		fprintf(stderr, "Couldn't connect to remote QP\n");
		goto fail;
	}

	gid_to_wire_gid(&my_dest->gid, gid);
	/* Bounded formatting: values wider than the field widths must not overrun msg. */
	n = snprintf(msg, sizeof msg, "%04x:%06x:%06x:%s", my_dest->lid, my_dest->qpn,
		     my_dest->psn, gid);
	if (n < 0 || (size_t) n >= sizeof msg) {
		fprintf(stderr, "Couldn't format local address\n");
		goto fail;
	}

	if (write(connfd, msg, sizeof msg) != sizeof msg ||
	    read(connfd, msg, sizeof msg) != sizeof "done") {
		fprintf(stderr, "Couldn't send/recv local address\n");
		goto fail;
	}

	close(connfd);
	return rem_dest;

fail:
	/* free(NULL) is a no-op, so this path also serves failures before the allocation. */
	free(rem_dest);
	close(connfd);
	return NULL;
}
static struct pingpong_context *pp_init_ctx(struct ibv_device *ib_dev, int size, int rx_depth,
int port, int use_event)
{
@ -889,10 +712,9 @@ int main(int argc, char *argv[])
if (servername)
rem_dest = pp_client_exch_dest(servername, port, &my_dest);
rem_dest = client_exch_dest(servername, port, &my_dest);
else
rem_dest = pp_server_exch_dest(ctx, ib_port, mtu, port, sl,
&my_dest, gidx);
rem_dest = server_exch_dest(ctx, ib_port, mtu, port, sl, &my_dest, gidx);
if (!rem_dest)
return 1;
@ -901,8 +723,7 @@ int main(int argc, char *argv[])
rem_dest->lid, rem_dest->qpn, rem_dest->psn);
if (servername)
if (pp_connect_ctx(ctx, ib_port, my_dest.psn, mtu, sl, rem_dest,
gidx))
if (pp_connect_ctx(ctx, ib_port, my_dest.psn, mtu, sl, rem_dest, gidx))
return 1;
ctx->pending = PINGPONG_RECV_WRID;