1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'develop' of git.rwth-aachen.de:acs/public/villas/VILLASnode into develop

This commit is contained in:
Steffen Vogel 2018-08-20 18:40:46 +02:00
commit 52492d47d1
6 changed files with 22 additions and 16 deletions

View file

@ -38,8 +38,9 @@ enum state {
STATE_OPENED = 4, /* alias for STATE_STARTED used by struct io */
STATE_STOPPED = 5,
STATE_UNLOADED = 5, /* alias for STATE_STARTED used by struct plugin */
STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */
STATE_CONNECTED = 6
STATE_CLOSED = 5, /* alias for STATE_STARTED used by struct io */
STATE_PENDING_CONNECT = 6,
STATE_CONNECTED = 7
};
/** Callback to destroy list elements.

View file

@ -28,4 +28,4 @@ struct memory_ib {
struct memory_type *parent;
};
struct ibv_mr * memory_ib_get_mr(struct sample *smps);
struct ibv_mr * memory_ib_get_mr(void *ptr);

View file

@ -50,6 +50,7 @@ struct pool {
};
#define INLINE static inline __attribute__((unused))
#define pool_buffer(p) ((char *) (p) + (p)->buffer_off)
/** Initiazlize a pool
*

View file

@ -27,16 +27,14 @@
#include <villas/utils.h>
#include <villas/memory/ib.h>
struct ibv_mr * memory_ib_get_mr(struct sample *smps)
struct ibv_mr * memory_ib_get_mr(void *ptr)
{
struct memory_allocation *ma;
struct pool *p;
struct ibv_mr *mr;
p = sample_pool(smps);
ma = memory_get_allocation((char *) (p) + p->buffer_off);
ma = memory_get_allocation(ptr);
mr = ma->ib.mr;
return mr;
}

View file

@ -342,7 +342,7 @@ int node_stop(struct node *n)
{
int ret;
if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT)
return 0;
info("Stopping node %s", node_name(n));
@ -410,7 +410,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
{
int readd, nread = 0;
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED);
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT);
assert(node_type(n)->read);
/* Send in parts if vector not supported */

View file

@ -575,9 +575,13 @@ void * ib_rdma_cm_event_thread(void *n)
case RDMA_CM_EVENT_CONNECT_REQUEST:
ret = ib_connect_request(n, event->id);
// Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs.
// This way, we can already fill the receive queue with WRs at the receive side
node->state = STATE_CONNECTED;
// A target UDP node will never really connect. In order to receive data,
// we set it to connected after it answered the connection request
// with rdma_connect.
if (ib->conn.port_space == RDMA_PS_UDP && !ib->is_source)
node->state = STATE_CONNECTED;
else
node->state = STATE_PENDING_CONNECT;
break;
@ -766,7 +770,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
debug(LOG_IB | 15, "ib_read is called");
if (n->state == STATE_CONNECTED) {
if (n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT) {
max_wr_post = cnt;
@ -776,6 +780,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
for (int i = 0;; i++) {
if (i % CHK_PER_ITER == CHK_PER_ITER - 1) pthread_testcancel();
// If IB node disconnects or if it is still in STATE_PENDING_CONNECT, ib_read should
// return immediately if this condition holds
if (n->state != STATE_CONNECTED) return 0;
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
@ -803,7 +809,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
}
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
mr = memory_ib_get_mr(pool_buffer(sample_pool(smps[0])));
for (int i = 0; i < max_wr_post; i++) {
int j = 0;
@ -908,7 +914,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
// First, write
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
mr = memory_ib_get_mr(pool_buffer(sample_pool(smps[0])));
for (sent = 0; sent < cnt; sent++) {
int j = 0;