diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 64b6c8985..57fddd2bf 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -453,6 +453,7 @@ static void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) "continue as listening node in such cases, set use_fallback = true in the configuration", node_name(n)); + n->state = STATE_STARTED; // Acknowledge event rdma_ack_cm_event(event); @@ -533,10 +534,10 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_REQUEST: ret = ib_connect_request(n, event->id); - //ToDo: Think about this. In this context, we say that the QP is initialized - //and at least one other node send data - if (ib->conn.port_space == RDMA_PS_UDP) - node->state = STATE_CONNECTED; + // Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs. + // This way, we can already fill the receive queue with WRs at the receive side + node->state = STATE_CONNECTED; + break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -827,8 +828,10 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // Doesn't start, if wcs == 0 for (int j = 0; j < wcs; j++) { - if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) - read_values--; + if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) { + // Drop all values, we don't know where the error occured + read_values = 0; + } if (wc[j].status == IBV_WC_WR_FLUSH_ERR) debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); @@ -842,6 +845,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea int correction = (ib->conn.port_space == RDMA_PS_UDP) ? META_GRH_SIZE : META_SIZE; smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); smps[j]->ts.received = ts_receive;