1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Node gives back samples to framework at disconnect

The node blocks a certain amount of samples to use in its queues.
Before this commit, the only moment to release them to the framework was
during ib_read()/ib_write().

But, there were a couple of problems. In the following I will take
ib_read() as example, but ib_write() will be analogous.

The first problem was:
1. If a QP disconnects, all Work Requests get invalidated and will be
   "flushed" to a Completion Queue.

A possible solution would be to save them in an intermediate buffer.
We could then "exchange" these samples with the framework as soon as the node
connects again and ib_read() is called again. So, we would get valid
samples from the framework, post them, and give the "invalidated" samples back.

But, there is a second problem:
2. We cannot assume that ib_read() is ever called again after
   ib_disconnect(). This is for example the case if the disconnect is
   triggered by ib_stop() and not by an external node that disconnects.

   This would result in a memory leak, since the samples would never be
   returned to the framework, although the node is stopped.

Because of this second problem, I decided to return all samples with
sample_put() in the disconnect function. An additional benefit is that
this is more convenient than another buffer to temporarily save the
invalidated samples.
This commit is contained in:
Dennis Potter 2018-07-16 13:41:53 +02:00
parent 3f58359ce4
commit 0230389593

View file

@ -22,6 +22,7 @@
#include <string.h>
#include <math.h>
#include <unistd.h>
#include <villas/nodes/infiniband.h>
#include <villas/plugin.h>
@ -33,18 +34,42 @@
/**
 * Disconnect an InfiniBand node and return all samples it still holds.
 *
 * Calls rdma_disconnect(), waits briefly, then drains the receive and send
 * Completion Queues and the send work-completion stack, passing every sample
 * found there to sample_put(). Afterwards the Queue Pair is destroyed and the
 * connection bookkeeping (available receive WRs, send stack top) is reset.
 *
 * @param n Node to disconnect; n->_vd is interpreted as a struct infiniband.
 * @return The current value of ib->stopThreads.
 */
static int ib_disconnect(struct node *n)
{
	struct infiniband *ib = (struct infiniband *) n->_vd;

	// One scratch array, large enough for a full poll of either Completion Queue
	struct ibv_wc wc[MAX(ib->recv_cq_size, ib->send_cq_size)];
	int wcs;

	debug(LOG_IB | 1, "Starting to clean up");

	rdma_disconnect(ib->ctx.id);

	// Give the Completion Queues a chance to fill after rdma_disconnect
	usleep(50000);

	// If there is anything in the Completion Queue, it should be given back to the framework

	// Receive Queue
	// ibv_poll_cq() returns a negative value on failure, so only keep
	// polling while it actually delivered completions (wcs > 0).
	while ((wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc)) > 0) {
		ib->conn.available_recv_wrs -= wcs;

		for (int j = 0; j < wcs; j++) {
			sample_put((struct sample *) (wc[j].wr_id));
		}
	}

	if (wcs < 0) {
		debug(LOG_IB | 1, "Failed to poll receive Completion Queue during clean up");
	}

	// Send Queue
	while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc)) > 0) {
		for (int j = 0; j < wcs; j++) {
			// NOTE(review): a wr_id of 0 presumably marks a send without an
			// attached sample -- confirm against ib_write()
			if (wc[j].wr_id > 0) {
				sample_put((struct sample *) (wc[j].wr_id));
			}
		}
	}

	if (wcs < 0) {
		debug(LOG_IB | 1, "Failed to poll send Completion Queue during clean up");
	}

	// Send Queue Stack
	while (ib->conn.send_wc_stack.top != 0) {
		ib->conn.send_wc_stack.top--;

		sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]);
	}

	info("WCS: %i", ib->conn.available_recv_wrs);

	// Destroy QP
	rdma_destroy_qp(ib->ctx.id);
	debug(LOG_IB | 3, "Destroyed QP");

	// Set available receive WRs and stack top to zero
	ib->conn.available_recv_wrs = 0;
	ib->conn.send_wc_stack.top = 0;

	return ib->stopThreads;
}
@ -529,8 +554,8 @@ void * ib_rdma_cm_event_thread(void *n)
case RDMA_CM_EVENT_DISCONNECTED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
node->state = STATE_STARTED;
ret = ib_disconnect(n);
if (!ret)
@ -699,6 +724,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
// If we've already posted enough receive WRs, try to pull cnt
if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) {
while(1) {
if (n->state != STATE_CONNECTED) return 0;
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
if (wcs) {
debug(LOG_IB | 10, "Received %i Work Completions", wcs);