1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

ib_write and ib_read handle memory in a way that the pool doesn't underrun now. Sending data inline is broken in this commit

This commit is contained in:
Dennis Potter 2018-07-12 17:49:17 +02:00
parent 3092eddb69
commit 4cd8fc7150

View file

@ -269,6 +269,9 @@ int ib_parse(struct node *n, json_t *cfg)
char* ip_adr = strtok(local, ":");
char* port = strtok(NULL, ":");
//n->in.vectorize = 1024;
//n->out.vectorize = 1024; //ToDo: make configurable
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr);
if (ret)
error("Failed to resolve local address '%s' of node %s: %s",
@ -621,178 +624,197 @@ int ib_deinit()
int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[n->in.vectorize];
struct ibv_wc wc[cnt];
struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_mr *mr;
int ret = 0;
int i = 0; //Used for first loop: post receive Work Requests
int j = 0; //Used for second loop: get values from Completion Queue
int k = 0; //Used for third loop: reorder list
int ret = 0, wcs = 0, read_values = 0, max_wr_post;
debug(LOG_IB | 15, "ib_read is called");
if (n->state == STATE_CONNECTED) {
if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt) {
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
max_wr_post = cnt;
for (i = 0; i < cnt; i++) {
// Prepare receive Scatter/Gather element
sge[i].addr = (uint64_t) &smps[i]->data;
sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
sge[i].lkey = mr->lkey;
// Poll Completion Queue
// If we've already posted enough receive WRs, try to pull cnt
//if (ib->conn.available_recv_wrs > (ib->qp_init.cap.max_recv_wr / 2)) { //ToDo: Make configurable
if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) )) { //ToDo: Make configurable
while(1) {
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
if (wcs) {
debug(LOG_IB | 10, "Received %i Work Completions", wcs);
// Prepare a receive Work Request
wr[i].wr_id = (uintptr_t) smps[i];
wr[i].next = &wr[i+1];
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
ib->conn.available_recv_wrs++;
if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
wr[i].next = NULL;
i++;
read_values = wcs; // Value to return
max_wr_post = wcs; // Make space free in smps[]
break;
}
}
// Post list of Work Requests
ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
if (ret)
error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx",
node_name(n), ret, bad_wr->wr_id);
debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
// All samples (wcs * received + unposted) should be released. Let
// *release be equal to allocated.
//
// This is set in the framework, before this function was called
}
else {
ib->conn.available_recv_wrs += max_wr_post;
*release = 0; // While we fill the receive queue, we always use all samples
}
// Poll Completion Queue
ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
if (ret) {
debug(LOG_IB | 10, "Received %i Work Completions", ret);
for (int i = 0; i < max_wr_post; i++) {
// Prepare receive Scatter/Gather element
sge[i].addr = (uint64_t) &smps[i]->data;
sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
sge[i].lkey = mr->lkey;
ib->conn.available_recv_wrs -= ret;
for (j = 0; j < ret; j++) {
if (wc[j].status == IBV_WC_WR_FLUSH_ERR) {
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
ret = 0;
}
else if (wc[j].status != IBV_WC_SUCCESS) {
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[j].status);
ret = 0;
}
else if (wc[j].opcode & IBV_WC_RECV) {
smps[j] = (struct sample *) (wc[j].wr_id);
smps[j]->length = wc[i].byte_len / sizeof(double);
}
else
ret = 0;
}
// Prepare a receive Work Request
wr[i].wr_id = (uintptr_t) smps[i];
wr[i].next = &wr[i+1];
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
}
// Reorder list
// The first part of the list is fine. It's filled with received values. Now append
// unused values.
// * j ==> Values from completion queue
// * i ==> Values posted to receive queue
// * (cnt) ==> Available values to post to receive queue
//
// Thus:
// * (cnt - i) ==> number of unused values
int l;
for (k = j, l = 0; k < j + (cnt - i); k++, l++) {
smps[k] = smps[i + l];
wr[max_wr_post-1].next = NULL;
debug(LOG_IB | 5, "Prepared %i new receive Work Requests", max_wr_post);
debug(LOG_IB | 5, "%i receive Work Requests in Receive Queue", ib->conn.available_recv_wrs);
// Post list of Work Requests
ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
if (ret)
error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx",
node_name(n), ret, bad_wr->wr_id);
debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
// Doesn't start, if wcs == 0
for (int j = 0; j < wcs; j++) {
if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) )
read_values--;
if (wc[j].status == IBV_WC_WR_FLUSH_ERR)
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
else if (wc[j].status != IBV_WC_SUCCESS)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[j].status);
smps[j] = (struct sample *) (wc[j].wr_id);
smps[j]->length = wc[j].byte_len / sizeof(double);
}
*release = k;
}
return ret;
return read_values;
}
int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *ready)
int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_send_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_wc wc[ib->cq_size];
struct ibv_wc wc[cnt];
struct ibv_mr *mr;
int ret;
int i = 0; //Used for first loop: prepare work requests to post to send queue
int j = 0; //Used for second loop: get values from Completion Queue
int sent = 0; //Used for first loop: prepare work requests to post to send queue
debug(LOG_IB | 10, "ib_write is called");
*release = 0;
if (n->state == STATE_CONNECTED) {
// First, write
//ToDo: Place this into configuration and create checks if settings are valid
int send_inline = 1;
int send_inline = 0;
debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline);
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
for (i = 0; i < cnt; i++) {
//Set Scatter/Gather element to data of sample
sge[i].addr = (uint64_t) &smps[i]->data;
sge[i].length = smps[i]->length*sizeof(double);
sge[i].lkey = mr->lkey;
for (sent = 0; sent < cnt; sent++) {
// Set Scatter/Gather element to data of sample
sge[sent].addr = (uint64_t) &smps[sent]->data;
sge[sent].length = smps[sent]->length*sizeof(double);
sge[sent].lkey = mr->lkey;
// Set Send Work Request
wr[i].wr_id = (uintptr_t) smps[i]; //This way the sample can be release in WC
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
wr[sent].wr_id = send_inline ? 0 : (uintptr_t) smps[sent]; // This way the sample can be release in WC
wr[sent].sg_list = &sge[sent];
wr[sent].num_sge = 1;
if (i == (cnt-1)) {
debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
wr[i].next = NULL;
if (sent == (cnt-1)) {
debug(LOG_IB | 10, "Prepared %i send Work Requests", (sent+1));
wr[sent].next = NULL;
}
else
wr[i].next = &wr[i+1];
wr[sent].next = &wr[sent+1];
wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
wr[i].opcode = IBV_WR_SEND_WITH_IMM;
wr[sent].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
wr[sent].imm_data = htonl(0); //ToDo: set this to a useful value
wr[sent].opcode = IBV_WR_SEND_WITH_IMM;
}
//Send linked list of Work Requests
// Send linked list of Work Requests
ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
if (ret) {
error("Failed to send message in node %s: %i, bad WR ID: 0x%lx",
node_name(n), ret, bad_wr->wr_id);
debug(LOG_IB | 4, "Posted send Work Requests");
// Reorder list. Place inline and unposted samples to the top
// m will always be equal or smaller than *release
for (int m = 0; m < cnt; m++) {
// We can't use wr_id as identifier, since it is 0 for inline
// elements
if (ret && (wr[m].sg_list == bad_wr->sg_list)) {
// The remaining work requests will be bad. Ripple through list
// and prepare them to be released
debug(LOG_IB | 4, "Bad WR occured with ID: 0x%lx and S/G address: 0x%px: %i",
bad_wr->wr_id, bad_wr->sg_list, ret);
while (1) {
smps[*release] = smps[m];
(*release)++; // Increment number of samples to be released
sent--; // Decrement the number of succesfully posted elements
if (++m == cnt) break;
}
}
else if (wr[m].send_flags & IBV_SEND_INLINE) {
smps[*release] = smps[m];
(*release)++;
}
return -ret;
}
debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
debug(LOG_IB | 4, "%i samples will be released", *release);
// Subsequently, check if something is available in completion queue
ret = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc);
// Take only as much Work Completions from queue, as are currently available
// in smps[]
ret = ibv_poll_cq(ib->ctx.send_cq, (cnt - (*release)), wc);
//ret = ibv_poll_cq(ib->ctx.send_cq, 200, wc);
for (j = 0; j < ret; j++) {
if (wc[j].status != IBV_WC_SUCCESS && wc[j].status != IBV_WC_WR_FLUSH_ERR)
for (int i = 0; i < ret; i++) {
if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[j].status);
node_name(n), wc[i].status);
smps[j] = (struct sample *) (wc[j].wr_id);
// Release only samples which were not send inline
if (wc[i].wr_id) {
smps[*release] = (struct sample *) (wc[i].wr_id);
(*release)++;
}
}
*ready = j;
}
return i;
return sent;
}
int ib_fd(struct node *n)