diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index cafff88ff..7a5058f71 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -40,11 +40,6 @@ enum poll_mode_e { BUSY }; -struct r_addr_key_s { - uint64_t remote_addr; - uint32_t rkey; -}; - struct infiniband { /* IBV/RDMA CM structs */ struct context_s { @@ -58,11 +53,7 @@ struct infiniband { struct ibv_comp_channel *comp_channel; } ctx; - /* Work Completion related */ - struct poll_s { - enum poll_mode_e poll_mode; - } poll; - + /* Set if threads should be aborted */ int stopThreads; /* Connection specific variables */ @@ -72,35 +63,25 @@ struct infiniband { enum rdma_port_space port_space; int timeout; - struct r_addr_key_s *r_addr_key; - pthread_t rdma_cm_event_thread; int send_inline; - int available_recv_wrs; struct send_wc_stack_s { uint64_t* array; unsigned top; } send_wc_stack; + int available_recv_wrs; int buffer_subtraction; } conn; - /* Memory related variables */ - struct ib_memory { - struct pool p_recv; - struct pool p_send; - - struct ibv_mr *mr_recv; - struct ibv_mr *mr_send; - } mem; - /* Queue Pair init variables */ struct ibv_qp_init_attr qp_init; /* Misc settings */ + enum poll_mode_e poll_mode; int is_source; int recv_cq_size; int send_cq_size; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index eba638c9a..ff13b2ebf 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -22,18 +22,15 @@ #include #include -#include #include #include #include #include #include -#include -#include #include -int ib_disconnect(struct node *n) +static int ib_disconnect(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; debug(LOG_IB | 1, "Starting to clean up"); @@ -44,50 +41,13 @@ int ib_disconnect(struct node *n) rdma_destroy_qp(ib->ctx.id); debug(LOG_IB | 3, "Destroyed QP"); - // Deregister memory regions - ibv_dereg_mr(ib->mem.mr_recv); - if (ib->is_source) - ibv_dereg_mr(ib->mem.mr_send); - debug(LOG_IB | 3, "Deregistered memory regions"); - - // Destroy pools - pool_destroy(&ib->mem.p_recv); - pool_destroy(&ib->mem.p_send); - debug(LOG_IB | 3, "Destroyed memory pools"); - - // Set available receive work requests to zero + // Set available receive WRs and stack top to zero ib->conn.available_recv_wrs = 0; - - // Reset stack top pointer ib->conn.send_wc_stack.top = 0; return ib->stopThreads; } -int ib_post_recv_wrs(struct node *n) -{ - struct infiniband *ib = (struct infiniband *) n->_vd; - struct ibv_recv_wr wr, *bad_wr = NULL; - int ret; - struct ibv_sge sge; - - // Prepare receive Scatter/Gather element - sge.addr = (uintptr_t) pool_get(&ib->mem.p_recv); - sge.length = ib->mem.p_recv.blocksz; - sge.lkey = ib->mem.mr_recv->lkey; - - // Prepare a receive Work Request - wr.wr_id = (uintptr_t) sge.addr; - wr.next = NULL; - wr.sg_list = &sge; - wr.num_sge = 1; - - // Post Work Request - ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr); - - return ret; -} - static void ib_build_ibv(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; @@ -122,58 +82,6 @@ static void ib_build_ibv(struct node *n) if (ib->conn.send_inline) info("Maximum inline size is set to %i byte", ib->qp_init.cap.max_inline_data); - - // Allocate memory - ib->mem.p_recv.state = STATE_DESTROYED; - ib->mem.p_recv.queue.state = STATE_DESTROYED; - - // Set pool size to maximum size of Receive Queue - pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memory_type_heap); - if (ret) - error("Failed to init recv memory pool of node %s: %s", - node_name(n), gai_strerror(ret)); - - debug(LOG_IB | 3, "Created internal receive pool with %i elements", - ib->qp_init.cap.max_recv_wr); - - //ToDo: initialize r_addr_key struct if mode is RDMA - - // Register memory for IB Device. Not necessary if data is send - // exclusively inline - ib->mem.mr_recv = ibv_reg_mr(ib->ctx.pd, - (char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off, - ib->mem.p_recv.len, - IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); - if (!ib->mem.mr_recv) - error("Failed to register mr_recv with ibv_reg_mr of node %s", - node_name(n)); - - debug(LOG_IB | 3, "Registered receive pool with ibv_reg_mr"); - - if (ib->is_source) { - ib->mem.p_send.state = STATE_DESTROYED; - ib->mem.p_send.queue.state = STATE_DESTROYED; - - // Set pool size to maximum size of Receive Queue - pool_init(&ib->mem.p_send, ib->qp_init.cap.max_send_wr, sizeof(double), &memory_type_heap); - if (ret) - error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret)); - - debug(LOG_IB | 3, "Created internal send pool with %i elements", ib->qp_init.cap.max_recv_wr); - - //ToDo: initialize r_addr_key struct if mode is RDMA - - // Register memory for IB Device. Not necessary if data is send - // exclusively inline - ib->mem.mr_send = ibv_reg_mr(ib->ctx.pd, - (char*)&ib->mem.p_send+ib->mem.p_send.buffer_off, - ib->mem.p_send.len, - IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); - if (!ib->mem.mr_send) - error("Failed to register mr_send with ibv_reg_mr of node %s", node_name(n)); - - debug(LOG_IB | 3, "Registered send pool with ibv_reg_mr"); - } } static int ib_addr_resolved(struct node *n) @@ -264,7 +172,7 @@ int ib_parse(struct node *n, json_t *cfg) int vectorize_out = 1; int buffer_subtraction = 2; - + // Parse JSON files and copy to local variables json_t *json_in = NULL; json_t *json_out = NULL; json_error_t err; @@ -353,9 +261,9 @@ int ib_parse(struct node *n, json_t *cfg) // Translate poll mode if (strcmp(poll_mode, "EVENT") == 0) - ib->poll.poll_mode = EVENT; + ib->poll_mode = EVENT; else if (strcmp(poll_mode, "BUSY") == 0) - ib->poll.poll_mode = BUSY; + ib->poll_mode = BUSY; else error("Failed to translate poll_mode in node %s. %s is not a valid \ poll mode!", node_name(n), poll_mode); @@ -401,6 +309,7 @@ int ib_parse(struct node *n, json_t *cfg) // Set number of bytes to be send inline ib->qp_init.cap.max_inline_data = max_inline_data; + // If node will send data, set remote address if (ib->is_source) { // Translate address info char* ip_adr = strtok(remote, ":"); @@ -480,11 +389,12 @@ int ib_destroy(struct node *n) return 0; } -void ib_create_bind_id(struct node *n) +static void ib_create_bind_id(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; int ret; + // Create rdma_cm_id ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space); if (ret) error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret)); @@ -505,7 +415,7 @@ void ib_create_bind_id(struct node *n) ib->ctx.listen_id = ib->ctx.id; } -void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) +static void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) { struct infiniband *ib = (struct infiniband *) n->_vd; int ret; @@ -526,7 +436,7 @@ void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) if (ret) error("Failed to listen to rdma_cm_id on node %s", node_name(n)); - // Node is not a source (and will not send data) + // Node is not a source (and will not send data ib->is_source = 0; info("Node %s is set to listening mode", node_name(n)); @@ -667,6 +577,7 @@ int ib_start(struct node *n) debug(LOG_IB | 3, "Initialized Work Completion Stack"); + // Resolve address or listen to rdma_cm_id if (ib->is_source) { // Resolve address ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout); @@ -726,6 +637,8 @@ int ib_stop(struct node *n) debug(LOG_IB | 3, "Called rdma_disconnect"); } else { + // Since cannot use an event to unblock rdma_cm_get_event, we send + // SIGUSR1 to the thread and kill it. pthread_kill(ib->conn.rdma_cm_event_thread, SIGUSR1); debug(LOG_IB | 3, "Called pthread_kill()");