From 746fd2f694588c979029cc64c342ecfad990e3c4 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sun, 8 Jul 2018 15:00:47 +0200 Subject: [PATCH] Refactored ib_write in the same way as ib_read (described in #153). Merged separate completion queue polls to ib_write. Closes #167 --- include/villas/nodes/infiniband.h | 15 ---- lib/nodes/infiniband.c | 128 ++++++++---------------------- 2 files changed, 32 insertions(+), 111 deletions(-) diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index c60baab38..e6f55d197 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -34,12 +34,6 @@ #include #include #include - -/* Function pointer typedefs */ -typedef void (*ib_on_completion) (struct node*, struct ibv_wc*, int*); -typedef void * (*ib_poll_function) (void*); -typedef void * (*ib_event_function) (void*); - /* Enums */ enum poll_mode_e { EVENT, @@ -67,15 +61,6 @@ struct infiniband { /* Work Completion related */ struct poll_s { enum poll_mode_e poll_mode; - - /* On completion function */ - ib_on_completion on_compl; - - /* Busy poll or Event function */ - ib_poll_function poll_func; - - /* Poll thread */ - pthread_t cq_poller_thread; } poll; int stopThreads; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index af93cbaf1..b4b7bcc7d 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -57,7 +57,7 @@ int ib_disconnect(struct node *n) // Set available receive work requests to zero ib->conn.available_recv_wrs = 0; - return 0; + return ib->stopThreads; } int ib_post_recv_wrs(struct node *n) @@ -84,66 +84,13 @@ int ib_post_recv_wrs(struct node *n) return ret; } -void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} - -void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) -{ - for (int i = 0; i < *size; i++) { - if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR) - warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[i].status); - - sample_put((struct sample *) wc[i].wr_id); - } -} - -void * ib_event_thread(void *n) -{ - struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; - struct ibv_wc wc[ib->cq_size]; - int size; - - debug(LOG_IB | 1, "Initialized event based poll thread of node %s", node_name(n)); - - while (1) { - // Function blocks, until an event occurs - ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.send_cq, NULL); - - // Poll as long as WCs are available - while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) - ib->poll.on_compl(n, wc, &size); - - // Request a new event in the CQ and acknowledge event - ibv_req_notify_cq(ib->ctx.send_cq, 0); - ibv_ack_cq_events(ib->ctx.send_cq, 1); - } -} - -void * ib_busy_poll_thread(void *n) -{ - struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; - struct ibv_wc wc[ib->cq_size]; - int size; - - debug(LOG_IB | 1, "Initialized busy poll thread of node %s", node_name(n)); - - while (1) { - // Poll as long as WCs are available - while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) - ib->poll.on_compl(n, wc, &size); - - if (ib->stopThreads) - return NULL; - } -} - static void ib_init_wc_poll(struct node *n) { int ret; struct infiniband *ib = (struct infiniband *) n->_vd; ib->ctx.comp_channel = NULL; - debug(LOG_IB | 1, "Starting to initialize completion queues and threads"); + debug(LOG_IB | 1, "Starting to initialize completion queues"); if (ib->poll.poll_mode == EVENT) { // Create completion channel @@ -176,14 +123,6 @@ static void ib_init_wc_poll(struct node *n) debug(LOG_IB | 3, "Called ibv_req_notificy_cq on send Completion Queue"); } - - // Initialize polling pthread for source - if (ib->is_source) { - ret = pthread_create(&ib->poll.cq_poller_thread, NULL, ib->poll.poll_func, n); - if (ret) - error("Failed to create poll thread of node %s: %s", - node_name(n), gai_strerror(ret)); - } } static void ib_build_ibv(struct node *n) @@ -385,14 +324,10 @@ int ib_parse(struct node *n, json_t *cfg) debug(LOG_IB | 4, "Set timeout to %i in node %s", timeout, node_name(n)); // Translate poll mode - if (strcmp(poll_mode, "EVENT") == 0) { + if (strcmp(poll_mode, "EVENT") == 0) ib->poll.poll_mode = EVENT; - ib->poll.poll_func = ib_event_thread; - } - else if (strcmp(poll_mode, "BUSY") == 0) { + else if (strcmp(poll_mode, "BUSY") == 0) ib->poll.poll_mode = BUSY; - ib->poll.poll_func = ib_busy_poll_thread; - } else error("Failed to translate poll_mode in node %s. %s is not a valid \ poll mode!", node_name(n), poll_mode); @@ -444,17 +379,11 @@ int ib_parse(struct node *n, json_t *cfg) remote, node_name(n), gai_strerror(ret)); debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port); - - // Set correct Work Completion function - ib->poll.on_compl = ib_completion_source; } else { debug(LOG_IB | 3, "Node %s is set up as target", node_name(n)); ib->is_source = 0; - - // Set correct Work Completion function - ib->poll.on_compl = ib_completion_target; } return 0; @@ -508,7 +437,6 @@ void * ib_rdma_cm_event_thread(void *n) struct rdma_cm_event *event; int ret = 0; - debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node)); // Wait until node is completely started @@ -586,7 +514,7 @@ void * ib_rdma_cm_event_thread(void *n) rdma_ack_cm_event(event); - if (ret || ib->stopThreads) + if (ret) break; } @@ -695,14 +623,6 @@ int ib_stop(struct node *n) debug(LOG_IB | 3, "Joined rdma_cm_event_thread"); - // Wait for polling thread to join - if (ib->is_source) { - ret = pthread_join(ib->poll.cq_poller_thread, NULL); - if (ret) - error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret); - } - - // Destroy RDMA CM ID rdma_destroy_id(ib->ctx.id); debug(LOG_IB | 3, "Destroyed rdma_cm_id"); @@ -768,6 +688,8 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); wr[i].next = NULL; + i++; + break; } } @@ -801,8 +723,8 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) ret = 0; } else if (wc[j].opcode & IBV_WC_RECV) { - smps[j] = (struct sample*)(wc[j].wr_id); - smps[j]->length = wc[i].byte_len/sizeof(double); + smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = wc[i].byte_len / sizeof(double); } else ret = 0; @@ -835,14 +757,16 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_send_wr wr[*cnt], *bad_wr = NULL; struct ibv_sge sge[*cnt]; + struct ibv_wc wc[ib->cq_size]; struct ibv_mr *mr; int ret; + int i = 0; //Used for first loop: prepare work requests to post to send queue + int j = 0; //Used for second loop: get values from Completion Queue debug(LOG_IB | 10, "ib_write is called"); if (n->state == STATE_CONNECTED) { - memset(&wr, 0, sizeof(wr)); - + // First, write //ToDo: Place this into configuration and create checks if settings are valid int send_inline = 1; @@ -851,17 +775,14 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < *cnt; i++) { - // Increase refcnt of sample - sample_get(smps[i]); - + for (i = 0; i < *cnt; i++) { //Set Scatter/Gather element to data of sample - sge[i].addr = (uint64_t)&smps[i]->data; + sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = smps[i]->length*sizeof(double); sge[i].lkey = mr->lkey; // Set Send Work Request - wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC + wr[i].wr_id = (uintptr_t) smps[i]; //This way the sample can be release in WC wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; @@ -887,9 +808,24 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) } debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); + + + // Subsequently, check if something is available in completion queue + ret = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc); + + for (j = 0; j < ret; j++) { + if (wc[j].status != IBV_WC_SUCCESS && wc[j].status != IBV_WC_WR_FLUSH_ERR) + warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", + node_name(n), wc[j].status); + + + smps[j] = (struct sample *) (wc[j].wr_id); + } + + *cnt = j; } - return *cnt; + return i; } int ib_fd(struct node *n)