diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 6f54f04f7..7fa5f2285 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -40,7 +40,7 @@ typedef void (*ib_on_completion)(struct node*, struct ibv_wc*, int*); typedef void* (*ib_poll_function)(void*); /* Enums */ -enum poll_mode_e +enum poll_mode_e { EVENT, BUSY @@ -91,7 +91,7 @@ struct infiniband { pthread_t stop_thread; int rdma_disconnect_called; - int used_recv_wrs; + int available_recv_wrs; } conn; /* Memory related variables */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index efe372816..f12bfa215 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -88,15 +88,11 @@ int ib_post_recv_wrs(struct node *n) return ret; } -void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size) -{ - //ToDo: No implementation yet. This is still handled in ib_read -} +void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; - struct sample* smpl; for(int i=0; i<*size; i++) { @@ -115,8 +111,7 @@ void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) else { // Release sample - smpl = (struct sample*)wc[i].wr_id; - sample_put(smpl); + sample_put((struct sample*)(wc[i].wr_id)); } } } @@ -144,19 +139,19 @@ void * ib_event_thread(void *n) void * ib_busy_poll_thread(void *n) { - struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; - struct ibv_wc wc[ib->cq_size]; - int size; + struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct ibv_wc wc[ib->cq_size]; + int size; - while(1) - { - // Poll as long as WCs are available - while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc))) - ib->poll.on_compl(n, wc, &size); + while(1) + { + // Poll as long as WCs are available + while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc))) + ib->poll.on_compl(n, wc, &size); - if(ib->poll.stopThread) - return NULL; - } + if(ib->poll.stopThread) + return NULL; + } } static void ib_init_wc_poll(struct node *n) @@ -239,7 +234,7 @@ static void ib_build_ibv(struct node *n) // Set pool size to maximum size of Receive Queue pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, - 64*sizeof(double), + SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memtype_heap); if(ret) { @@ -297,16 +292,16 @@ static void ib_build_ibv(struct node *n) // Post Receive Work Requests to be able to receive data // Fill complete Receive Queue during initialization - for(int i=0; iqp_init.cap.max_recv_wr; i++) - { - ret = ib_post_recv_wrs(n); - if(ret) - { - error("Failed to post initial receive Work Requests of node %s.", - node_name(n)); - } - } - info("Filled the complete Receive Queue."); + //for(int i=0; iqp_init.cap.max_recv_wr; i++) + //{ + // ret = ib_post_recv_wrs(n); + // if(ret) + // { + // error("Failed to post initial receive Work Requests of node %s.", + // node_name(n)); + // } + //} + //info("Filled the complete Receive Queue."); } static int ib_addr_resolved(struct node *n) @@ -514,8 +509,8 @@ int ib_parse(struct node *n, json_t *cfg) ib->qp_init.cap.max_send_wr = max_send_wr; ib->qp_init.cap.max_recv_wr = max_recv_wr; - // Set used receive Work Requests to 0 - ib->conn.used_recv_wrs = 0; + // Set available receive Work Requests to 0 + ib->conn.available_recv_wrs = 0; // Set remaining QP attributes ib->qp_init.cap.max_send_sge = 1; @@ -712,49 +707,65 @@ int ib_deinit() int ib_read(struct node *n, struct sample *smps[], unsigned cnt) { - //Create separate thread for polling! This impelemtation is just - //for testing purposes struct infiniband *ib = (struct infiniband *) n->_vd; + struct ibv_wc wc[n->in.vectorize]; + struct ibv_recv_wr wr, *bad_wr = NULL; + struct ibv_sge sge; + struct ibv_mr ** mr; + struct pool *p; int ret; - struct ibv_wc wc[cnt]; - char *ptr; - ret = ibv_poll_cq(ib->ctx.cq, cnt, wc); + ret = ibv_poll_cq(ib->ctx.cq, n->in.vectorize, wc); if(ret) { - ib->conn.used_recv_wrs += ret; + ib->conn.available_recv_wrs -= ret; - for(int i=0; ilength = wc[i].byte_len/sizeof(double); + smps[i]->capacity = DEFAULT_SAMPLELEN; - //Release memory - pool_put(&ib->mem.p_recv, (double*)(wc[i].wr_id)); - } - smps[i]->length = wc[i].byte_len/sizeof(double); - smps[i]->capacity = 64; - memcpy(smps[i]->data, ptr, wc[i].byte_len); - } + //Release sample + sample_put(smps[i]); + + } + } } - else + else if(ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr) { - //No data received? Put new receive Work Requests to Receive Queue - for(int i=0; iconn.used_recv_wrs; i++) - ib_post_recv_wrs(n); + // No data received? Put new receive Work Requests to Receive Queue + // Get Memory Region + p = sample_pool(smps[0]); + mr = (struct ibv_mr **)((char *)(p)+p->buffer_off-8); - ib->conn.used_recv_wrs = 0; + // Increase refcnt of sample + sample_get(smps[0]); + + // Prepare receive Scatter/Gather element + sge.addr = (uint64_t)&smps[0]->data; + sge.length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); + sge.lkey = (*mr)->lkey; + + // Prepare a receive Work Request + wr.wr_id = (uintptr_t)smps[0]; + wr.next = NULL; + wr.sg_list = &sge; + wr.num_sge = 1; + + // Post Work Request + ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr); + + ib->conn.available_recv_wrs++; } return ret; @@ -765,8 +776,8 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_send_wr wr[cnt], *bad_wr = NULL; struct ibv_sge sge[cnt]; - struct pool *p; struct ibv_mr ** mr; + struct pool *p; int ret; memset(&wr, 0, sizeof(wr)); @@ -784,12 +795,12 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) sample_get(smps[i]); //Set Scatter/Gather element to data of sample - sge[i].addr = (uint64_t)&smps[i]->data->f; + sge[i].addr = (uint64_t)&smps[i]->data; sge[i].length = smps[i]->length*sizeof(double); sge[i].lkey = (*mr)->lkey; // Set Send Work Request - wr[i].wr_id = (uint64_t)&smps[i]; //This way the sample can be release in WC + wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; diff --git a/src/pipe.c b/src/pipe.c index 2b0260398..f1267e876 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -204,8 +204,8 @@ static void * recv_loop(void *ctx) ready = sample_alloc_many(&recvv.pool, smps, node->in.vectorize); if (ready < 0) error("Failed to allocate %u samples from receive pool.", node->in.vectorize); - else if (ready < node->in.vectorize) - warn("Receive pool underrun"); +// else if (ready < node->in.vectorize) +// warn("Receive pool underrun"); recv = node_read(node, smps, ready); if (recv < 0)