diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index f12bfa215..b24db19ac 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -35,57 +35,57 @@ int ib_cleanup(struct node *n)
 {
-    struct infiniband *ib = (struct infiniband *) n->_vd;
-    info("Starting to clean up");
+	struct infiniband *ib = (struct infiniband *) n->_vd;
+	info("Starting to clean up");
-    // Destroy QP
-    rdma_destroy_qp(ib->ctx.id);
-    info("Destroyed QP");
+	// Destroy QP
+	rdma_destroy_qp(ib->ctx.id);
+	info("Destroyed QP");
-    // Deregister memory regions
-    ibv_dereg_mr(ib->mem.mr_recv);
-    if(ib->is_source)
-        ibv_dereg_mr(ib->mem.mr_send);
-    info("Deregistered memory regions");
+	// Deregister memory regions
+	ibv_dereg_mr(ib->mem.mr_recv);
+	if(ib->is_source)
+		ibv_dereg_mr(ib->mem.mr_send);
+	info("Deregistered memory regions");
-    // Destroy pools
-    pool_destroy(&ib->mem.p_recv);
-    pool_destroy(&ib->mem.p_send);
-    info("Destroyed memory pools");
+	// Destroy pools
+	pool_destroy(&ib->mem.p_recv);
+	pool_destroy(&ib->mem.p_send);
+	info("Destroyed memory pools");
-    // Destroy RDMA CM ID
-    rdma_destroy_id(ib->ctx.id);
-    info("Destroyed rdma_cm_id");
+	// Destroy RDMA CM ID
+	rdma_destroy_id(ib->ctx.id);
+	info("Destroyed rdma_cm_id");
-    // Destroy event channel
-    rdma_destroy_event_channel(ib->ctx.ec);
-    info("Destroyed event channel");
+	// Destroy event channel
+	rdma_destroy_event_channel(ib->ctx.ec);
+	info("Destroyed event channel");
-    return 0;
+	return 0;
 }
 int ib_post_recv_wrs(struct node *n)
 {
-    struct infiniband *ib = (struct infiniband *) n->_vd;
-    struct ibv_recv_wr wr, *bad_wr = NULL;
-    int ret;
-    struct ibv_sge sge;
+	struct infiniband *ib = (struct infiniband *) n->_vd;
+	struct ibv_recv_wr wr, *bad_wr = NULL;
+	int ret;
+	struct ibv_sge sge;
-    // Prepare receive Scatter/Gather element
-    sge.addr = (uintptr_t)pool_get(&ib->mem.p_recv);
-    sge.length = ib->mem.p_recv.blocksz;
-    sge.lkey = ib->mem.mr_recv->lkey;
+	// Prepare receive Scatter/Gather element
+	sge.addr = (uintptr_t)pool_get(&ib->mem.p_recv);
+	sge.length = ib->mem.p_recv.blocksz;
+	sge.lkey = ib->mem.mr_recv->lkey;
-    // Prepare a receive Work Request
-    wr.wr_id = (uintptr_t)sge.addr;
-    wr.next = NULL;
-    wr.sg_list = &sge;
-    wr.num_sge = 1;
+	// Prepare a receive Work Request
+	wr.wr_id = (uintptr_t)sge.addr;
+	wr.next = NULL;
+	wr.sg_list = &sge;
+	wr.num_sge = 1;
-    // Post Work Request
-    ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr);
+	// Post Work Request
+	ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr);
-    return ret;
+	return ret;
 }
 void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
@@ -108,33 +108,30 @@ void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
 			warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
 				node_name(n), wc[i].status);
 		}
-		else
-		{
-			// Release sample
-			sample_put((struct sample*)(wc[i].wr_id));
-		}
+
+		sample_put((struct sample*)(wc[i].wr_id));
 	}
 }
 void * ib_event_thread(void *n)
 {
-    struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
-    struct ibv_wc wc[ib->cq_size];
-    int size;
+	struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
+	struct ibv_wc wc[ib->cq_size];
+	int size;
-    while(1)
-    {
-        // Function blocks, until an event occurs
-        ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.cq, NULL);
+	while(1)
+	{
+		// Function blocks, until an event occurs
+		ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.send_cq, NULL);
-        // Poll as long as WCs are available
-        while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc)))
-            ib->poll.on_compl(n, wc, &size);
+		// Poll as long as WCs are available
+		while((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
+			ib->poll.on_compl(n, wc, &size);
-        // Request a new event in the CQ and acknowledge event
-        ibv_req_notify_cq(ib->ctx.cq, 0);
-        ibv_ack_cq_events(ib->ctx.cq, 1);
-    }
+		// Request a new event in the CQ and acknowledge event
+		ibv_req_notify_cq(ib->ctx.send_cq, 0);
+		ibv_ack_cq_events(ib->ctx.send_cq, 1);
+	}
 }
 void * ib_busy_poll_thread(void *n)
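Note on the hunk above: ib_event_thread() drains the CQ first and only afterwards re-arms notifications and acknowledges the event. The reference pattern in ibv_get_cq_event(3) acknowledges and re-arms before draining, so completions that arrive between the last poll and the next event cannot be missed. A minimal sketch in that style, reusing this file's names (ib, n, wc and size are assumed to be the thread's locals); it is not lifted from the patch:

	struct ibv_cq *ev_cq;
	void *ev_ctx;

	while (!ib->poll.stopThread) {
		/* Blocks until the completion channel signals an event */
		if (ibv_get_cq_event(ib->ctx.comp_channel, &ev_cq, &ev_ctx))
			break;

		/* Acknowledge the event (acks may also be batched) */
		ibv_ack_cq_events(ev_cq, 1);

		/* Re-arm notifications before draining the CQ */
		if (ibv_req_notify_cq(ev_cq, 0))
			break;

		/* Drain all available Work Completions */
		while ((size = ibv_poll_cq(ev_cq, ib->cq_size, wc)) > 0)
			ib->poll.on_compl(n, wc, &size);
	}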
@@ -146,7 +143,7 @@ void * ib_busy_poll_thread(void *n)
 	while(1)
 	{
 		// Poll as long as WCs are available
-		while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc)))
+		while((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
 			ib->poll.on_compl(n, wc, &size);
 		if(ib->poll.stopThread)
@@ -168,19 +165,27 @@ static void ib_init_wc_poll(struct node *n)
 		error("Could not create completion channel in node %s.", node_name(n));
 	}
-	// Create completion queue and bind to channel (or NULL)
-	ib->ctx.cq = ibv_create_cq(ib->ctx.id->verbs,
+	// Create completion queues and bind to channel (or NULL)
+	ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs,
+			ib->cq_size,
+			NULL,
+			NULL,
+			0);
+	if(!ib->ctx.recv_cq)
+		error("Could not create receive completion queue in node %s.", node_name(n));
+
+	ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs,
 			ib->cq_size,
 			NULL,
 			ib->ctx.comp_channel,
 			0);
-	if(!ib->ctx.cq)
-		error("Could not create completion queue in node %s.", node_name(n));
+	if(!ib->ctx.send_cq)
+		error("Could not create send completion queue in node %s.", node_name(n));
 	if(ib->poll.poll_mode == EVENT)
 	{
 		// Request notifications from completion queue
-		ret = ibv_req_notify_cq(ib->ctx.cq, 0);
+		ret = ibv_req_notify_cq(ib->ctx.send_cq, 0);
 		if(ret)
 			error("Failed to request notifiy CQ in node %s: %s",
 				node_name(n), gai_strerror(ret));
@@ -214,8 +219,8 @@ static void ib_build_ibv(struct node *n)
 	ib_init_wc_poll(n);
 	// Prepare remaining Queue Pair (QP) attributes
-	ib->qp_init.send_cq = ib->ctx.cq;
-	ib->qp_init.recv_cq = ib->ctx.cq;
+	ib->qp_init.send_cq = ib->ctx.send_cq;
+	ib->qp_init.recv_cq = ib->ctx.recv_cq;
 	//ToDo: Set maximum inline data
@@ -306,64 +311,64 @@ static void ib_build_ibv(struct node *n)
 static int ib_addr_resolved(struct node *n)
 {
-    struct infiniband *ib = (struct infiniband *) n->_vd;
-    int ret;
+	struct infiniband *ib = (struct infiniband *) n->_vd;
+	int ret;
-    info("Successfully resolved address.");
+	info("Successfully resolved address.");
-    // Build all components from IB Verbs
-    ib_build_ibv(n);
+	// Build all components from IB Verbs
+	ib_build_ibv(n);
-    // Resolve address
-    ret = rdma_resolve_route(ib->ctx.id, ib->conn.timeout);
-    if(ret)
-        error("Failed to resolve route in node %s.", node_name(n));
+	// Resolve address
+	ret = rdma_resolve_route(ib->ctx.id, ib->conn.timeout);
+	if(ret)
+		error("Failed to resolve route in node %s.", node_name(n));
-    return 0;
+	return 0;
 }
 static int ib_route_resolved(struct node *n)
 {
-    struct infiniband *ib = (struct infiniband *) n->_vd;
-    int ret;
+	struct infiniband *ib = (struct infiniband *) n->_vd;
+	int ret;
-    info("Successfully resolved route.");
+	info("Successfully resolved route.");
-    //ToDo: Post receive WRs
+	//ToDo: Post receive WRs
-    struct rdma_conn_param cm_params;
-    memset(&cm_params, 0, sizeof(cm_params));
+	struct rdma_conn_param cm_params;
+	memset(&cm_params, 0, sizeof(cm_params));
-    // Send connection request
-    ret = rdma_connect(ib->ctx.id, &cm_params);
-    if(ret)
-        error("Failed to connect in node %s.", node_name(n));
+	// Send connection request
+	ret = rdma_connect(ib->ctx.id, &cm_params);
+	if(ret)
+		error("Failed to connect in node %s.", node_name(n));
-    info("Called rdma_connect.");
+	info("Called rdma_connect.");
-    return 0;
+	return 0;
 }
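Note: these callbacks are driven by librdmacm's event channel. The active side walks rdma_resolve_addr() -> RDMA_CM_EVENT_ADDR_RESOLVED -> rdma_resolve_route() -> RDMA_CM_EVENT_ROUTE_RESOLVED -> rdma_connect() -> RDMA_CM_EVENT_ESTABLISHED, while the passive side sees RDMA_CM_EVENT_CONNECT_REQUEST and answers with rdma_accept(). A condensed, hypothetical dispatch loop for that progression (a role that ib_event() appears to fill in the full file):

	struct rdma_cm_event *event;

	while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
		enum rdma_cm_event_type type = event->event;
		struct rdma_cm_id *id = event->id;

		/* The event must be acknowledged; its memory belongs to librdmacm */
		rdma_ack_cm_event(event);

		switch (type) {
			case RDMA_CM_EVENT_ADDR_RESOLVED:   ib_addr_resolved(n);       break; /* triggers rdma_resolve_route() */
			case RDMA_CM_EVENT_ROUTE_RESOLVED:  ib_route_resolved(n);      break; /* triggers rdma_connect() */
			case RDMA_CM_EVENT_CONNECT_REQUEST: ib_connect_request(n, id); break; /* passive side: rdma_accept() */
			case RDMA_CM_EVENT_ESTABLISHED:     /* data path may start */  break;
			default:                                                       break;
		}
	}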
 static int ib_connect_request(struct node *n, struct rdma_cm_id *id)
 {
-    struct infiniband *ib = (struct infiniband *) n->_vd;
-    int ret;
-    info("Received a connection request!");
+	struct infiniband *ib = (struct infiniband *) n->_vd;
+	int ret;
+	info("Received a connection request!");
-    ib->ctx.id = id;
-    ib_build_ibv(n);
+	ib->ctx.id = id;
+	ib_build_ibv(n);
-    struct rdma_conn_param cm_params;
-    memset(&cm_params, 0, sizeof(cm_params));
+	struct rdma_conn_param cm_params;
+	memset(&cm_params, 0, sizeof(cm_params));
-    // Accept connection request
-    ret = rdma_accept(ib->ctx.id, &cm_params);
-    if(ret)
-        error("Failed to connect in node %s.", node_name(n));
+	// Accept connection request
+	ret = rdma_accept(ib->ctx.id, &cm_params);
+	if(ret)
+		error("Failed to connect in node %s.", node_name(n));
-    info("Successfully accepted connection request.");
+	info("Successfully accepted connection request.");
-    return 0;
+	return 0;
 }
 static int ib_event(struct node *n, struct rdma_cm_event *event)
@@ -463,6 +468,8 @@ int ib_parse(struct node *n, json_t *cfg)
 	// Set timeout
 	ib->conn.timeout = timeout;
+	n->in.vectorize = 256;
+
 	// Translate poll mode
 	if(strcmp(poll_mode, "EVENT") == 0)
 	{
@@ -660,62 +667,97 @@ int ib_start(struct node *n)
 int ib_stop(struct node *n)
 {
-    struct infiniband *ib = (struct infiniband *) n->_vd;
-    struct rdma_cm_event *event = NULL;
-    int ret;
+	struct infiniband *ib = (struct infiniband *) n->_vd;
+	struct rdma_cm_event *event = NULL;
+	int ret;
-    // Call RDMA disconnect function
-    // Will flush all outstanding WRs to the Completion Queue and
-    // will call RDMA_CM_EVENT_DISCONNECTED if that is done.
-    ret = rdma_disconnect(ib->ctx.id);
-    if(ret)
-    {
-        error("Error while calling rdma_disconnect in node %s: %s",
-            node_name(n), gai_strerror(ret));
-    }
-    info("Called rdma_disconnect.");
+	// Call RDMA disconnect function
+	// Will flush all outstanding WRs to the Completion Queue and
+	// will call RDMA_CM_EVENT_DISCONNECTED if that is done.
+	ret = rdma_disconnect(ib->ctx.id);
+	if(ret)
+	{
+		error("Error while calling rdma_disconnect in node %s: %s",
+			node_name(n), gai_strerror(ret));
+	}
+	info("Called rdma_disconnect.");
-    // If disconnected event already occured, directly call cleanup function
-    if(ib->conn.rdma_disconnect_called)
-    {
-        ib_cleanup(n);
-    }
-    // Else, wait for event to occur
-    else
-    {
-        ib->conn.rdma_disconnect_called = 1;
-        rdma_get_cm_event(ib->ctx.ec, &event);
+	// If disconnected event already occured, directly call cleanup function
+	if(ib->conn.rdma_disconnect_called)
+	{
+		ib_cleanup(n);
+	}
+	// Else, wait for event to occur
+	else
+	{
+		ib->conn.rdma_disconnect_called = 1;
+		rdma_get_cm_event(ib->ctx.ec, &event);
-        rdma_ack_cm_event(event);
+		rdma_ack_cm_event(event);
-        ib_event(n, event);
-    }
+		ib_event(n, event);
+	}
-    return 0;
+	return 0;
 }
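Note: rdma_disconnect() moves the QP to the error state, so all outstanding Work Requests are flushed to the Completion Queue, and both peers eventually receive RDMA_CM_EVENT_DISCONNECTED. The rdma_disconnect_called flag turns ib_stop() and the event handler into a rendezvous: whichever of the two runs second performs the actual teardown. A hypothetical handler body for the DISCONNECTED case, mirroring the logic above:

	static int ib_disconnected_sketch(struct node *n)
	{
		struct infiniband *ib = (struct infiniband *) n->_vd;

		/* ib_stop() already ran, so it is now safe to tear everything down */
		if (ib->conn.rdma_disconnect_called)
			return ib_cleanup(n);

		/* The remote side disconnected first; remember it and let ib_stop() clean up */
		ib->conn.rdma_disconnect_called = 1;
		return 0;
	}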
 int ib_init(struct super_node *n)
 {
-
-    return 0;
+	return 0;
 }
 int ib_deinit()
 {
-    return 0;
+	return 0;
 }
 int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
 {
 	struct infiniband *ib = (struct infiniband *) n->_vd;
 	struct ibv_wc wc[n->in.vectorize];
-	struct ibv_recv_wr wr, *bad_wr = NULL;
-	struct ibv_sge sge;
+	struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
+	struct ibv_sge sge[cnt];
 	struct ibv_mr ** mr;
 	struct pool *p;
 	int ret;
-	ret = ibv_poll_cq(ib->ctx.cq, n->in.vectorize, wc);
+	if(ib->conn.available_recv_wrs <= ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize)
+	{
+		// Get Memory Region
+		p = sample_pool(smps[0]);
+		mr = (struct ibv_mr **)((char *)(p)+p->buffer_off-8);
+
+		for(int i=0; i<cnt; i++)
+		{
+			// Prepare receive Scatter/Gather element
+			sge[i].addr = (uint64_t)&smps[i]->data;
+			sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
+			sge[i].lkey = (*mr)->lkey;
+
+			// Prepare a receive Work Request
+			wr[i].wr_id = (uintptr_t)smps[i];
+			wr[i].next = &wr[i+1];
+			wr[i].sg_list = &sge[i];
+			wr[i].num_sge = 1;
+
+			ib->conn.available_recv_wrs++;
+
+			if(ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1))
+			{
+				wr[i].next = NULL;
+				break;
+			}
+		}
+		// Post list of Work Requests
+		ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
+
+	}
+
+	// Poll Completion Queue
+	ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
 	if(ret)
 	{
@@ -723,50 +765,20 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
 		for(int i=0; i<ret; i++)
 		{
 			if(wc[i].status != IBV_WC_SUCCESS)
-			{
-				warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
-					node_name(n), wc[i].status);
-			}
-			else if(wc[i].opcode & IBV_WC_RECV)
-			{
-				//Create headers
-				struct timespec ts_receive = time_now();
-
-				smps[i]->ts.received = ts_receive;
-				smps[i]->length = wc[i].byte_len/sizeof(double);
-				smps[i]->capacity = DEFAULT_SAMPLELEN;
-
-				//Release sample
-				sample_put(smps[i]);
-			}
+			else
+				ret = 0;
+
+			//Release sample
+			sample_put((struct sample*)(wc[i].wr_id));
 		}
 	}
-	else if(ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr)
-	{
-		// No data received? Put new receive Work Requests to Receive Queue
-		// Get Memory Region
-		p = sample_pool(smps[0]);
-		mr = (struct ibv_mr **)((char *)(p)+p->buffer_off-8);
-
-		// Increase refcnt of sample
-		sample_get(smps[0]);
-
-		// Prepare receive Scatter/Gather element
-		sge.addr = (uint64_t)&smps[0]->data;
-		sge.length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
-		sge.lkey = (*mr)->lkey;
-
-		// Prepare a receive Work Request
-		wr.wr_id = (uintptr_t)smps[0];
-		wr.next = NULL;
-		wr.sg_list = &sge;
-		wr.num_sge = 1;
-
-		// Post Work Request
-		ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr);
-
-		ib->conn.available_recv_wrs++;
-	}
 	return ret;
 }
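Note: the rewritten ib_read() refills the Receive Queue in batches: every sample becomes one scatter/gather element plus one Work Request, the Work Requests are chained through wr[i].next, and a single ibv_post_recv() hands the whole list to the HCA, with wr_id carrying the sample pointer back in the completion. Stripped of the pool and counter bookkeeping, the refill step amounts to the following hypothetical helper (error handling omitted):

	static int ib_refill_recv_sketch(struct node *n, struct sample *smps[], unsigned cnt, struct ibv_mr *mr)
	{
		struct infiniband *ib = (struct infiniband *) n->_vd;
		struct ibv_sge sge[cnt];
		struct ibv_recv_wr wr[cnt], *bad_wr = NULL;

		for (unsigned i = 0; i < cnt; i++) {
			/* Each sample contributes one scatter/gather entry */
			sge[i].addr   = (uintptr_t) &smps[i]->data;
			sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
			sge[i].lkey   = mr->lkey;

			/* The completion returns wr_id, i.e. the sample, to ib_read() */
			wr[i].wr_id   = (uintptr_t) smps[i];
			wr[i].sg_list = &sge[i];
			wr[i].num_sge = 1;
			wr[i].next    = (i == cnt - 1) ? NULL : &wr[i + 1];
		}

		/* One call posts the whole chain; on failure bad_wr points at the first rejected WR */
		return ibv_post_recv(ib->ctx.id->qp, wr, &bad_wr);
	}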
@@ -783,7 +795,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
 	memset(&wr, 0, sizeof(wr));
 	//ToDo: Place this into configuration and create checks if settings are valid
-	int send_inline = 0;
+	int send_inline = 1;
 	// Get Memory Region
 	p = sample_pool(smps[0]);
@@ -818,10 +830,10 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
 	ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
 	if(ret)
 	{
-        error("Failed to send message in node %s: %s",
-            node_name(n), gai_strerror(ret));
+		error("Failed to send message in node %s: %s",
+			node_name(n), gai_strerror(ret));
-        return -ret;
+		return -ret;
 	}
 	return cnt;
@@ -829,29 +841,29 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
 int ib_fd(struct node *n)
 {
-    return 0;
+	return 0;
 }
 static struct plugin p = {
-    .name = "infiniband",
-    .description = "Infiniband",
-    .type = PLUGIN_TYPE_NODE,
-    .node = {
-        .vectorize = 0,
-        .size = sizeof(struct infiniband),
-        .reverse = ib_reverse,
-        .parse = ib_parse,
-        .print = ib_print,
-        .start = ib_start,
-        .destroy = ib_destroy,
-        .stop = ib_stop,
-        .init = ib_init,
-        .deinit = ib_deinit,
-        .read = ib_read,
-        .write = ib_write,
-        .fd = ib_fd,
-        .memtype = ib_memtype
-    }
+	.name = "infiniband",
+	.description = "Infiniband",
+	.type = PLUGIN_TYPE_NODE,
+	.node = {
+		.vectorize = 0,
+		.size = sizeof(struct infiniband),
+		.reverse = ib_reverse,
+		.parse = ib_parse,
+		.print = ib_print,
+		.start = ib_start,
+		.destroy = ib_destroy,
+		.stop = ib_stop,
+		.init = ib_init,
+		.deinit = ib_deinit,
+		.read = ib_read,
+		.write = ib_write,
+		.fd = ib_fd,
+		.memtype = ib_memtype
+	}
 };
 REGISTER_PLUGIN(&p)
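Note on send_inline above: with IBV_SEND_INLINE the provider copies the payload into the Work Queue Entry at post time, so the sample buffer may be reused immediately and no lkey is needed, but only payloads up to the Queue Pair's max_inline_data are accepted. The ToDo in that hunk still applies; a guard along the following lines (hypothetical helper, assuming ib->qp_init.cap holds the capability values returned at QP creation) would make the hard-coded setting safe and configurable:

	static int ib_send_flags_sketch(struct infiniband *ib, struct ibv_sge *sge, int send_inline)
	{
		int flags = IBV_SEND_SIGNALED;

		/* Inline only if the payload fits into the WQE; otherwise fall back to the registered buffer */
		if (send_inline && sge->length <= ib->qp_init.cap.max_inline_data)
			flags |= IBV_SEND_INLINE;

		return flags;
	}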