diff --git a/include/villas/common.h b/include/villas/common.h index a7093ac38..4978f2200 100644 --- a/include/villas/common.h +++ b/include/villas/common.h @@ -38,8 +38,9 @@ enum state { STATE_OPENED = 4, /* alias for STATE_STARTED used by struct io */ STATE_STOPPED = 5, STATE_UNLOADED = 5, /* alias for STATE_STARTED used by struct plugin */ - STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */ - STATE_CONNECTED = 6 + STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */ + STATE_PENDING_CONNECT = 6, + STATE_CONNECTED = 7 }; /** Callback to destroy list elements. diff --git a/lib/node.c b/lib/node.c index 97789beb6..57d223638 100644 --- a/lib/node.c +++ b/lib/node.c @@ -351,7 +351,7 @@ int node_stop(struct node *n) { int ret; - if (n->state != STATE_STARTED && n->state != STATE_CONNECTED) + if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT) return 0; info("Stopping node %s", node_name(n)); @@ -416,7 +416,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel { int readd, nread = 0; - assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED); + assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT); assert(node_type(n)->read); /* Send in parts if vector not supported */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 8796d3946..4edb2ed31 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -575,9 +575,13 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_REQUEST: ret = ib_connect_request(n, event->id); - // Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs. - // This way, we can already fill the receive queue with WRs at the receive side - node->state = STATE_CONNECTED; + // A target UDP node will never really connect. In order to receive data, + // we set it to connected after it answered the connection request + // with rdma_connect. + if (ib->conn.port_space == RDMA_PS_UDP && !ib->is_source) + node->state = STATE_CONNECTED; + else + node->state = STATE_PENDING_CONNECT; break; @@ -766,7 +770,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea debug(LOG_IB | 15, "ib_read is called"); - if (n->state == STATE_CONNECTED) { + if (n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT) { max_wr_post = cnt; @@ -776,6 +780,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea for (int i = 0;; i++) { if (i % CHK_PER_ITER == CHK_PER_ITER - 1) pthread_testcancel(); + // If IB node disconnects or if it is still in STATE_PENDING_CONNECT, ib_read should + // return immediately if this condition holds if (n->state != STATE_CONNECTED) return 0; wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);