
Implemented the disconnect function on the source and target side. The cleanup function does not run through completely yet; rdma_destroy_id() probably blocks because not everything associated with the rdma_cm_id has been destroyed yet.

Dennis Potter 2018-06-28 12:46:16 +02:00
parent 1df18da3f2
commit 5598f93582
2 changed files with 113 additions and 11 deletions
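
For reference, below is a minimal sketch of a teardown order that librdmacm/libibverbs accept; ib_teardown() and its parameter list are hypothetical (they roughly mirror the ctx and mem members of struct infiniband in the diff below), but the ordering constraints are real: the QP has to go before the CQ it uses, the CQ before its completion channel, and a common reason for rdma_destroy_id() to block as described above is CM events that were fetched with rdma_get_cm_event() but never released with rdma_ack_cm_event().

#include <rdma/rdma_cma.h>
#include <infiniband/verbs.h>

/* Hypothetical helper, not part of this commit: tear down in the order the
 * libraries expect. */
static int ib_teardown(struct rdma_cm_id *id, struct ibv_cq *cq,
		struct ibv_comp_channel *cc, struct ibv_mr *mr_recv,
		struct ibv_mr *mr_send, struct rdma_event_channel *ec)
{
	rdma_destroy_qp(id);			/* QP first: a CQ cannot be destroyed while a QP still references it */

	if (cq && ibv_destroy_cq(cq))		/* CQ before its completion channel */
		return -1;
	if (cc && ibv_destroy_comp_channel(cc))
		return -1;

	if (mr_recv && ibv_dereg_mr(mr_recv))	/* memory regions are independent of the id */
		return -1;
	if (mr_send && ibv_dereg_mr(mr_send))
		return -1;

	/* rdma_destroy_id() hangs if CM events obtained via rdma_get_cm_event()
	 * on this id were never released with rdma_ack_cm_event(). */
	if (rdma_destroy_id(id))
		return -1;

	rdma_destroy_event_channel(ec);		/* last: no ids may remain on the channel */
	return 0;
}
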

View file

@@ -63,7 +63,6 @@ struct infiniband {
struct ibv_cq *cq;
struct ibv_comp_channel *comp_channel;
} ctx;
/* Work Completion related */
struct poll_s {
enum poll_mode_e poll_mode;
@@ -76,6 +75,8 @@ struct infiniband {
/* Poll thread */
pthread_t cq_poller_thread;
int stopThread;
} poll;
/* Connection specific variables */
@@ -86,6 +87,9 @@ struct infiniband {
int timeout;
struct r_addr_key_s *r_addr_key;
pthread_t stop_thread;
int rdma_disconnect_called;
} conn;
/* Memory related variables */
@@ -103,7 +107,6 @@ struct infiniband {
/* Misc settings */
int is_source;
int cq_size;
};
/** @see node_type::reverse */

View file

@@ -31,6 +31,31 @@
#include <rdma/rdma_cma.h>
int ib_cleanup(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
info("Starting to clean up");
// Destroy QP
rdma_destroy_qp(ib->ctx.id);
info("Destroyed QP");
// Deregister memory regions
ibv_dereg_mr(ib->mem.mr_recv);
if(ib->is_source)
ibv_dereg_mr(ib->mem.mr_send);
info("Deregistered memory regions");
// Destroy pools
pool_destroy(&ib->mem.p_recv);
pool_destroy(&ib->mem.p_send);
info("Destroyed memory pools");
rdma_destroy_id(ib->ctx.id);
info("Destroyed rdma_cm_id");
return 0;
}
int ib_post_recv_wrs(struct node *n)
{
@@ -63,11 +88,21 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size)
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
{
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
for(int i=0; i<*size; i++)
{
//On disconnect, the QP is set to the error state and its outstanding WRs will be flushed
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
{
ib->poll.stopThread = 1;
return;
}
if(wc[i].status != IBV_WC_SUCCESS)
error("Work Completion status was not IBV_WC_SUCCES in node %s: %s",
node_name(n), gai_strerror(wc[i].status));
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[i].status);
}
}
@@ -101,13 +136,12 @@ void * ib_busy_poll_thread(void *n)
while(1)
{
//ToDo: Implement stopThreads variable
if(0)
return NULL;
// Poll as long as WCs are available
while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc)))
ib->poll.on_compl(n, wc, &size);
if(ib->poll.stopThread)
return NULL;
}
}
@@ -351,6 +385,9 @@ static int ib_event(struct node *n, struct rdma_cm_event *event)
info("Connection established!");
ret = 1;
break;
case RDMA_CM_EVENT_DISCONNECTED:
ret = ib_cleanup(n);
break;
default:
error("Unknown event occurred: %u",
event->event);
@@ -448,6 +485,7 @@ int ib_parse(struct node *n, json_t *cfg)
}
// Set max. send and receive Work Requests
//ToDo: Set hint that max_*_wr can only be a value of the form 1 << x
ib->qp_init.cap.max_send_wr = max_send_wr;
ib->qp_init.cap.max_recv_wr = max_recv_wr;
@@ -493,6 +531,23 @@ int ib_destroy(struct node *n)
return 0;
}
void * ib_stop_thread(void *n)
{
struct node *node = (struct node *)n;
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
struct rdma_cm_event *event;
while(rdma_get_cm_event(ib->ctx.ec, &event) == 0)
{
if(event->event == RDMA_CM_EVENT_DISCONNECTED)
{
ib->conn.rdma_disconnect_called = 1;
node_stop(node);
return NULL;
}
}
return NULL;
}
int ib_start(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
@@ -566,11 +621,49 @@ int ib_start(struct node *n)
break;
}
ret = pthread_create(&ib->conn.stop_thread, NULL, ib_stop_thread, n);
if(ret)
{
error("Failed to create thread to monitor disconnects in node %s: %s",
node_name(n), gai_strerror(ret));
}
return 0;
}
int ib_stop(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct rdma_cm_event *event = NULL;
int ret;
// Call the RDMA disconnect function. It will flush all outstanding WRs
// to the Completion Queue and generate an RDMA_CM_EVENT_DISCONNECTED
// event once that is done.
ret = rdma_disconnect(ib->ctx.id);
if(ret)
{
error("Error while calling rdma_disconnect in node %s: %s",
node_name(n), gai_strerror(ret));
}
info("Called rdma_disconnect.");
// If the disconnected event already occurred, directly call the cleanup function
if(ib->conn.rdma_disconnect_called)
{
ib_cleanup(n);
}
// Else, wait for event to occur
else
{
ib->conn.rdma_disconnect_called = 1;
rdma_get_cm_event(ib->ctx.ec, &event);
ib_event(n, event);
rdma_ack_cm_event(event);
}
return 0;
}
@@ -605,10 +698,16 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
for(int i=0; i<ret; i++)
{
if(wc[i].status != IBV_WC_SUCCESS)
error("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
return 0;
data[i].f = *(double*)(wc[i].wr_id);
if(wc[i].status != IBV_WC_SUCCESS)
{
warn("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
ret--;
}
else
data[i].f = *(double*)(wc[i].wr_id);
}
smps[0]->length = ret;
smps[0]->capacity = cnt;
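
As a closing note on the disconnect path added in ib_stop() and ib_stop_thread() above, here is a condensed, hedged sketch of the underlying RDMA CM pattern; wait_for_disconnect() is a hypothetical stand-in and not part of this commit, while the rdma_* calls and RDMA_CM_EVENT_DISCONNECTED are the actual librdmacm API.

#include <rdma/rdma_cma.h>

/* Hypothetical helper, not part of this commit: initiate a disconnect and
 * block until the corresponding CM event arrives. */
static int wait_for_disconnect(struct rdma_cm_id *id, struct rdma_event_channel *ec)
{
	struct rdma_cm_event *event;

	/* rdma_disconnect() transitions the QP to the error state, so outstanding
	 * WRs complete with IBV_WC_WR_FLUSH_ERR; both sides eventually receive
	 * RDMA_CM_EVENT_DISCONNECTED on their event channel. */
	if (rdma_disconnect(id))
		return -1;

	while (rdma_get_cm_event(ec, &event) == 0) {
		enum rdma_cm_event_type type = event->event;

		/* Release the event before acting on it; un-acked events are a
		 * classic reason for rdma_destroy_id() to block later. */
		rdma_ack_cm_event(event);

		if (type == RDMA_CM_EVENT_DISCONNECTED)
			return 0;
	}

	return -1;
}

Acking every event as soon as it is fetched keeps the event channel drained, which also avoids the rdma_destroy_id() hang mentioned in the commit message.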