1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Solved some state problems. This commit also solves #154, which was caused by a non-terminated thread. (This thread will be removed in a later commit anyway

This commit is contained in:
Dennis Potter 2018-07-07 15:34:07 +02:00
parent 1bdd0a9e34
commit 06e7434d6c
2 changed files with 24 additions and 19 deletions

View file

@ -76,8 +76,6 @@ struct infiniband {
/* Poll thread */
pthread_t cq_poller_thread;
int stopThreads;
} poll;
int stopThreads;

View file

@ -37,6 +37,8 @@ int ib_disconnect(struct node *n)
struct infiniband *ib = (struct infiniband *) n->_vd;
debug(LOG_IB | 1, "Starting to clean up");
rdma_disconnect(ib->ctx.id);
// Destroy QP
rdma_destroy_qp(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed QP");
@ -86,17 +88,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
{
struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd;
for (int i = 0; i < *size; i++) {
//On disconnect, the QP set to error state and will be flushed
if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_completion_source). Stopping thread.");
ib->poll.stopThreads = 1;
return;
}
if (wc[i].status != IBV_WC_SUCCESS)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[i].status);
@ -140,7 +132,7 @@ void * ib_busy_poll_thread(void *n)
while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
ib->poll.on_compl(n, wc, &size);
if (ib->poll.stopThreads)
if (ib->stopThreads)
return NULL;
}
}
@ -581,6 +573,8 @@ void * ib_rdma_cm_event_thread(void *n)
node->state = STATE_STARTED;
ret = ib_disconnect(n);
info("Host disconnected. Ready to accept new connections.");
break;
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
@ -627,6 +621,12 @@ int ib_start(struct node *n)
debug(LOG_IB | 3, "Bound rdma_cm_id to Infiniband device");
// The ID will be overwritten for the target. If the event type is
// RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for
// that communication.
ib->ctx.listen_id = ib->ctx.id;
if (ib->is_source) {
// Resolve address
ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout);
@ -635,11 +635,6 @@ int ib_start(struct node *n)
ib->conn.timeout, node_name(n), gai_strerror(ret));
}
else {
// The ID will be overwritten for the target. If the event type is
// RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for
// that communication.
ib->ctx.listen_id = ib->ctx.id;
// Listen on rdma_cm_id for events
ret = rdma_listen(ib->ctx.listen_id, 10);
if (ret)
@ -681,7 +676,11 @@ int ib_stop(struct node *n)
// Call RDMA disconnect function
// Will flush all outstanding WRs to the Completion Queue and
// will call RDMA_CM_EVENT_DISCONNECTED if that is done.
ret = rdma_disconnect(ib->ctx.id);
if(! ib->is_source && n->state == STATE_CONNECTED)
ret = rdma_disconnect(ib->ctx.id);
else
ret = rdma_disconnect(ib->ctx.listen_id);
if (ret)
error("Error while calling rdma_disconnect in node %s: %s",
node_name(n), gai_strerror(ret));
@ -696,6 +695,14 @@ int ib_stop(struct node *n)
debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
// Wait for polling thread to join
if (ib->is_source) {
ret = pthread_join(ib->poll.cq_poller_thread, NULL);
if (ret)
error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret);
}
// Destroy RDMA CM ID
rdma_destroy_id(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed rdma_cm_id");