diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 6ebe37c8d..ae0928eb6 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -62,21 +62,17 @@ struct infiniband { struct connection_s { struct addrinfo *src_addr; struct addrinfo *dst_addr; - const int timeout; + int timeout; enum rdma_port_space port_space; struct ibv_mr *mr_payload; struct r_addr_key_s *r_addr_key; } conn; - struct init_s { - int cq_size; - enum ibv_qp_type qp_type; - int max_send_wr; - int max_recv_wr; - } init; + struct ibv_qp_init_attr qp_init; int is_source; + int cq_size; }; /** @see node_type::reverse */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index da0456781..e51c1bd90 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -33,7 +33,7 @@ static void ib_create_busy_poll(struct node *n, struct rdma_cm_id *id) struct infiniband *ib = (struct infiniband *) n->_vd; // Create completion queue and bind to channel - ib->ctx.cq = ibv_create_cq(ib->id->verbs, ib->init.cq_size, NULL, NULL, 0); + ib->ctx.cq = ibv_create_cq(ib->id->verbs, ib->cq_size, NULL, NULL, 0); if(!ib->ctx.cq) error("Could not create completion queue in node %s.", node_name(n)); @@ -52,7 +52,7 @@ static void ib_create_event(struct node *n, struct rdma_cm_id *id) // Create completion queue and bind to channel ib->ctx.cq = ibv_create_cq(ib->id->verbs, - ib->init.cq_size, + ib->cq_size, NULL, ib->ctx.comp_channel, 0); @@ -89,25 +89,18 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) break; } - // Prepare Queue Pair (QP) attributes - struct ibv_qp_init_attr qp_attr; - qp_attr.send_cq = ib->ctx.cq; - qp_attr.recv_cq = ib->ctx.cq; - qp_attr.qp_type = ib->init.qp_type; - - qp_attr.cap.max_send_wr = ib->init.max_send_wr; - qp_attr.cap.max_recv_wr = ib->init.max_recv_wr; - qp_attr.cap.max_send_sge = 1; - qp_attr.cap.max_recv_sge = 1; + // Prepare remaining Queue Pair (QP) attributes + ib->qp_init.send_cq = ib->ctx.cq; + ib->qp_init.recv_cq = ib->ctx.cq; //ToDo: Set maximum inline data // Create the actual QP - ret = rdma_create_qp(id, ib->ctx.pd, &qp_attr); + ret = rdma_create_qp(id, ib->ctx.pd, &ib->qp_init); if(ret) error("Failed to create Queue Pair in node %s.", node_name(n)); - info("Successfully created Queue Pair in node %s.", node_name(n)); + info("Successfully created Queue Pair."); } static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id) @@ -115,6 +108,8 @@ static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id) struct infiniband *ib = (struct infiniband *) n->_vd; int ret; + info("Successfully resolved address."); + // Build all components from IB Verbs ib_build_ibv(n, id); @@ -123,8 +118,6 @@ static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id) if(ret) error("Failed to resolve route in node %s.", node_name(n)); - info("Successfully resolved address node %s", node_name(n)); - //ToDo: create check if data can be send inline return 0; @@ -134,7 +127,7 @@ static int ib_route_resolved(struct node *n, struct rdma_cm_id *id) { int ret; - ib_build_ibv(n, id); + info("Successfully resolved route."); //ToDo: Post receive WRs @@ -146,7 +139,7 @@ static int ib_route_resolved(struct node *n, struct rdma_cm_id *id) if(ret) error("Failed to connect in node %s.", node_name(n)); - info("Route resolved and called rdma_connect"); + info("Called rdma_connect."); return 0; } @@ -197,6 +190,7 @@ static int ib_event(struct node *n, struct rdma_cm_event *event) case RDMA_CM_EVENT_REJECTED: error("Connection request or response was rejected by the remote end point!"); case RDMA_CM_EVENT_ESTABLISHED: + info("Connection established!"); ret = 1; break; default: @@ -244,7 +238,7 @@ int ib_parse(struct node *n, json_t *cfg) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); // Translate IP:PORT to a struct addrinfo - ret = getaddrinfo(local, NULL, NULL, &ib->conn.src_addr); + ret = getaddrinfo(local, (char *)"13337", NULL, &ib->conn.src_addr); if(ret) { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), gai_strerror(ret)); @@ -259,6 +253,9 @@ int ib_parse(struct node *n, json_t *cfg) error("Failed to translate rdma_port_space in node %s. %s is not a valid \ port space supported by rdma_cma.h!", node_name(n), port_space); } + + // Set timeout + ib->conn.timeout = timeout; // Translate poll mode if(strcmp(poll_mode, "EVENT") == 0) ib->poll.poll_mode = EVENT; @@ -269,33 +266,36 @@ int ib_parse(struct node *n, json_t *cfg) } // Set completion queue size - ib->init.cq_size = cq_size; + ib->cq_size = cq_size; // Translate QP type - if(strcmp(qp_type, "IBV_QPT_RC") == 0) ib->init.qp_type = IBV_QPT_RC; - else if(strcmp(qp_type, "IBV_QPT_UC") == 0) ib->init.qp_type = IBV_QPT_UC; - else if(strcmp(qp_type, "IBV_QPT_UD") == 0) ib->init.qp_type = IBV_QPT_UD; + if(strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC; + else if(strcmp(qp_type, "IBV_QPT_UC") == 0) ib->qp_init.qp_type = IBV_QPT_UC; + else if(strcmp(qp_type, "IBV_QPT_UD") == 0) ib->qp_init.qp_type = IBV_QPT_UD; else { error("Failed to translate qp_type in node %s. %s is not a valid \ qp_type!", node_name(n), qp_type); } - //Set max. send and receive Work Requests - ib->init.max_send_wr = max_send_wr; - ib->init.max_recv_wr = max_recv_wr; - + // Set max. send and receive Work Requests + ib->qp_init.cap.max_send_wr = max_send_wr; + ib->qp_init.cap.max_recv_wr = max_recv_wr; + + // Set remaining QP attributes + ib->qp_init.cap.max_send_sge = 1; + ib->qp_init.cap.max_recv_sge = 1; + //Check if node is a source and connect to target if(remote) { ib->is_source = 1; // Translate address info - ret = getaddrinfo(remote, NULL, NULL, &ib->conn.dst_addr); + ret = getaddrinfo(remote, (char *)"13337", NULL, &ib->conn.dst_addr); if(ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } - } else ib->is_source = 0; @@ -331,8 +331,7 @@ int ib_start(struct node *n) error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret)); } - info("Succesfully created CM RDMA ID of node %s", - node_name(n)); + info("Succesfully created rdma_cm_id."); // Bind rdma_cm_id to the HCA ret = rdma_bind_addr(ib->id, ib->conn.src_addr->ai_addr); @@ -340,8 +339,7 @@ int ib_start(struct node *n) error("Failed to bind to local device of node %s: %s", node_name(n), gai_strerror(ret)); } - info("Bound to Infiniband device of node %s", - node_name(n)); + info("Bound rdma_cm_id to Infiniband device."); if(ib->is_source) { @@ -354,19 +352,27 @@ int ib_start(struct node *n) error("Failed to resolve remote address after %ims of node %s: %s", ib->conn.timeout, node_name(n), gai_strerror(ret)); } - + } + else + { + // Listen on rdma_cm_id for events + ret = rdma_listen(ib->id, 10); + if(ret) { + error("Failed to listen to rdma_cm_id on node %s", node_name(n)); + } } // Several events should occur on the event channel, to make // sure the nodes are succesfully connected. - info("Starting to monitor events on rdma_cm_id on node %s.", - node_name(n)); + info("Starting to monitor events on rdma_cm_id."); while(rdma_get_cm_event(ib->ec, &event) == 0) { struct rdma_cm_event event_copy; memcpy(&event_copy, event, sizeof(*event)); + rdma_ack_cm_event(event); + if(ib_event(n, &event_copy)) break; } @@ -397,6 +403,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) int ib_write(struct node *n, struct sample *smps[], unsigned cnt) { + + for(int i = 0; i < smps[0]->length; i++) + { + printf("Sample %i: %f\n", i, smps[0]->data[i].f); + } return 0; }