1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Removed signaling for inline messages. Fixes #206

This should decrease the time that is spent in the write-function a little bit and made an extra buffer obsolete.
This commit is contained in:
Dennis Potter 2018-10-20 17:05:52 +02:00
parent 7c6406ef7b
commit 3c33696265
2 changed files with 6 additions and 45 deletions

View file

@ -84,9 +84,6 @@ struct infiniband {
/* Bool, should node have a fallback if it can't connect to a remote host? */
int use_fallback;
/* Buffer, used to temporarily store Work Completions from send queue */
struct queue send_wc_buffer;
/* Counter to keep track of available recv. WRs */
int available_recv_wrs;

View file

@ -57,16 +57,6 @@ static int ib_disconnect(struct node *n)
if (wc[j].wr_id > 0)
sample_decref((struct sample *) (wc[j].wr_id));
// Send Queue Stack
struct sample *smp = NULL;
while (queue_available(&ib->conn.send_wc_buffer)) {
// Because of queue_available, queue_pull should always return. No need
// to double check return of queue_pull.
queue_pull(&ib->conn.send_wc_buffer, (void **) &smp);
sample_decref(smp);
}
// Destroy QP
rdma_destroy_qp(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed QP");
@ -630,9 +620,6 @@ int ib_start(struct node *n)
// Create rdma_cm_id and bind to device
ib_create_bind_id(n);
// Initialize send Work Completion queue
queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_heap);
debug(LOG_IB | 3, "Initialized Work Completion Buffer");
// Resolve address or listen to rdma_cm_id
@ -930,12 +917,12 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline);
// Set Send Work Request
wr[sent].wr_id = send_inline ? 0 : (uintptr_t) smps[sent]; // This way the sample can be release in WC
wr[sent].wr_id = (uintptr_t) smps[sent];
wr[sent].sg_list = sge[sent];
wr[sent].num_sge = j;
wr[sent].next = &wr[sent+1];
wr[sent].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
wr[sent].send_flags = send_inline ? IBV_SEND_INLINE : IBV_SEND_SIGNALED;
wr[sent].opcode = IBV_WR_SEND;
}
@ -976,39 +963,16 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
debug(LOG_IB | 4, "%i samples will be released (before WC)", *release);
// Always poll cnt items from Receive Queue. If there is not enough space in
// smps, we temporarily save it in a queue
ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc);
// Try to grab as many CQEs from CQ as there is space in *smps[]
ret = ibv_poll_cq(ib->ctx.send_cq, cnt - *release, wc);
for (int i = 0; i < ret; i++) {
if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[i].status);
// Release only samples which were not send inline
if (wc[i].wr_id) {
if (cnt - *release > 0) {
smps[*release] = (struct sample *) (wc[i].wr_id);
(*release)++;
}
else {
queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id));
debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id));
}
}
}
// Check if we still have some space and try to get rid of some addresses in our queue
if (queue_available(&ib->conn.send_wc_buffer)) {
int empty_smps = cnt - *release;
for (int i = 0; i < empty_smps; i++) {
ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]);
debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]);
if (ret)
(*release)++;
else
break;
}
smps[*release] = (struct sample *) (wc[i].wr_id);
(*release)++;
}
debug(LOG_IB | 4, "%i samples will be released (after WC)", *release);