mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Fixed bugs in connection. A source node is now able to connect to a target node.
This commit is contained in:
parent
a0f0410e73
commit
4f6c2543b5
2 changed files with 50 additions and 43 deletions
|
@ -62,21 +62,17 @@ struct infiniband {
|
|||
struct connection_s {
|
||||
struct addrinfo *src_addr;
|
||||
struct addrinfo *dst_addr;
|
||||
const int timeout;
|
||||
int timeout;
|
||||
enum rdma_port_space port_space;
|
||||
|
||||
struct ibv_mr *mr_payload;
|
||||
struct r_addr_key_s *r_addr_key;
|
||||
} conn;
|
||||
|
||||
struct init_s {
|
||||
int cq_size;
|
||||
enum ibv_qp_type qp_type;
|
||||
int max_send_wr;
|
||||
int max_recv_wr;
|
||||
} init;
|
||||
struct ibv_qp_init_attr qp_init;
|
||||
|
||||
int is_source;
|
||||
int cq_size;
|
||||
};
|
||||
|
||||
/** @see node_type::reverse */
|
||||
|
|
|
@ -33,7 +33,7 @@ static void ib_create_busy_poll(struct node *n, struct rdma_cm_id *id)
|
|||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
|
||||
// Create completion queue and bind to channel
|
||||
ib->ctx.cq = ibv_create_cq(ib->id->verbs, ib->init.cq_size, NULL, NULL, 0);
|
||||
ib->ctx.cq = ibv_create_cq(ib->id->verbs, ib->cq_size, NULL, NULL, 0);
|
||||
if(!ib->ctx.cq)
|
||||
error("Could not create completion queue in node %s.", node_name(n));
|
||||
|
||||
|
@ -52,7 +52,7 @@ static void ib_create_event(struct node *n, struct rdma_cm_id *id)
|
|||
|
||||
// Create completion queue and bind to channel
|
||||
ib->ctx.cq = ibv_create_cq(ib->id->verbs,
|
||||
ib->init.cq_size,
|
||||
ib->cq_size,
|
||||
NULL,
|
||||
ib->ctx.comp_channel,
|
||||
0);
|
||||
|
@ -89,25 +89,18 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id)
|
|||
break;
|
||||
}
|
||||
|
||||
// Prepare Queue Pair (QP) attributes
|
||||
struct ibv_qp_init_attr qp_attr;
|
||||
qp_attr.send_cq = ib->ctx.cq;
|
||||
qp_attr.recv_cq = ib->ctx.cq;
|
||||
qp_attr.qp_type = ib->init.qp_type;
|
||||
|
||||
qp_attr.cap.max_send_wr = ib->init.max_send_wr;
|
||||
qp_attr.cap.max_recv_wr = ib->init.max_recv_wr;
|
||||
qp_attr.cap.max_send_sge = 1;
|
||||
qp_attr.cap.max_recv_sge = 1;
|
||||
// Prepare remaining Queue Pair (QP) attributes
|
||||
ib->qp_init.send_cq = ib->ctx.cq;
|
||||
ib->qp_init.recv_cq = ib->ctx.cq;
|
||||
|
||||
//ToDo: Set maximum inline data
|
||||
|
||||
// Create the actual QP
|
||||
ret = rdma_create_qp(id, ib->ctx.pd, &qp_attr);
|
||||
ret = rdma_create_qp(id, ib->ctx.pd, &ib->qp_init);
|
||||
if(ret)
|
||||
error("Failed to create Queue Pair in node %s.", node_name(n));
|
||||
|
||||
info("Successfully created Queue Pair in node %s.", node_name(n));
|
||||
info("Successfully created Queue Pair.");
|
||||
}
|
||||
|
||||
static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id)
|
||||
|
@ -115,6 +108,8 @@ static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id)
|
|||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
int ret;
|
||||
|
||||
info("Successfully resolved address.");
|
||||
|
||||
// Build all components from IB Verbs
|
||||
ib_build_ibv(n, id);
|
||||
|
||||
|
@ -123,8 +118,6 @@ static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id)
|
|||
if(ret)
|
||||
error("Failed to resolve route in node %s.", node_name(n));
|
||||
|
||||
info("Successfully resolved address node %s", node_name(n));
|
||||
|
||||
//ToDo: create check if data can be send inline
|
||||
|
||||
return 0;
|
||||
|
@ -134,7 +127,7 @@ static int ib_route_resolved(struct node *n, struct rdma_cm_id *id)
|
|||
{
|
||||
int ret;
|
||||
|
||||
ib_build_ibv(n, id);
|
||||
info("Successfully resolved route.");
|
||||
|
||||
//ToDo: Post receive WRs
|
||||
|
||||
|
@ -146,7 +139,7 @@ static int ib_route_resolved(struct node *n, struct rdma_cm_id *id)
|
|||
if(ret)
|
||||
error("Failed to connect in node %s.", node_name(n));
|
||||
|
||||
info("Route resolved and called rdma_connect");
|
||||
info("Called rdma_connect.");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -197,6 +190,7 @@ static int ib_event(struct node *n, struct rdma_cm_event *event)
|
|||
case RDMA_CM_EVENT_REJECTED:
|
||||
error("Connection request or response was rejected by the remote end point!");
|
||||
case RDMA_CM_EVENT_ESTABLISHED:
|
||||
info("Connection established!");
|
||||
ret = 1;
|
||||
break;
|
||||
default:
|
||||
|
@ -244,7 +238,7 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
|
||||
|
||||
// Translate IP:PORT to a struct addrinfo
|
||||
ret = getaddrinfo(local, NULL, NULL, &ib->conn.src_addr);
|
||||
ret = getaddrinfo(local, (char *)"13337", NULL, &ib->conn.src_addr);
|
||||
if(ret) {
|
||||
error("Failed to resolve local address '%s' of node %s: %s",
|
||||
local, node_name(n), gai_strerror(ret));
|
||||
|
@ -259,6 +253,9 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
error("Failed to translate rdma_port_space in node %s. %s is not a valid \
|
||||
port space supported by rdma_cma.h!", node_name(n), port_space);
|
||||
}
|
||||
|
||||
// Set timeout
|
||||
ib->conn.timeout = timeout;
|
||||
|
||||
// Translate poll mode
|
||||
if(strcmp(poll_mode, "EVENT") == 0) ib->poll.poll_mode = EVENT;
|
||||
|
@ -269,33 +266,36 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
}
|
||||
|
||||
// Set completion queue size
|
||||
ib->init.cq_size = cq_size;
|
||||
ib->cq_size = cq_size;
|
||||
|
||||
// Translate QP type
|
||||
if(strcmp(qp_type, "IBV_QPT_RC") == 0) ib->init.qp_type = IBV_QPT_RC;
|
||||
else if(strcmp(qp_type, "IBV_QPT_UC") == 0) ib->init.qp_type = IBV_QPT_UC;
|
||||
else if(strcmp(qp_type, "IBV_QPT_UD") == 0) ib->init.qp_type = IBV_QPT_UD;
|
||||
if(strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC;
|
||||
else if(strcmp(qp_type, "IBV_QPT_UC") == 0) ib->qp_init.qp_type = IBV_QPT_UC;
|
||||
else if(strcmp(qp_type, "IBV_QPT_UD") == 0) ib->qp_init.qp_type = IBV_QPT_UD;
|
||||
else {
|
||||
error("Failed to translate qp_type in node %s. %s is not a valid \
|
||||
qp_type!", node_name(n), qp_type);
|
||||
}
|
||||
|
||||
//Set max. send and receive Work Requests
|
||||
ib->init.max_send_wr = max_send_wr;
|
||||
ib->init.max_recv_wr = max_recv_wr;
|
||||
|
||||
// Set max. send and receive Work Requests
|
||||
ib->qp_init.cap.max_send_wr = max_send_wr;
|
||||
ib->qp_init.cap.max_recv_wr = max_recv_wr;
|
||||
|
||||
// Set remaining QP attributes
|
||||
ib->qp_init.cap.max_send_sge = 1;
|
||||
ib->qp_init.cap.max_recv_sge = 1;
|
||||
|
||||
//Check if node is a source and connect to target
|
||||
if(remote)
|
||||
{
|
||||
ib->is_source = 1;
|
||||
|
||||
// Translate address info
|
||||
ret = getaddrinfo(remote, NULL, NULL, &ib->conn.dst_addr);
|
||||
ret = getaddrinfo(remote, (char *)"13337", NULL, &ib->conn.dst_addr);
|
||||
if(ret) {
|
||||
error("Failed to resolve remote address '%s' of node %s: %s",
|
||||
remote, node_name(n), gai_strerror(ret));
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
ib->is_source = 0;
|
||||
|
@ -331,8 +331,7 @@ int ib_start(struct node *n)
|
|||
error("Failed to create rdma_cm_id of node %s: %s",
|
||||
node_name(n), gai_strerror(ret));
|
||||
}
|
||||
info("Succesfully created CM RDMA ID of node %s",
|
||||
node_name(n));
|
||||
info("Succesfully created rdma_cm_id.");
|
||||
|
||||
// Bind rdma_cm_id to the HCA
|
||||
ret = rdma_bind_addr(ib->id, ib->conn.src_addr->ai_addr);
|
||||
|
@ -340,8 +339,7 @@ int ib_start(struct node *n)
|
|||
error("Failed to bind to local device of node %s: %s",
|
||||
node_name(n), gai_strerror(ret));
|
||||
}
|
||||
info("Bound to Infiniband device of node %s",
|
||||
node_name(n));
|
||||
info("Bound rdma_cm_id to Infiniband device.");
|
||||
|
||||
if(ib->is_source)
|
||||
{
|
||||
|
@ -354,19 +352,27 @@ int ib_start(struct node *n)
|
|||
error("Failed to resolve remote address after %ims of node %s: %s",
|
||||
ib->conn.timeout, node_name(n), gai_strerror(ret));
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
// Listen on rdma_cm_id for events
|
||||
ret = rdma_listen(ib->id, 10);
|
||||
if(ret) {
|
||||
error("Failed to listen to rdma_cm_id on node %s", node_name(n));
|
||||
}
|
||||
}
|
||||
|
||||
// Several events should occur on the event channel, to make
|
||||
// sure the nodes are succesfully connected.
|
||||
info("Starting to monitor events on rdma_cm_id on node %s.",
|
||||
node_name(n));
|
||||
info("Starting to monitor events on rdma_cm_id.");
|
||||
|
||||
while(rdma_get_cm_event(ib->ec, &event) == 0)
|
||||
{
|
||||
struct rdma_cm_event event_copy;
|
||||
memcpy(&event_copy, event, sizeof(*event));
|
||||
|
||||
rdma_ack_cm_event(event);
|
||||
|
||||
if(ib_event(n, &event_copy))
|
||||
break;
|
||||
}
|
||||
|
@ -397,6 +403,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
|
||||
int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
|
||||
for(int i = 0; i < smps[0]->length; i++)
|
||||
{
|
||||
printf("Sample %i: %f\n", i, smps[0]->data[i].f);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue