
Merge branch 'ib-rearrange-qp' into develop

Closes #152. As described in #182, we will not rearrange the Queue Pairs
for connected mode. As soon as we test many-to-one connections for the
unreliable connection mode, we will revisit this issue.
Dennis Potter 2018-07-21 12:14:25 +02:00
commit 2c3ddfd0c2
3 changed files with 54 additions and 41 deletions
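The central change in the node source is that send Work Completions which cannot be released right away are no longer parked on a fixed-size LIFO stack (send_wc_stack) but pushed into a VILLASnode queue (send_wc_buffer, created with queue_init() and emptied with queue_pull()). The standalone sketch below only models that buffering pattern under simplified assumptions: the wc_buffer type, the WC_BUF_SIZE constant, and the wc_buffer_*() helpers are hypothetical stand-ins for the project's queue API, not code from this repository.

/*
 * Minimal, self-contained sketch of the send-WC buffering pattern.
 * A plain ring buffer stands in for VILLASnode's struct queue; all
 * names below (wc_buffer, WC_BUF_SIZE, wc_buffer_*) are hypothetical.
 */
#include <stdint.h>
#include <stdio.h>

#define WC_BUF_SIZE 8192   /* assumed capacity, mirroring max_send_wr */

struct wc_buffer {
	uint64_t ring[WC_BUF_SIZE];
	size_t head;           /* next element to pull */
	size_t tail;           /* next free slot to push into */
};

static size_t wc_buffer_available(struct wc_buffer *b)
{
	return b->tail - b->head;
}

static int wc_buffer_push(struct wc_buffer *b, uint64_t wr_id)
{
	if (wc_buffer_available(b) >= WC_BUF_SIZE)
		return 0;                           /* buffer full */
	b->ring[b->tail % WC_BUF_SIZE] = wr_id;
	b->tail++;
	return 1;
}

static int wc_buffer_pull(struct wc_buffer *b, uint64_t *wr_id)
{
	if (wc_buffer_available(b) == 0)
		return 0;                           /* buffer empty */
	*wr_id = b->ring[b->head % WC_BUF_SIZE];
	b->head++;
	return 1;
}

int main(void)
{
	struct wc_buffer buf = { .head = 0, .tail = 0 };
	uint64_t released[4];
	unsigned cnt = 4, release = 0;

	/* Pretend a poll of the send CQ returned 6 work completions, but the
	 * caller only has room to release cnt = 4 samples in this call. */
	uint64_t wr_ids[6] = { 101, 102, 103, 104, 105, 106 };

	for (int i = 0; i < 6; i++) {
		if (release < cnt)
			released[release++] = wr_ids[i];  /* fits: release immediately */
		else
			wc_buffer_push(&buf, wr_ids[i]);  /* overflow: keep for the next call */
	}

	/* On the next call, drain buffered completions first, as long as
	 * there is still space left in the caller's array. */
	release = 0;
	uint64_t id;
	while (release < cnt && wc_buffer_pull(&buf, &id))
		released[release++] = id;

	printf("released %u buffered work completions, first wr_id = %llu\n",
	       release, (unsigned long long) released[0]);
	return 0;
}

In the diff below, the same pattern appears in two places: ib_write() pushes overflowing completions into the queue and pulls them back as soon as release slots free up, and ib_disconnect() drains whatever is still buffered and hands the samples back with sample_put().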


@@ -13,7 +13,7 @@ nodes = {
address = "10.0.0.2:1337",
max_wrs = 8192,
cq_size = 2048,
cq_size = 8192,
vectorize = 1,


@@ -80,11 +80,7 @@ struct infiniband {
/* Bool, should data be sent inline if possible? */
int send_inline;
/* Stack to temporarily save sent sample */
struct send_wc_stack_s {
uint64_t* array;
unsigned top;
} send_wc_stack;
struct queue send_wc_buffer;
/* Counter to keep track of available recv. WRs */
int available_recv_wrs;


@@ -33,18 +33,44 @@
static int ib_disconnect(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[MAX(ib->recv_cq_size, ib->send_cq_size)];
int wcs;
debug(LOG_IB | 1, "Starting to clean up");
rdma_disconnect(ib->ctx.id);
// If there is anything in the Completion Queue, it should be given back to the framework
// Receive Queue
while (ib->conn.available_recv_wrs) {
wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc);
ib->conn.available_recv_wrs -= wcs;
for (int j = 0; j < wcs; j++)
sample_put((struct sample *) (wc[j].wr_id));
}
// Send Queue
while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc)))
for (int j = 0; j < wcs; j++)
if (wc[j].wr_id > 0)
sample_put((struct sample *) (wc[j].wr_id));
// Send Queue Stack
struct sample *smp = NULL;
while (queue_available(&ib->conn.send_wc_buffer)) {
// Because of queue_available, queue_pull should always succeed here, so there
// is no need to double-check the return value of queue_pull.
queue_pull(&ib->conn.send_wc_buffer, (void **) &smp);
sample_put(smp);
}
// Destroy QP
rdma_destroy_qp(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed QP");
// Set available receive WRs and stack top to zero
ib->conn.available_recv_wrs = 0;
ib->conn.send_wc_stack.top = 0;
return ib->stopThreads;
}
@@ -462,16 +488,14 @@ void * ib_rdma_cm_event_thread(void *n)
// Monitor event channel
while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
debug(LOG_IB | 2, "Received communication event: %s", rdma_event_str(event->event));
switch(event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED");
ret = ib_addr_resolved(n);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR");
warn("Address resolution (rdma_resolve_addr) failed!");
ib_continue_as_listen(n, event);
@@ -479,13 +503,10 @@ void * ib_rdma_cm_event_thread(void *n)
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED");
ret = ib_route_resolved(n);
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR");
warn("Route resolution (rdma_resovle_route) failed!");
ib_continue_as_listen(n, event);
@@ -499,8 +520,6 @@ void * ib_rdma_cm_event_thread(void *n)
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
ret = ib_connect_request(n, event->id);
//ToDo: Think about this. In this context, we say that the QP is initialized
@@ -510,7 +529,6 @@ void * ib_rdma_cm_event_thread(void *n)
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR");
warn("An error has occurred trying to establish a connection!");
ib_continue_as_listen(n, event);
@@ -518,7 +536,6 @@ void * ib_rdma_cm_event_thread(void *n)
break;
case RDMA_CM_EVENT_REJECTED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED");
warn("Connection request or response was rejected by the remote end point!");
ib_continue_as_listen(n, event);
@@ -526,8 +543,6 @@ void * ib_rdma_cm_event_thread(void *n)
break;
case RDMA_CM_EVENT_ESTABLISHED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED");
// If the connection uses the unreliable, connectionless port space (UD), set the appropriate variables
if (ib->conn.port_space == RDMA_PS_UDP)
ib->conn.ud = event->param.ud;
@@ -535,12 +550,12 @@ void * ib_rdma_cm_event_thread(void *n)
node->state = STATE_CONNECTED;
info("Connection established in node %s", node_name(n));
break;
case RDMA_CM_EVENT_DISCONNECTED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
node->state = STATE_STARTED;
ret = ib_disconnect(n);
if (!ret)
@@ -581,11 +596,10 @@ int ib_start(struct node *n)
// Create rdma_cm_id and bind to device
ib_create_bind_id(n);
// Initialize send Work Completion stack
ib->conn.send_wc_stack.top = 0;
ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) );
// Initialize send Work Completion queue
queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_type_heap);
debug(LOG_IB | 3, "Initialized Work Completion Stack");
debug(LOG_IB | 3, "Initialized Work Completion Buffer");
// Resolve address or listen to rdma_cm_id
if (ib->is_source) {
@@ -704,6 +718,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
// If we've already posted enough receive WRs, try to pull cnt
if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) {
while(1) {
if (n->state != STATE_CONNECTED) return 0;
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
if (wcs) {
debug(LOG_IB | 10, "Received %i Work Completions", wcs);
@@ -826,6 +842,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ?
ib->conn.send_inline : 0;
debug(LOG_IB | 10, "Sample will be sent inline [0/1]: %i", send_inline);
// Set Send Work Request
@@ -874,10 +891,10 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
}
debug(LOG_IB | 4, "%i samples will be released", *release);
debug(LOG_IB | 4, "%i samples will be released (before WC)", *release);
// Always poll cnt items from the send Completion Queue. If there is not enough space in
// smps, we temporarily save it on a stack
// smps, we temporarily save it in a queue
ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc);
for (int i = 0; i < ret; i++) {
@@ -892,26 +909,26 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
(*release)++;
}
else {
ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id;
ib->conn.send_wc_stack.top++;
queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id));
debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id));
}
}
}
// Check if we still have some space and try to get rid of some addresses on our stack
if (ib->conn.send_wc_stack.top > 0) {
// Check if we still have some space and try to get rid of some addresses in our queue
if (queue_available(&ib->conn.send_wc_buffer)) {
int empty_smps = cnt - *release;
for (int i = 0; i < empty_smps; i++) {
ib->conn.send_wc_stack.top--;
ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]);
debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]);
smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top];
(*release)++;
if(ib->conn.send_wc_stack.top == 0) break;
if (ret)
(*release)++;
else
break;
}
}
debug(LOG_IB | 4, "%i samples will be released (after WC)", *release);
}
return sent;