mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Refactored the way ib_read() handles the refence counts for the samples it uses. This is based on the algorithm described in issue #153
This commit is contained in:
parent
6150a36411
commit
ebb5446305
1 changed files with 29 additions and 17 deletions
|
@ -89,7 +89,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
|
|||
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
|
||||
{
|
||||
for (int i = 0; i < *size; i++) {
|
||||
if (wc[i].status != IBV_WC_SUCCESS)
|
||||
if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
|
||||
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
|
||||
node_name(n), wc[i].status);
|
||||
|
||||
|
@ -738,6 +738,9 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
|
|||
struct ibv_sge sge[*cnt];
|
||||
struct ibv_mr *mr;
|
||||
int ret = 0;
|
||||
int i = 0; //Used for first loop: post receive Work Requests
|
||||
int j = 0; //Used for second loop: get values from Completion Queue
|
||||
int k = 0; //Used for third loop: reorder list
|
||||
|
||||
debug(LOG_IB | 15, "ib_read is called");
|
||||
|
||||
|
@ -747,10 +750,7 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
|
|||
// Get Memory Region
|
||||
mr = memory_ib_get_mr(smps[0]);
|
||||
|
||||
for (int i = 0; i < *cnt; i++) {
|
||||
// Increase refcnt of sample
|
||||
sample_get(smps[i]);
|
||||
|
||||
for (i = 0; i < *cnt; i++) {
|
||||
// Prepare receive Scatter/Gather element
|
||||
sge[i].addr = (uint64_t) &smps[i]->data;
|
||||
sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
|
||||
|
@ -779,7 +779,6 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
|
|||
node_name(n), ret, bad_wr->wr_id);
|
||||
|
||||
debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
|
||||
|
||||
}
|
||||
|
||||
// Poll Completion Queue
|
||||
|
@ -790,31 +789,44 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
|
|||
|
||||
ib->conn.available_recv_wrs -= ret;
|
||||
|
||||
for (int i = 0; i < ret; i++) {
|
||||
if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
|
||||
for (j = 0; j < ret; j++) {
|
||||
if (wc[j].status == IBV_WC_WR_FLUSH_ERR) {
|
||||
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
|
||||
|
||||
ret = 0;
|
||||
}
|
||||
else if (wc[i].status != IBV_WC_SUCCESS) {
|
||||
else if (wc[j].status != IBV_WC_SUCCESS) {
|
||||
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
|
||||
node_name(n), wc[i].status);
|
||||
node_name(n), wc[j].status);
|
||||
ret = 0;
|
||||
}
|
||||
else if (wc[i].opcode & IBV_WC_RECV) {
|
||||
smps[i] = (struct sample*)(wc[i].wr_id);
|
||||
smps[i]->length = wc[i].byte_len/sizeof(double);
|
||||
else if (wc[j].opcode & IBV_WC_RECV) {
|
||||
smps[j] = (struct sample*)(wc[j].wr_id);
|
||||
smps[j]->length = wc[i].byte_len/sizeof(double);
|
||||
}
|
||||
else
|
||||
ret = 0;
|
||||
|
||||
//Release sample
|
||||
sample_put((struct sample *) (wc[i].wr_id));
|
||||
debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reorder list
|
||||
// The first part of the list is fine. It's filled with received values. Now append
|
||||
// unused values.
|
||||
// * j ==> Values from completion queue
|
||||
// * i ==> Values posted to receive queue
|
||||
// * (*cnt) ==> Available values to post to receive queue
|
||||
//
|
||||
// Thus:
|
||||
// * (*cnt - i) ==> number of unused values
|
||||
int l;
|
||||
for (k = j, l = 0; k < j + (*cnt - i); k++, l++) {
|
||||
smps[k] = smps[i + l];
|
||||
}
|
||||
|
||||
|
||||
*cnt = k;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue