diff --git a/include/villas/memory/ib.h b/include/villas/memory/ib.h index 6ffa8c98f..2f4597657 100644 --- a/include/villas/memory/ib.h +++ b/include/villas/memory/ib.h @@ -28,4 +28,4 @@ struct memory_ib { struct memory_type *parent; }; -struct ibv_mr* memory_ib_get_mr(struct sample *smps); +struct ibv_mr * memory_ib_get_mr(struct sample *smps); diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 07e11cd30..0e0b3136c 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -35,13 +35,12 @@ #include #include -/* Function poitner typedefs */ -typedef void (*ib_on_completion)(struct node*, struct ibv_wc*, int*); -typedef void* (*ib_poll_function)(void*); +/* Function pointer typedefs */ +typedef void (*ib_on_completion) (struct node*, struct ibv_wc*, int*); +typedef void * (*ib_poll_function) (void*); /* Enums */ -enum poll_mode_e -{ +enum poll_mode_e { EVENT, BUSY }; @@ -63,6 +62,7 @@ struct infiniband { struct ibv_cq *send_cq; struct ibv_comp_channel *comp_channel; } ctx; + /* Work Completion related */ struct poll_s { enum poll_mode_e poll_mode; @@ -108,7 +108,7 @@ struct infiniband { /* Misc settings */ int is_source; -int cq_size; + int cq_size; }; /** @see node_type::reverse */ diff --git a/lib/memory/ib.c b/lib/memory/ib.c index a810691cf..6cf0dbc66 100644 --- a/lib/memory/ib.c +++ b/lib/memory/ib.c @@ -35,7 +35,7 @@ struct ibv_mr * memory_ib_get_mr(struct sample *smps) p = sample_pool(smps); - ma = memory_get_allocation((char *)(p)+p->buffer_off); + ma = memory_get_allocation((char *) (p) + p->buffer_off); mr = ma->ib.mr; return mr; } diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 8e35952a9..23d01e094 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -45,7 +45,7 @@ int ib_cleanup(struct node *n) // Deregister memory regions ibv_dereg_mr(ib->mem.mr_recv); - if(ib->is_source) + if (ib->is_source) ibv_dereg_mr(ib->mem.mr_send); info("Deregistered memory regions"); @@ -73,12 +73,12 @@ int ib_post_recv_wrs(struct node *n) struct ibv_sge sge; // Prepare receive Scatter/Gather element - sge.addr = (uintptr_t)pool_get(&ib->mem.p_recv); + sge.addr = (uintptr_t) pool_get(&ib->mem.p_recv); sge.length = ib->mem.p_recv.blocksz; sge.lkey = ib->mem.mr_recv->lkey; // Prepare a receive Work Request - wr.wr_id = (uintptr_t)sge.addr; + wr.wr_id = (uintptr_t) sge.addr; wr.next = NULL; wr.sg_list = &sge; wr.num_sge = 1; @@ -93,40 +93,35 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { - struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; - for(int i=0; i<*size; i++) - { + for (int i = 0; i < *size; i++) { //On disconnect, the QP set to error state and will be flushed - if(wc[i].status == IBV_WC_WR_FLUSH_ERR) - { + if (wc[i].status == IBV_WC_WR_FLUSH_ERR) { ib->poll.stopThread = 1; return; } - if(wc[i].status != IBV_WC_SUCCESS) - { + if (wc[i].status != IBV_WC_SUCCESS) warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", node_name(n), wc[i].status); - } - sample_put((struct sample*)(wc[i].wr_id)); + sample_put((struct sample*) wc[i].wr_id); } } void * ib_event_thread(void *n) { - struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; struct ibv_wc wc[ib->cq_size]; int size; - while(1) - { + while (1) { // Function blocks, until an event occurs ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.send_cq, NULL); // Poll as long as WCs are available - while((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) + while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) ib->poll.on_compl(n, wc, &size); // Request a new event in the CQ and acknowledge event @@ -137,17 +132,16 @@ void * ib_event_thread(void *n) void * ib_busy_poll_thread(void *n) { - struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; struct ibv_wc wc[ib->cq_size]; int size; - while(1) - { + while (1) { // Poll as long as WCs are available - while((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) + while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) ib->poll.on_compl(n, wc, &size); - if(ib->poll.stopThread) + if (ib->poll.stopThread) return NULL; } } @@ -158,11 +152,10 @@ static void ib_init_wc_poll(struct node *n) struct infiniband *ib = (struct infiniband *) n->_vd; ib->ctx.comp_channel = NULL; - if(ib->poll.poll_mode == EVENT) - { + if (ib->poll.poll_mode == EVENT) { // Create completion channel ib->ctx.comp_channel = ibv_create_comp_channel(ib->ctx.id->verbs); - if(!ib->ctx.comp_channel) + if (!ib->ctx.comp_channel) error("Could not create completion channel in node %s.", node_name(n)); } @@ -172,7 +165,7 @@ static void ib_init_wc_poll(struct node *n) NULL, NULL, 0); - if(!ib->ctx.recv_cq) + if (!ib->ctx.recv_cq) error("Could not create receive completion queue in node %s.", node_name(n)); ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, @@ -180,27 +173,23 @@ static void ib_init_wc_poll(struct node *n) NULL, ib->ctx.comp_channel, 0); - if(!ib->ctx.send_cq) + if (!ib->ctx.send_cq) error("Could not create send completion queue in node %s.", node_name(n)); - if(ib->poll.poll_mode == EVENT) - { + if (ib->poll.poll_mode == EVENT) { // Request notifications from completion queue ret = ibv_req_notify_cq(ib->ctx.send_cq, 0); - if(ret) + if (ret) error("Failed to request notifiy CQ in node %s: %s", node_name(n), gai_strerror(ret)); } // Initialize polling pthread for source - if(ib->is_source) - { + if (ib->is_source) { ret = pthread_create(&ib->poll.cq_poller_thread, NULL, ib->poll.poll_func, n); - if(ret) - { + if (ret) error("Failed to create poll thread of node %s: %s", node_name(n), gai_strerror(ret)); - } } } @@ -211,7 +200,7 @@ static void ib_build_ibv(struct node *n) //Allocate protection domain ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs); - if(!ib->ctx.pd) + if (!ib->ctx.pd) error("Could not allocate protection domain in node %s.", node_name(n)); info("Allocated Protection Domain"); @@ -226,7 +215,7 @@ static void ib_build_ibv(struct node *n) // Create the actual QP ret = rdma_create_qp(ib->ctx.id, ib->ctx.pd, &ib->qp_init); - if(ret) + if (ret) error("Failed to create Queue Pair in node %s.", node_name(n)); info("Created Queue Pair with %i receive and %i send elements.", @@ -241,11 +230,9 @@ static void ib_build_ibv(struct node *n) ib->qp_init.cap.max_recv_wr, SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memory_type_heap); - if(ret) - { + if (ret) error("Failed to init recv memory pool of node %s: %s", node_name(n), gai_strerror(ret)); - } //ToDo: initialize r_addr_key struct if mode is RDMA @@ -256,14 +243,12 @@ static void ib_build_ibv(struct node *n) (char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off, ib->mem.p_recv.len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); - if(!ib->mem.mr_recv) { + if (!ib->mem.mr_recv) error("Failed to register mr_recv with ibv_reg_mr of node %s.", node_name(n)); - } info("Allocated receive memory."); - if(ib->is_source) - { + if (ib->is_source) { ib->mem.p_send.state = STATE_DESTROYED; ib->mem.p_send.queue.state = STATE_DESTROYED; @@ -272,11 +257,9 @@ static void ib_build_ibv(struct node *n) ib->qp_init.cap.max_send_wr, sizeof(double), &memory_type_heap); - if(ret) - { + if (ret) error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret)); - } //ToDo: initialize r_addr_key struct if mode is RDMA @@ -287,10 +270,10 @@ static void ib_build_ibv(struct node *n) (char*)&ib->mem.p_send+ib->mem.p_send.buffer_off, ib->mem.p_send.len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); - if(!ib->mem.mr_send) { + if (!ib->mem.mr_send) error("Failed to register mr_send with ibv_reg_mr of node %s.", node_name(n)); - } + info("Allocated send memory."); } } @@ -307,7 +290,7 @@ static int ib_addr_resolved(struct node *n) // Resolve address ret = rdma_resolve_route(ib->ctx.id, ib->conn.timeout); - if(ret) + if (ret) error("Failed to resolve route in node %s.", node_name(n)); return 0; @@ -325,7 +308,7 @@ static int ib_route_resolved(struct node *n) // Send connection request ret = rdma_connect(ib->ctx.id, &cm_params); - if(ret) + if (ret) error("Failed to connect in node %s.", node_name(n)); info("Called rdma_connect."); @@ -347,7 +330,7 @@ static int ib_connect_request(struct node *n, struct rdma_cm_id *id) // Accept connection request ret = rdma_accept(ib->ctx.id, &cm_params); - if(ret) + if (ret) error("Failed to connect in node %s.", node_name(n)); info("Successfully accepted connection request."); @@ -359,8 +342,7 @@ static int ib_event(struct node *n, struct rdma_cm_event *event) { int ret = 0; - switch(event->event) - { + switch(event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: ret = ib_addr_resolved(n); break; @@ -386,8 +368,7 @@ static int ib_event(struct node *n, struct rdma_cm_event *event) ret = ib_cleanup(n); break; default: - error("Unknown event occurred: %u", - event->event); + error("Unknown event occurred: %u", event->event); } return ret; @@ -426,28 +407,26 @@ int ib_parse(struct node *n, json_t *cfg) "max_send_wr", &max_send_wr, "max_recv_wr", &max_recv_wr ); - if(ret) + if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); // Translate IP:PORT to a struct addrinfo char* ip_adr = strtok(local, ":"); char* port = strtok(NULL, ":"); + ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr); - if(ret) - { + if (ret) error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), gai_strerror(ret)); - } // Translate port space - if(strcmp(port_space, "RDMA_PS_IPOIB") == 0) ib->conn.port_space = RDMA_PS_IPOIB; - else if(strcmp(port_space, "RDMA_PS_TCP") == 0) ib->conn.port_space = RDMA_PS_TCP; - else if(strcmp(port_space, "RDMA_PS_UDP") == 0) ib->conn.port_space = RDMA_PS_UDP; - else if(strcmp(port_space, "RDMA_PS_IB") == 0) ib->conn.port_space = RDMA_PS_IB; - else { + if (strcmp(port_space, "RDMA_PS_IPOIB") == 0) ib->conn.port_space = RDMA_PS_IPOIB; + else if (strcmp(port_space, "RDMA_PS_TCP") == 0) ib->conn.port_space = RDMA_PS_TCP; + else if (strcmp(port_space, "RDMA_PS_UDP") == 0) ib->conn.port_space = RDMA_PS_UDP; + else if (strcmp(port_space, "RDMA_PS_IB") == 0) ib->conn.port_space = RDMA_PS_IB; + else error("Failed to translate rdma_port_space in node %s. %s is not a valid \ port space supported by rdma_cma.h!", node_name(n), port_space); - } // Set timeout ib->conn.timeout = timeout; @@ -455,44 +434,40 @@ int ib_parse(struct node *n, json_t *cfg) n->in.vectorize = 256; // Translate poll mode - if(strcmp(poll_mode, "EVENT") == 0) - { + if (strcmp(poll_mode, "EVENT") == 0) { ib->poll.poll_mode = EVENT; ib->poll.poll_func = ib_event_thread; } - else if(strcmp(poll_mode, "BUSY") == 0) - { + else if (strcmp(poll_mode, "BUSY") == 0) { ib->poll.poll_mode = BUSY; ib->poll.poll_func = ib_busy_poll_thread; } else - { error("Failed to translate poll_mode in node %s. %s is not a valid \ poll mode!", node_name(n), poll_mode); - } // Set completion queue size ib->cq_size = cq_size; // Translate QP type - if(strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC; - else if(strcmp(qp_type, "IBV_QPT_UC") == 0) ib->qp_init.qp_type = IBV_QPT_UC; - else if(strcmp(qp_type, "IBV_QPT_UD") == 0) ib->qp_init.qp_type = IBV_QPT_UD; - else { - error("Failed to translate qp_type in node %s. %s is not a valid \ - qp_type!", node_name(n), qp_type); - } + if (strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC; + else if (strcmp(qp_type, "IBV_QPT_UC") == 0) ib->qp_init.qp_type = IBV_QPT_UC; + else if (strcmp(qp_type, "IBV_QPT_UD") == 0) ib->qp_init.qp_type = IBV_QPT_UD; + else + error("Failed to translate qp_type in node %s. %s is not a valid \ + qp_type!", node_name(n), qp_type); + // Set max. send and receive Work Requests // First check if the set value is a power of 2, and warn the user if this is not the case int max_send_pow = (int) pow(2, ceil(log2(max_send_wr))); int max_recv_pow = (int) pow(2, ceil(log2(max_recv_wr))); - if(max_send_wr != max_send_pow) + if (max_send_wr != max_send_pow) warn("Max. number of send WRs (%i) is not a power of 2! The HCA will change this to a power of 2: %i", max_send_wr, max_send_pow); - if(max_recv_wr != max_recv_pow) + if (max_recv_wr != max_recv_pow) warn("Max. number of recv WRs (%i) is not a power of 2! The HCA will change this to a power of 2: %i", max_recv_wr, max_recv_pow); @@ -507,25 +482,22 @@ int ib_parse(struct node *n, json_t *cfg) ib->qp_init.cap.max_recv_sge = 1; //Check if node is a source and connect to target - if(remote) - { + if (remote) { ib->is_source = 1; // Translate address info char* ip_adr = strtok(remote, ":"); char* port = strtok(NULL, ":"); + ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.dst_addr); - if(ret) - { + if (ret) error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); - } // Set correct Work Completion function ib->poll.on_compl = ib_completion_source; } - else - { + else { ib->is_source = 0; // Set correct Work Completion function @@ -547,14 +519,12 @@ int ib_destroy(struct node *n) void * ib_disconnect_thread(void *n) { - struct node *node = (struct node *)n; - struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct node *node = (struct node *) n; + struct infiniband *ib = (struct infiniband *) node->_vd; struct rdma_cm_event *event; - while(rdma_get_cm_event(ib->ctx.ec, &event) == 0) - { - if(event->event == RDMA_CM_EVENT_DISCONNECTED) - { + while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) { + if (event->event == RDMA_CM_EVENT_DISCONNECTED) { rdma_ack_cm_event(event); ib->conn.rdma_disconnect_called = 1; @@ -573,41 +543,35 @@ int ib_start(struct node *n) // Create event channel ib->ctx.ec = rdma_create_event_channel(); - if(!ib->ctx.ec) + if (!ib->ctx.ec) error("Failed to create event channel in node %s!", node_name(n)); ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space); - if(ret) - { + if (ret) error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret)); - } + info("Succesfully created rdma_cm_id."); // Bind rdma_cm_id to the HCA ret = rdma_bind_addr(ib->ctx.id, ib->conn.src_addr->ai_addr); - if(ret) - { + if (ret) error("Failed to bind to local device of node %s: %s", node_name(n), gai_strerror(ret)); - } + info("Bound rdma_cm_id to Infiniband device."); - if(ib->is_source) - { + if (ib->is_source) { // Resolve address ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout); - if(ret) - { + if (ret) error("Failed to resolve remote address after %ims of node %s: %s", ib->conn.timeout, node_name(n), gai_strerror(ret)); - } } - else - { + else { // The ID will be overwritten for the target. If the event type is // RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for // that communication. @@ -615,7 +579,7 @@ int ib_start(struct node *n) // Listen on rdma_cm_id for events ret = rdma_listen(ib->ctx.listen_id, 10); - if(ret) + if (ret) error("Failed to listen to rdma_cm_id on node %s", node_name(n)); } @@ -623,23 +587,21 @@ int ib_start(struct node *n) // sure the nodes are succesfully connected. info("Starting to monitor events on rdma_cm_id."); - while(rdma_get_cm_event(ib->ctx.ec, &event) == 0) - { + while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) { struct rdma_cm_event event_copy; + memcpy(&event_copy, event, sizeof(*event)); rdma_ack_cm_event(event); - if(ib_event(n, &event_copy)) + if (ib_event(n, &event_copy)) break; } ret = pthread_create(&ib->conn.stop_thread, NULL, ib_disconnect_thread, n); - if(ret) - { + if (ret) error("Failed to create thread to monitor disconnects in node %s: %s", node_name(n), gai_strerror(ret)); - } return 0; } @@ -654,21 +616,17 @@ int ib_stop(struct node *n) // Will flush all outstanding WRs to the Completion Queue and // will call RDMA_CM_EVENT_DISCONNECTED if that is done. ret = rdma_disconnect(ib->ctx.id); - if(ret) - { + if (ret) error("Error while calling rdma_disconnect in node %s: %s", node_name(n), gai_strerror(ret)); - } + info("Called rdma_disconnect."); // If disconnected event already occured, directly call cleanup function - if(ib->conn.rdma_disconnect_called) - { + if (ib->conn.rdma_disconnect_called) ib_cleanup(n); - } - // Else, wait for event to occur - else - { + else { + // Else, wait for event to occur ib->conn.rdma_disconnect_called = 1; rdma_get_cm_event(ib->ctx.ec, &event); @@ -696,40 +654,36 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) struct ibv_wc wc[n->in.vectorize]; struct ibv_recv_wr wr[cnt], *bad_wr = NULL; struct ibv_sge sge[cnt]; - struct ibv_mr * mr; + struct ibv_mr *mr; int ret; - - - if(ib->conn.available_recv_wrs <= ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) - { + if (ib->conn.available_recv_wrs <= ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) { // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for(int i=0; idata; + sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); sge[i].lkey = mr->lkey; // Prepare a receive Work Request - wr[i].wr_id = (uintptr_t)smps[i]; + wr[i].wr_id = (uintptr_t) smps[i]; wr[i].next = &wr[i+1]; wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; ib->conn.available_recv_wrs++; - if(ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) - { + if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { wr[i].next = NULL; break; } } + // Post list of Work Requests ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr); @@ -738,25 +692,20 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) // Poll Completion Queue ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc); - if(ret) - { + if (ret) { ib->conn.available_recv_wrs -= ret; - for(int i=0; ilength = wc[i].byte_len/sizeof(double); } @@ -776,7 +725,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_send_wr wr[cnt], *bad_wr = NULL; struct ibv_sge sge[cnt]; - struct ibv_mr * mr; + struct ibv_mr *mr; int ret; memset(&wr, 0, sizeof(wr)); @@ -787,8 +736,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for(int i=0; ictx.id->qp, wr, &bad_wr); - if(ret) - { + if (ret) { error("Failed to send message in node %s: %s", node_name(n), gai_strerror(ret));