1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Added STATE_PENDING_CONNECT

This commit is contained in:
Dennis Potter 2018-08-13 14:50:49 +02:00
parent 90680613c3
commit d742364637
3 changed files with 15 additions and 8 deletions

View file

@ -38,8 +38,9 @@ enum state {
STATE_OPENED = 4, /* alias for STATE_STARTED used by struct io */
STATE_STOPPED = 5,
STATE_UNLOADED = 5, /* alias for STATE_STARTED used by struct plugin */
STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */
STATE_CONNECTED = 6
STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */
STATE_PENDING_CONNECT = 6,
STATE_CONNECTED = 7
};
/** Callback to destroy list elements.

View file

@ -351,7 +351,7 @@ int node_stop(struct node *n)
{
int ret;
if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT)
return 0;
info("Stopping node %s", node_name(n));
@ -416,7 +416,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
{
int readd, nread = 0;
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED);
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT);
assert(node_type(n)->read);
/* Send in parts if vector not supported */

View file

@ -575,9 +575,13 @@ void * ib_rdma_cm_event_thread(void *n)
case RDMA_CM_EVENT_CONNECT_REQUEST:
ret = ib_connect_request(n, event->id);
// Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs.
// This way, we can already fill the receive queue with WRs at the receive side
node->state = STATE_CONNECTED;
// A target UDP node will never really connect. In order to receive data,
// we set it to connected after it answered the connection request
// with rdma_connect.
if (ib->conn.port_space == RDMA_PS_UDP && !ib->is_source)
node->state = STATE_CONNECTED;
else
node->state = STATE_PENDING_CONNECT;
break;
@ -766,7 +770,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
debug(LOG_IB | 15, "ib_read is called");
if (n->state == STATE_CONNECTED) {
if (n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT) {
max_wr_post = cnt;
@ -776,6 +780,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
for (int i = 0;; i++) {
if (i % CHK_PER_ITER == CHK_PER_ITER - 1) pthread_testcancel();
// If IB node disconnects or if it is still in STATE_PENDING_CONNECT, ib_read should
// return immediately if this condition holds
if (n->state != STATE_CONNECTED) return 0;
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);