From 71134a4c81ab16fccf669f73b69766fd18331393 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 4 Aug 2018 17:34:52 +0200 Subject: [PATCH] Node now already posts Work Receives if it accepts the connections. Before, it waited until it is really connected. That caused problems, because the send side will start immediately sending if it is connected. Especially at high rates (>100k) this was a problem. --- lib/nodes/infiniband.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 64b6c8985..57fddd2bf 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -453,6 +453,7 @@ static void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) "continue as listening node in such cases, set use_fallback = true in the configuration", node_name(n)); + n->state = STATE_STARTED; // Acknowledge event rdma_ack_cm_event(event); @@ -533,10 +534,10 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_REQUEST: ret = ib_connect_request(n, event->id); - //ToDo: Think about this. In this context, we say that the QP is initialized - //and at least one other node send data - if (ib->conn.port_space == RDMA_PS_UDP) - node->state = STATE_CONNECTED; + // Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs. + // This way, we can already fill the receive queue with WRs at the receive side + node->state = STATE_CONNECTED; + break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -827,8 +828,10 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // Doesn't start, if wcs == 0 for (int j = 0; j < wcs; j++) { - if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) - read_values--; + if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) { + // Drop all values, we don't know where the error occured + read_values = 0; + } if (wc[j].status == IBV_WC_WR_FLUSH_ERR) debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); @@ -842,6 +845,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea int correction = (ib->conn.port_space == RDMA_PS_UDP) ? META_GRH_SIZE : META_SIZE; smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); smps[j]->ts.received = ts_receive;