diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 122521a80..cb0034711 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -35,9 +35,11 @@ #include #include -/* Forward declarations */ -struct format_type; +/* Function poitner typedefs */ +typedef void (*ib_on_completion)(struct node*, struct ibv_wc*, int*); +typedef void* (*ib_poll_function)(void*); +/* Enums */ enum poll_mode_e { EVENT, @@ -50,21 +52,33 @@ struct r_addr_key_s { }; struct infiniband { - struct rdma_cm_id *listen_id; - struct rdma_cm_id *id; - struct rdma_event_channel *ec; + /* IBV/RDMA CM structs */ struct context_s { + struct rdma_cm_id *listen_id; + struct rdma_cm_id *id; + struct rdma_event_channel *ec; + struct ibv_pd *pd; struct ibv_cq *cq; struct ibv_comp_channel *comp_channel; } ctx; + /* Work Completion related */ struct poll_s { enum poll_mode_e poll_mode; + + /* On completion function */ + ib_on_completion on_compl; + + /* Busy poll or Event function */ + ib_poll_function poll_func; + + /* Poll thread */ pthread_t cq_poller_thread; } poll; + /* Connection specific variables */ struct connection_s { struct addrinfo *src_addr; struct addrinfo *dst_addr; @@ -74,11 +88,7 @@ struct infiniband { struct r_addr_key_s *r_addr_key; } conn; - struct ibv_qp_init_attr qp_init; - - int is_source; - int cq_size; - + /* Memory related variables */ struct ib_memory { struct pool p_recv; struct pool p_send; @@ -86,6 +96,14 @@ struct infiniband { struct ibv_mr *mr_recv; struct ibv_mr *mr_send; } mem; + + /* Queue Pair init variables */ + struct ibv_qp_init_attr qp_init; + + /* Misc settings */ + int is_source; + int cq_size; + }; /** @see node_type::reverse */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 436412c77..ea248fac4 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -31,6 +31,7 @@ #include + int ib_post_recv_wrs(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; @@ -50,35 +51,82 @@ int ib_post_recv_wrs(struct node *n) wr.num_sge = 1; // Post Work Request - ret = ibv_post_recv(ib->id->qp, &wr, &bad_wr); + ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr); return ret; } -static void ib_create_busy_poll(struct node *n) +void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size) { - struct infiniband *ib = (struct infiniband *) n->_vd; - - // Create completion queue and bind to channel - ib->ctx.cq = ibv_create_cq(ib->id->verbs, ib->cq_size, NULL, NULL, 0); - if(!ib->ctx.cq) - error("Could not create completion queue in node %s.", node_name(n)); - - //ToDo: Create poll pthread + //ToDo: No implementation yet. This is still handled in ib_read } -static void ib_create_event(struct node *n) +void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) +{ + for(int i=0; i<*size; i++) + { + if(wc[i].status != IBV_WC_SUCCESS) + error("Work Completion status was not IBV_WC_SUCCES in node %s: %s", + node_name(n), gai_strerror(wc[i].status)); + } + +} + +void * ib_event_thread(void *n) +{ + struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct ibv_wc wc[ib->cq_size]; + int size; + + while(1) + { + // Function blocks, until an event occurs + ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.cq, NULL); + + // Poll as long as WCs are available + while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc))) + ib->poll.on_compl(n, wc, &size); + + // Request a new event in the CQ and acknowledge event + ibv_req_notify_cq(ib->ctx.cq, 0); + ibv_ack_cq_events(ib->ctx.cq, 1); + } +} + +void * ib_busy_poll_thread(void *n) +{ + struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct ibv_wc wc[ib->cq_size]; + int size; + + while(1) + { + //ToDo: Implement stopThreads variable + if(0) + return NULL; + + // Poll as long as WCs are available + while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc))) + ib->poll.on_compl(n, wc, &size); + } +} + +static void ib_init_wc_poll(struct node *n) { int ret; struct infiniband *ib = (struct infiniband *) n->_vd; + ib->ctx.comp_channel = NULL; - // Create completion channel - ib->ctx.comp_channel = ibv_create_comp_channel(ib->id->verbs); - if(!ib->ctx.comp_channel) - error("Could not create completion channel in node %s.", node_name(n)); + if(ib->poll.poll_mode == EVENT) + { + // Create completion channel + ib->ctx.comp_channel = ibv_create_comp_channel(ib->ctx.id->verbs); + if(!ib->ctx.comp_channel) + error("Could not create completion channel in node %s.", node_name(n)); + } - // Create completion queue and bind to channel - ib->ctx.cq = ibv_create_cq(ib->id->verbs, + // Create completion queue and bind to channel (or NULL) + ib->ctx.cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, ib->ctx.comp_channel, @@ -86,13 +134,26 @@ static void ib_create_event(struct node *n) if(!ib->ctx.cq) error("Could not create completion queue in node %s.", node_name(n)); - // Request notifications from completion queue - ret = ibv_req_notify_cq(ib->ctx.cq, 0); - if(ret) - error("Failed to request notifiy CQ in node %s: %s", - node_name(n), gai_strerror(ret)); + if(ib->poll.poll_mode == EVENT) + { + // Request notifications from completion queue + ret = ibv_req_notify_cq(ib->ctx.cq, 0); + if(ret) + error("Failed to request notifiy CQ in node %s: %s", + node_name(n), gai_strerror(ret)); + } - //ToDo: Create poll pthread + // Initialize polling pthread + //ToDo: Remove if(is_source) + if(ib->is_source) + { + ret = pthread_create(&ib->poll.cq_poller_thread, NULL, ib->poll.poll_func, n); + if(ret) + { + error("Failed to create poll thread of node %s: %s", + node_name(n), gai_strerror(ret)); + } + } } static void ib_build_ibv(struct node *n) @@ -101,20 +162,13 @@ static void ib_build_ibv(struct node *n) int ret; //Allocate protection domain - ib->ctx.pd = ibv_alloc_pd(ib->id->verbs); + ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs); if(!ib->ctx.pd) error("Could not allocate protection domain in node %s.", node_name(n)); + info("Allocated Protection Domain"); // Initiate poll mode - switch(ib->poll.poll_mode) - { - case EVENT: - ib_create_event(n); - break; - case BUSY: - ib_create_busy_poll(n); - break; - } + ib_init_wc_poll(n); // Prepare remaining Queue Pair (QP) attributes ib->qp_init.send_cq = ib->ctx.cq; @@ -123,7 +177,7 @@ static void ib_build_ibv(struct node *n) //ToDo: Set maximum inline data // Create the actual QP - ret = rdma_create_qp(ib->id, ib->ctx.pd, &ib->qp_init); + ret = rdma_create_qp(ib->ctx.id, ib->ctx.pd, &ib->qp_init); if(ret) error("Failed to create Queue Pair in node %s.", node_name(n)); @@ -138,7 +192,8 @@ static void ib_build_ibv(struct node *n) ib->qp_init.cap.max_recv_wr, sizeof(double), &memtype_heap); - if(ret) { + if(ret) + { error("Failed to init recv memory pool of node %s: %s", node_name(n), gai_strerror(ret)); } @@ -168,7 +223,8 @@ static void ib_build_ibv(struct node *n) ib->qp_init.cap.max_send_wr, sizeof(double), &memtype_heap); - if(ret) { + if(ret) + { error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret)); } @@ -195,7 +251,8 @@ static void ib_build_ibv(struct node *n) for(int i=0; iqp_init.cap.max_recv_wr; i++) { ret = ib_post_recv_wrs(n); - if(ret) { + if(ret) + { error("Failed to post initial receive Work Requests of node %s.", node_name(n)); } @@ -214,7 +271,7 @@ static int ib_addr_resolved(struct node *n) ib_build_ibv(n); // Resolve address - ret = rdma_resolve_route(ib->id, ib->conn.timeout); + ret = rdma_resolve_route(ib->ctx.id, ib->conn.timeout); if(ret) error("Failed to resolve route in node %s.", node_name(n)); @@ -236,7 +293,7 @@ static int ib_route_resolved(struct node *n) memset(&cm_params, 0, sizeof(cm_params)); // Send connection request - ret = rdma_connect(ib->id, &cm_params); + ret = rdma_connect(ib->ctx.id, &cm_params); if(ret) error("Failed to connect in node %s.", node_name(n)); @@ -251,14 +308,14 @@ static int ib_connect_request(struct node *n, struct rdma_cm_id *id) int ret; info("Received a connection request!"); - ib->id = id; + ib->ctx.id = id; ib_build_ibv(n); struct rdma_conn_param cm_params; memset(&cm_params, 0, sizeof(cm_params)); // Accept connection request - ret = rdma_accept(ib->id, &cm_params); + ret = rdma_accept(ib->ctx.id, &cm_params); if(ret) error("Failed to connect in node %s.", node_name(n)); @@ -341,7 +398,8 @@ int ib_parse(struct node *n, json_t *cfg) // Translate IP:PORT to a struct addrinfo //ToDo: Fix fixed port ret = getaddrinfo(local, (char *)"13337", NULL, &ib->conn.src_addr); - if(ret) { + if(ret) + { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), gai_strerror(ret)); } @@ -360,9 +418,19 @@ int ib_parse(struct node *n, json_t *cfg) ib->conn.timeout = timeout; // Translate poll mode - if(strcmp(poll_mode, "EVENT") == 0) ib->poll.poll_mode = EVENT; - else if(strcmp(poll_mode, "BUSY") == 0) ib->poll.poll_mode = BUSY; - else { + if(strcmp(poll_mode, "EVENT") == 0) + { + ib->poll.poll_mode = EVENT; + ib->poll.poll_func = ib_event_thread; + + } + else if(strcmp(poll_mode, "BUSY") == 0) + { + ib->poll.poll_mode = BUSY; + ib->poll.poll_func = ib_busy_poll_thread; + } + else + { error("Failed to translate poll_mode in node %s. %s is not a valid \ poll mode!", node_name(n), poll_mode); } @@ -395,13 +463,22 @@ int ib_parse(struct node *n, json_t *cfg) // Translate address info //ToDo: Fix fixed port ret = getaddrinfo(remote, (char *)"13337", NULL, &ib->conn.dst_addr); - if(ret) { + if(ret) + { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } + + // Set correct Work Completion function + ib->poll.on_compl = ib_completion_source; } else + { ib->is_source = 0; + + // Set correct Work Completion function + ib->poll.on_compl = ib_completion_target; + } return 0; } @@ -423,22 +500,24 @@ int ib_start(struct node *n) int ret; // Create event channel - ib->ec = rdma_create_event_channel(); - if(!ib->ec) { + ib->ctx.ec = rdma_create_event_channel(); + if(!ib->ctx.ec) { error("Failed to create event channel in node %s!", node_name(n)); } - ret = rdma_create_id(ib->ec, &ib->id, NULL, ib->conn.port_space); - if(ret) { + ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space); + if(ret) + { error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret)); } info("Succesfully created rdma_cm_id."); // Bind rdma_cm_id to the HCA - ret = rdma_bind_addr(ib->id, ib->conn.src_addr->ai_addr); - if(ret) { + ret = rdma_bind_addr(ib->ctx.id, ib->conn.src_addr->ai_addr); + if(ret) + { error("Failed to bind to local device of node %s: %s", node_name(n), gai_strerror(ret)); } @@ -447,11 +526,12 @@ int ib_start(struct node *n) if(ib->is_source) { // Resolve address - ret = rdma_resolve_addr(ib->id, + ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout); - if(ret) { + if(ret) + { error("Failed to resolve remote address after %ims of node %s: %s", ib->conn.timeout, node_name(n), gai_strerror(ret)); } @@ -461,11 +541,12 @@ int ib_start(struct node *n) // The ID will be overwritten for the target. If the event type is // RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for // that communication. - ib->listen_id = ib->id; + ib->ctx.listen_id = ib->ctx.id; // Listen on rdma_cm_id for events - ret = rdma_listen(ib->listen_id, 10); - if(ret) { + ret = rdma_listen(ib->ctx.listen_id, 10); + if(ret) + { error("Failed to listen to rdma_cm_id on node %s", node_name(n)); } } @@ -474,7 +555,7 @@ int ib_start(struct node *n) // sure the nodes are succesfully connected. info("Starting to monitor events on rdma_cm_id."); - while(rdma_get_cm_event(ib->ec, &event) == 0) + while(rdma_get_cm_event(ib->ctx.ec, &event) == 0) { struct rdma_cm_event event_copy; memcpy(&event_copy, event, sizeof(*event)); @@ -583,7 +664,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) } //Send linked list of Work Requests - ret = ibv_post_send(ib->id->qp, wr, &bad_wr); + ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr); if(ret) { error("Failed to send message in node %s: %s", @@ -591,17 +672,6 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) return -ret; } - - /* Debugging */ - struct ibv_wc wc[5]; - int size; - while(1) - { - size = ibv_poll_cq(ib->ctx.cq, 5, wc); - if(size) - for(int j=0; j