1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Added buffer to ib_write()

Now, ib_write() reads every cycle cnt values from the Completion Queue.
If it is not able to return them to the framework immediately, it
temporarily saves them on a stack.

ib_write() checks every cycle if the stack is non-empty and if it is
possible to return values from the stack to the framework.
This commit is contained in:
Dennis Potter 2018-07-13 12:21:59 +02:00
parent 4cd8fc7150
commit b1b778f542
2 changed files with 45 additions and 12 deletions

View file

@ -77,6 +77,12 @@ struct infiniband {
pthread_t rdma_cm_event_thread;
int available_recv_wrs;
struct send_wc_stack_s {
uint64_t* array;
unsigned top;
} send_wc_stack;
} conn;
/* Memory related variables */

View file

@ -371,14 +371,21 @@ int ib_check(struct node *n)
int max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr)));
int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
if (ib->qp_init.cap.max_send_wr != max_send_pow)
if (ib->qp_init.cap.max_send_wr != max_send_pow) {
warn("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
ib->qp_init.cap.max_send_wr, max_send_pow);
if (ib->qp_init.cap.max_recv_wr != max_recv_pow)
// Change it now, because otherwise errors are possible in ib_start().
ib->qp_init.cap.max_send_wr = max_send_pow;
}
if (ib->qp_init.cap.max_recv_wr != max_recv_pow) {
warn("Max nr. of recv WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
ib->qp_init.cap.max_recv_wr, max_recv_pow);
// Change it now, because otherwise errors are possible in ib_start().
ib->qp_init.cap.max_recv_wr = max_recv_pow;
}
// Check maximum size of max_recv_wr and max_send_wr
if (ib->qp_init.cap.max_send_wr > 8192)
@ -526,6 +533,11 @@ int ib_start(struct node *n)
// that communication.
ib->ctx.listen_id = ib->ctx.id;
// Initialize send Work Completion stack
ib->conn.send_wc_stack.top = 0;
ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) );
debug(LOG_IB | 3, "Initialized Work Completion Stack");
if (ib->is_source) {
// Resolve address
@ -638,8 +650,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
// Poll Completion Queue
// If we've already posted enough receive WRs, try to pull cnt
//if (ib->conn.available_recv_wrs > (ib->qp_init.cap.max_recv_wr / 2)) { //ToDo: Make configurable
if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) )) { //ToDo: Make configurable
if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) ) ) { //ToDo: Make configurable
while(1) {
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
if (wcs) {
@ -728,7 +739,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
if (n->state == STATE_CONNECTED) {
// First, write
//ToDo: Place this into configuration and create checks if settings are valid
int send_inline = 0;
int send_inline = 1;
debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline);
@ -792,12 +803,9 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
debug(LOG_IB | 4, "%i samples will be released", *release);
// Subsequently, check if something is available in completion queue
// Take only as much Work Completions from queue, as are currently available
// in smps[]
ret = ibv_poll_cq(ib->ctx.send_cq, (cnt - (*release)), wc);
//ret = ibv_poll_cq(ib->ctx.send_cq, 200, wc);
// Always poll cnt items from Receive Queue. If there is not enough space in
// smps, we temporarily save it on a stack
ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc);
for (int i = 0; i < ret; i++) {
if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
@ -806,9 +814,28 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
// Release only samples which were not send inline
if (wc[i].wr_id) {
smps[*release] = (struct sample *) (wc[i].wr_id);
if (cnt - *release > 0) {
smps[*release] = (struct sample *) (wc[i].wr_id);
(*release)++;
}
else {
ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id;
ib->conn.send_wc_stack.top++;
}
}
}
// Check if we still have some space and try to get rid of some addresses on our stack
if (ib->conn.send_wc_stack.top > 0) {
int empty_smps = cnt - *release;
for (int i = 0; i < empty_smps; i++) {
ib->conn.send_wc_stack.top--;
smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top];
(*release)++;
if(ib->conn.send_wc_stack.top == 0) break;
}
}