1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Replaced send WC stack by queue

This commit is contained in:
Dennis Potter 2018-07-21 12:07:43 +02:00
parent 0e6d962c1a
commit 8704683bf2
2 changed files with 26 additions and 24 deletions

View file

@ -67,10 +67,7 @@ struct infiniband {
int send_inline;
struct send_wc_stack_s {
uint64_t* array;
unsigned top;
} send_wc_stack;
struct queue send_wc_buffer;
int available_recv_wrs;
int buffer_subtraction;

View file

@ -57,9 +57,14 @@ static int ib_disconnect(struct node *n)
sample_put((struct sample *) (wc[j].wr_id));
// Send Queue Stack
while (ib->conn.send_wc_stack.top != 0) {
ib->conn.send_wc_stack.top--;
sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]);
struct sample *smp = NULL;
while (queue_available(&ib->conn.send_wc_buffer)) {
// Because of queue_available, queue_pull should always return. No need
// to double check return of queue_pull.
queue_pull(&ib->conn.send_wc_buffer, (void **) &smp);
sample_put(smp);
}
// Destroy QP
@ -580,11 +585,10 @@ int ib_start(struct node *n)
// Create rdma_cm_id and bind to device
ib_create_bind_id(n);
// Initialize send Work Completion stack
ib->conn.send_wc_stack.top = 0;
ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) );
// Initialize send Work Completion queue
queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_type_heap);
debug(LOG_IB | 3, "Initialized Work Completion Stack");
debug(LOG_IB | 3, "Initialized Work Completion Buffer");
// Resolve address or listen to rdma_cm_id
if (ib->is_source) {
@ -810,6 +814,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ?
ib->conn.send_inline : 0;
debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline);
// Set Send Work Request
@ -858,10 +863,10 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
}
debug(LOG_IB | 4, "%i samples will be released", *release);
debug(LOG_IB | 4, "%i samples will be released (before WC)", *release);
// Always poll cnt items from Receive Queue. If there is not enough space in
// smps, we temporarily save it on a stack
// smps, we temporarily save it in a queue
ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc);
for (int i = 0; i < ret; i++) {
@ -876,26 +881,26 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
(*release)++;
}
else {
ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id;
ib->conn.send_wc_stack.top++;
queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id));
debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id));
}
}
}
// Check if we still have some space and try to get rid of some addresses on our stack
if (ib->conn.send_wc_stack.top > 0) {
// Check if we still have some space and try to get rid of some addresses in our queue
if (queue_available(&ib->conn.send_wc_buffer)) {
int empty_smps = cnt - *release;
for (int i = 0; i < empty_smps; i++) {
ib->conn.send_wc_stack.top--;
ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]);
debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]);
smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top];
(*release)++;
if(ib->conn.send_wc_stack.top == 0) break;
if (ret)
(*release)++;
else
break;
}
}
debug(LOG_IB | 4, "%i samples will be released (after WC)", *release);
}
return sent;