diff --git a/include/villas/common.h b/include/villas/common.h index 92b9895b8..73ba53ba0 100644 --- a/include/villas/common.h +++ b/include/villas/common.h @@ -38,8 +38,9 @@ enum state { STATE_OPENED = 4, /* alias for STATE_STARTED used by struct io */ STATE_STOPPED = 5, STATE_UNLOADED = 5, /* alias for STATE_STARTED used by struct plugin */ - STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */ - STATE_CONNECTED = 6 + STATE_CLOSED = 5, /* alias for STATE_STOPPED used by struct io */ + STATE_PENDING_CONNECT = 6, + STATE_CONNECTED = 7 }; /** Callback to destroy list elements. diff --git a/include/villas/memory/ib.h b/include/villas/memory/ib.h index 2f4597657..4b9996b67 100644 --- a/include/villas/memory/ib.h +++ b/include/villas/memory/ib.h @@ -28,4 +28,4 @@ struct memory_ib { struct memory_type *parent; }; -struct ibv_mr * memory_ib_get_mr(struct sample *smps); +struct ibv_mr * memory_ib_get_mr(void *ptr); diff --git a/include/villas/pool.h b/include/villas/pool.h index 49fdbaee9..562ca255a 100644 --- a/include/villas/pool.h +++ b/include/villas/pool.h @@ -50,6 +50,7 @@ struct pool { }; #define INLINE static inline __attribute__((unused)) +#define pool_buffer(p) ((char *) (p) + (p)->buffer_off) /** Initiazlize a pool * diff --git a/lib/memory/ib.c b/lib/memory/ib.c index 3141bf26f..65539ca8c 100644 --- a/lib/memory/ib.c +++ b/lib/memory/ib.c @@ -27,16 +27,14 @@ #include #include -struct ibv_mr * memory_ib_get_mr(struct sample *smps) +struct ibv_mr * memory_ib_get_mr(void *ptr) { struct memory_allocation *ma; - struct pool *p; struct ibv_mr *mr; - p = sample_pool(smps); - - ma = memory_get_allocation((char *) (p) + p->buffer_off); + ma = memory_get_allocation(ptr); mr = ma->ib.mr; + return mr; } diff --git a/lib/node.c b/lib/node.c index 21e20b2f2..fbd1337af 100644 --- a/lib/node.c +++ b/lib/node.c @@ -342,7 +342,7 @@ int node_stop(struct node *n) { int ret; - if (n->state != STATE_STARTED && n->state != STATE_CONNECTED) + if (n->state != 
STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT) return 0; info("Stopping node %s", node_name(n)); @@ -410,7 +410,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel { int readd, nread = 0; - assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED); + assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT); assert(node_type(n)->read); /* Send in parts if vector not supported */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index c691f8511..43ed99f18 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -575,9 +575,13 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_REQUEST: ret = ib_connect_request(n, event->id); - // Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs. - // This way, we can already fill the receive queue with WRs at the receive side - node->state = STATE_CONNECTED; + // A target UDP node will never really connect. In order to receive data, + // we set it to connected after it answered the connection request + // with rdma_connect. 
+ if (ib->conn.port_space == RDMA_PS_UDP && !ib->is_source) + node->state = STATE_CONNECTED; + else + node->state = STATE_PENDING_CONNECT; break; @@ -766,7 +770,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea debug(LOG_IB | 15, "ib_read is called"); - if (n->state == STATE_CONNECTED) { + if (n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT) { max_wr_post = cnt; @@ -776,6 +780,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea for (int i = 0;; i++) { if (i % CHK_PER_ITER == CHK_PER_ITER - 1) pthread_testcancel(); + // Stop polling and return immediately while the node is not in STATE_CONNECTED, + // i.e. it has disconnected or is still in STATE_PENDING_CONNECT. if (n->state != STATE_CONNECTED) return 0; wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); @@ -803,7 +809,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea } // Get Memory Region - mr = memory_ib_get_mr(smps[0]); + mr = memory_ib_get_mr(pool_buffer(sample_pool(smps[0]))); for (int i = 0; i < max_wr_post; i++) { int j = 0; @@ -908,7 +914,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele // First, write // Get Memory Region - mr = memory_ib_get_mr(smps[0]); + mr = memory_ib_get_mr(pool_buffer(sample_pool(smps[0]))); for (sent = 0; sent < cnt; sent++) { int j = 0;