From e2061e58fc1deacd0a4437772fdc5c42c7a26f15 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 7 Jul 2018 12:49:22 +0200 Subject: [PATCH] Events are now monitored in a separate thread. The segmentation faults we saw earlier were caused because we exited ib_start before we created a protection domain, which is used by memory_allocation --- include/villas/nodes/infiniband.h | 5 +- lib/nodes/infiniband.c | 378 +++++++++++++++--------------- 2 files changed, 187 insertions(+), 196 deletions(-) diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 4e7c056e4..449d8c88c 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -38,6 +38,7 @@ /* Function pointer typedefs */ typedef void (*ib_on_completion) (struct node*, struct ibv_wc*, int*); typedef void * (*ib_poll_function) (void*); +typedef void * (*ib_event_function) (void*); /* Enums */ enum poll_mode_e { @@ -55,6 +56,7 @@ struct infiniband { struct context_s { struct rdma_cm_id *listen_id; struct rdma_cm_id *id; + struct rdma_event_channel *ec; struct ibv_pd *pd; struct ibv_cq *recv_cq; @@ -87,8 +89,7 @@ struct infiniband { struct r_addr_key_s *r_addr_key; - pthread_t stop_thread; - int rdma_disconnect_called; + pthread_t rdma_cm_event_thread; int available_recv_wrs; } conn; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index ce5245986..18736b088 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -22,6 +22,7 @@ #include #include +#include //ToDo: remove me. #include #include @@ -32,8 +33,6 @@ #include #include -static struct rdma_event_channel *rdma_event_channel; - int ib_cleanup(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; @@ -59,7 +58,7 @@ int ib_cleanup(struct node *n) debug(LOG_IB | 3, "Destroyed rdma_cm_id"); // Destroy event channel - rdma_destroy_event_channel(rdma_event_channel); + rdma_destroy_event_channel(ib->ctx.ec); debug(LOG_IB | 3, "Destroyed event channel"); return 0; @@ -216,13 +215,6 @@ static void ib_build_ibv(struct node *n) debug(LOG_IB | 1, "Starting to build IBV components"); - //Allocate protection domain - ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs); - if (!ib->ctx.pd) - error("Could not allocate protection domain in node %s", node_name(n)); - - debug(LOG_IB | 3, "Allocated Protection Domain"); - // Initiate poll mode ib_init_wc_poll(n); @@ -362,51 +354,6 @@ static int ib_connect_request(struct node *n, struct rdma_cm_id *id) return 0; } -static int ib_event(struct node *n, struct rdma_cm_event *event) -{ - int ret = 0; - - switch(event->event) { - case RDMA_CM_EVENT_ADDR_RESOLVED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED"); - ret = ib_addr_resolved(n); - break; - case RDMA_CM_EVENT_ADDR_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR"); - error("Address resolution (rdma_resolve_addr) failed!"); - case RDMA_CM_EVENT_ROUTE_RESOLVED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED"); - ret = ib_route_resolved(n); - break; - case RDMA_CM_EVENT_ROUTE_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR"); - error("Route resolution (rdma_resovle_route) failed!"); - case RDMA_CM_EVENT_CONNECT_REQUEST: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST"); - ret = ib_connect_request(n, event->id); - break; - case RDMA_CM_EVENT_CONNECT_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR"); - error("An error has occurred trying to establish a connection!"); - case RDMA_CM_EVENT_REJECTED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED"); - error("Connection request or response was rejected by the remote end point!"); - case RDMA_CM_EVENT_ESTABLISHED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED"); - info("Connection established in node %s", node_name(n)); - ret = 1; - break; - case RDMA_CM_EVENT_DISCONNECTED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); - ret = ib_cleanup(n); - break; - default: - error("Unknown event occurred: %u", event->event); - } - - return ret; -} - int ib_reverse(struct node *n) { return 0; @@ -590,62 +537,109 @@ int ib_destroy(struct node *n) void * ib_rdma_cm_event_thread(void *n) { struct node *node = (struct node *) n; - //struct infiniband *ib = (struct infiniband *) node->_vd; + struct infiniband *ib = (struct infiniband *) node->_vd; struct rdma_cm_event *event; + int ret = 0; - debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node)); - while (rdma_get_cm_event(rdma_event_channel, &event) == 0) { + //debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node)); + + info("EVENT: %p", ib->ctx.ec); + info("FD, i: %i", ib->ctx.ec->fd); + + // Wait until node is completely started + //while (node->state != STATE_STARTED); + + while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) { struct rdma_cm_event event_copy; + //ToDo: check if copy is really necessary memcpy(&event_copy, event, sizeof(*event)); rdma_ack_cm_event(event); - if (ib_event(n, &event_copy)) + switch(event_copy.event) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED"); + + ret = ib_addr_resolved(n); + break; + + case RDMA_CM_EVENT_ADDR_ERROR: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR"); + + error("Address resolution (rdma_resolve_addr) failed!"); + break; + + case RDMA_CM_EVENT_ROUTE_RESOLVED: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED"); + + ret = ib_route_resolved(n); + break; + + case RDMA_CM_EVENT_ROUTE_ERROR: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR"); + + error("Route resolution (rdma_resovle_route) failed!"); + break; + + case RDMA_CM_EVENT_CONNECT_REQUEST: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST"); + + ret = ib_connect_request(n, event_copy.id); + break; + + case RDMA_CM_EVENT_CONNECT_ERROR: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR"); + + error("An error has occurred trying to establish a connection!"); + break; + + case RDMA_CM_EVENT_REJECTED: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED"); + + error("Connection request or response was rejected by the remote end point!"); + break; + case RDMA_CM_EVENT_ESTABLISHED: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED"); + + // node->state = STATE_CONNECTED; + + info("Connection established in node %s", node_name(n)); + break; + + case RDMA_CM_EVENT_DISCONNECTED: + debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); + + ret = ib_cleanup(n); + break; + + default: + error("Unknown event occurred: %u", event_copy.event); + } + + + if (ret) //ToDo: Fix me break; } return NULL; } -void * ib_disconnect_thread(void *n) -{ - struct node *node = (struct node *) n; - struct infiniband *ib = (struct infiniband *) node->_vd; - struct rdma_cm_event *event; - - debug(LOG_IB | 1, "Started disconnect thread of node %s", node_name(node)); - - while (rdma_get_cm_event(rdma_event_channel, &event) == 0) { - if (event->event == RDMA_CM_EVENT_DISCONNECTED) { - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); - - rdma_ack_cm_event(event); - ib->conn.rdma_disconnect_called = 1; - - node_stop(node); - return NULL; - } - } - return NULL; -} - int ib_start(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; - struct rdma_cm_event *event = NULL; int ret; debug(LOG_IB | 1, "Started ib_start"); // Create event channel - rdma_event_channel = rdma_create_event_channel(); - if (!rdma_event_channel) + ib->ctx.ec = rdma_create_event_channel(); + if (!ib->ctx.ec) error("Failed to create event channel in node %s!", node_name(n)); debug(LOG_IB | 3, "Created event channel"); - ret = rdma_create_id(rdma_event_channel, &ib->ctx.id, NULL, ib->conn.port_space); + ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space); if (ret) error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret)); @@ -684,27 +678,30 @@ int ib_start(struct node *n) debug(LOG_IB | 3, "Started to listen to rdma_cm_id"); } + //Allocate protection domain + ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs); + if (!ib->ctx.pd) + error("Could not allocate protection domain in node %s", node_name(n)); + + debug(LOG_IB | 3, "Allocated Protection Domain"); + + // Several events should occur on the event channel, to make // sure the nodes are succesfully connected. debug(LOG_IB | 1, "Starting to monitor events on rdma_cm_id"); //Create thread to monitor rdma_cm_event channel - ret = pthread_create(&ib->conn.stop_thread, NULL, ib_disconnect_thread, n); + ret = pthread_create(&ib->conn.rdma_cm_event_thread, NULL, ib_rdma_cm_event_thread, n); if (ret) - error("Failed to create thread to monitor disconnects in node %s: %s", + error("Failed to create thread to monitor rdma_cm events in node %s: %s", node_name(n), gai_strerror(ret)); - - //Wait until rdma_cm_event thread sets state to STARTED - while ( !(n->state == STATE_CONNECTED)); - return 0; } int ib_stop(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; - struct rdma_cm_event *event = NULL; int ret; // Call RDMA disconnect function @@ -717,18 +714,6 @@ int ib_stop(struct node *n) debug(LOG_IB | 3, "Called rdma_disconnect"); - // If disconnected event already occured, directly call cleanup function - if (ib->conn.rdma_disconnect_called) - ib_cleanup(n); - else { - // Else, wait for event to occur - ib->conn.rdma_disconnect_called = 1; - rdma_get_cm_event(rdma_event_channel, &event); - - rdma_ack_cm_event(event); - - ib_event(n, event); - } return 0; } @@ -750,78 +735,81 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) struct ibv_recv_wr wr[cnt], *bad_wr = NULL; struct ibv_sge sge[cnt]; struct ibv_mr *mr; - int ret; + int ret = 0; debug(LOG_IB | 15, "ib_read is called"); - if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) { - // Get Memory Region - mr = memory_ib_get_mr(smps[0]); + if (n->state == STATE_CONNECTED) { - for (int i = 0; i < cnt; i++) { - // Increase refcnt of sample - sample_get(smps[i]); + if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) { + // Get Memory Region + mr = memory_ib_get_mr(smps[0]); - // Prepare receive Scatter/Gather element - sge[i].addr = (uint64_t) &smps[i]->data; - sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); - sge[i].lkey = mr->lkey; + for (int i = 0; i < cnt; i++) { + // Increase refcnt of sample + sample_get(smps[i]); - // Prepare a receive Work Request - wr[i].wr_id = (uintptr_t) smps[i]; - wr[i].next = &wr[i+1]; - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; + // Prepare receive Scatter/Gather element + sge[i].addr = (uint64_t) &smps[i]->data; + sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); + sge[i].lkey = mr->lkey; - ib->conn.available_recv_wrs++; + // Prepare a receive Work Request + wr[i].wr_id = (uintptr_t) smps[i]; + wr[i].next = &wr[i+1]; + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; - if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { - debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); + ib->conn.available_recv_wrs++; - wr[i].next = NULL; - break; + if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { + debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); + + wr[i].next = NULL; + break; + } } + + // Post list of Work Requests + ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr); + if (ret) + error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx", + node_name(n), ret, bad_wr->wr_id); + + debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); + } - // Post list of Work Requests - ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr); - if (ret) - error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx", - node_name(n), ret, bad_wr->wr_id); + // Poll Completion Queue + ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc); - debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); + if (ret) { + debug(LOG_IB | 10, "Received %i Work Completions", ret); - } + ib->conn.available_recv_wrs -= ret; - // Poll Completion Queue - ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc); + for (int i = 0; i < ret; i++) { + if (wc[i].status == IBV_WC_WR_FLUSH_ERR) { + debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it."); - if (ret) { - debug(LOG_IB | 10, "Received %i Work Completions", ret); + ret = 0; + } + else if (wc[i].status != IBV_WC_SUCCESS) { + warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", + node_name(n), wc[i].status); + ret = 0; + } + else if (wc[i].opcode & IBV_WC_RECV) { + smps[i] = (struct sample*)(wc[i].wr_id); + smps[i]->length = wc[i].byte_len/sizeof(double); + } + else + ret = 0; - ib->conn.available_recv_wrs -= ret; - - for (int i = 0; i < ret; i++) { - if (wc[i].status == IBV_WC_WR_FLUSH_ERR) { - debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it."); - - ret = 0; + //Release sample + sample_put((struct sample *) (wc[i].wr_id)); + debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id)); } - else if (wc[i].status != IBV_WC_SUCCESS) { - warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[i].status); - ret = 0; - } - else if (wc[i].opcode & IBV_WC_RECV) { - smps[i] = (struct sample*)(wc[i].wr_id); - smps[i]->length = wc[i].byte_len/sizeof(double); - } - else - ret = 0; - - //Release sample - sample_put((struct sample *) (wc[i].wr_id)); - debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id)); } } @@ -838,53 +826,55 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) debug(LOG_IB | 10, "ib_write is called"); - memset(&wr, 0, sizeof(wr)); + if (n->state == STATE_CONNECTED) { + memset(&wr, 0, sizeof(wr)); - //ToDo: Place this into configuration and create checks if settings are valid - int send_inline = 1; + //ToDo: Place this into configuration and create checks if settings are valid + int send_inline = 1; - debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline); + debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline); - // Get Memory Region - mr = memory_ib_get_mr(smps[0]); + // Get Memory Region + mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < cnt; i++) { - // Increase refcnt of sample - sample_get(smps[i]); + for (int i = 0; i < cnt; i++) { + // Increase refcnt of sample + sample_get(smps[i]); - //Set Scatter/Gather element to data of sample - sge[i].addr = (uint64_t)&smps[i]->data; - sge[i].length = smps[i]->length*sizeof(double); - sge[i].lkey = mr->lkey; + //Set Scatter/Gather element to data of sample + sge[i].addr = (uint64_t)&smps[i]->data; + sge[i].length = smps[i]->length*sizeof(double); + sge[i].lkey = mr->lkey; - // Set Send Work Request - wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; + // Set Send Work Request + wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; - if (i == (cnt-1)) { - debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1)); - wr[i].next = NULL; + if (i == (cnt-1)) { + debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1)); + wr[i].next = NULL; + } + else + wr[i].next = &wr[i+1]; + + wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3); + wr[i].imm_data = htonl(0); //ToDo: set this to a useful value + wr[i].opcode = IBV_WR_SEND_WITH_IMM; } - else - wr[i].next = &wr[i+1]; - wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3); - wr[i].imm_data = htonl(0); //ToDo: set this to a useful value - wr[i].opcode = IBV_WR_SEND_WITH_IMM; + //Send linked list of Work Requests + ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr); + if (ret) { + error("Failed to send message in node %s: %i, bad WR ID: 0x%lx", + node_name(n), ret, bad_wr->wr_id); + + return -ret; + } + + debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); } - //Send linked list of Work Requests - ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr); - if (ret) { - error("Failed to send message in node %s: %i, bad WR ID: 0x%lx", - node_name(n), ret, bad_wr->wr_id); - - return -ret; - } - - debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); - return cnt; }