1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Node is able to clean everything up and reconnect. Node can abort if it is in STARTED and in CONNECTED state

This commit is contained in:
Dennis Potter 2018-07-07 14:36:23 +02:00
parent 80da4801e1
commit 836adee4d6
2 changed files with 59 additions and 56 deletions

View file

@ -77,9 +77,11 @@ struct infiniband {
/* Poll thread */
pthread_t cq_poller_thread;
int stopThread;
int stopThreads;
} poll;
int stopThreads;
/* Connection specific variables */
struct connection_s {
struct addrinfo *src_addr;

View file

@ -22,7 +22,6 @@
#include <string.h>
#include <math.h>
#include <unistd.h> //ToDo: remove me.
#include <villas/nodes/infiniband.h>
#include <villas/plugin.h>
@ -33,7 +32,7 @@
#include <villas/memory.h>
#include <villas/memory/ib.h>
int ib_cleanup(struct node *n)
int ib_disconnect(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
debug(LOG_IB | 1, "Starting to clean up");
@ -53,13 +52,8 @@ int ib_cleanup(struct node *n)
pool_destroy(&ib->mem.p_send);
debug(LOG_IB | 3, "Destroyed memory pools");
// Destroy RDMA CM ID
rdma_destroy_id(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed rdma_cm_id");
// Destroy event channel
rdma_destroy_event_channel(ib->ctx.ec);
debug(LOG_IB | 3, "Destroyed event channel");
// Set available receive work requests to zero
ib->conn.available_recv_wrs = 0;
return 0;
}
@ -97,9 +91,9 @@ void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
for (int i = 0; i < *size; i++) {
//On disconnect, the QP set to error state and will be flushed
if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_completion_source. Stopping thread.");
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_completion_source). Stopping thread.");
ib->poll.stopThread = 1;
ib->poll.stopThreads = 1;
return;
}
@ -146,7 +140,7 @@ void * ib_busy_poll_thread(void *n)
while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
ib->poll.on_compl(n, wc, &size);
if (ib->poll.stopThread)
if (ib->poll.stopThreads)
return NULL;
}
}
@ -169,21 +163,13 @@ static void ib_init_wc_poll(struct node *n)
}
// Create completion queues and bind to channel (or NULL)
ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs,
ib->cq_size,
NULL,
NULL,
0);
ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0);
if (!ib->ctx.recv_cq)
error("Could not create receive completion queue in node %s", node_name(n));
debug(LOG_IB | 3, "Created receive Completion Queue");
ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs,
ib->cq_size,
NULL,
ib->ctx.comp_channel,
0);
ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, ib->ctx.comp_channel, 0);
if (!ib->ctx.send_cq)
error("Could not create send completion queue in node %s", node_name(n));
@ -237,10 +223,7 @@ static void ib_build_ibv(struct node *n)
ib->mem.p_recv.queue.state = STATE_DESTROYED;
// Set pool size to maximum size of Receive Queue
pool_init(&ib->mem.p_recv,
ib->qp_init.cap.max_recv_wr,
SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN),
&memory_type_heap);
pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memory_type_heap);
if (ret)
error("Failed to init recv memory pool of node %s: %s",
node_name(n), gai_strerror(ret));
@ -252,8 +235,7 @@ static void ib_build_ibv(struct node *n)
// Register memory for IB Device. Not necessary if data is send
// exclusively inline
ib->mem.mr_recv = ibv_reg_mr(
ib->ctx.pd,
ib->mem.mr_recv = ibv_reg_mr(ib->ctx.pd,
(char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off,
ib->mem.p_recv.len,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
@ -268,29 +250,22 @@ static void ib_build_ibv(struct node *n)
ib->mem.p_send.queue.state = STATE_DESTROYED;
// Set pool size to maximum size of Receive Queue
pool_init(&ib->mem.p_send,
ib->qp_init.cap.max_send_wr,
sizeof(double),
&memory_type_heap);
pool_init(&ib->mem.p_send, ib->qp_init.cap.max_send_wr, sizeof(double), &memory_type_heap);
if (ret)
error("Failed to init send memory of node %s: %s",
node_name(n), gai_strerror(ret));
error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret));
debug(LOG_IB | 3, "Created internal send pool with %i elements",
ib->qp_init.cap.max_recv_wr);
debug(LOG_IB | 3, "Created internal send pool with %i elements", ib->qp_init.cap.max_recv_wr);
//ToDo: initialize r_addr_key struct if mode is RDMA
// Register memory for IB Device. Not necessary if data is send
// exclusively inline
ib->mem.mr_send = ibv_reg_mr(
ib->ctx.pd,
ib->mem.mr_send = ibv_reg_mr(ib->ctx.pd,
(char*)&ib->mem.p_send+ib->mem.p_send.buffer_off,
ib->mem.p_send.len,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (!ib->mem.mr_send)
error("Failed to register mr_send with ibv_reg_mr of node %s",
node_name(n));
error("Failed to register mr_send with ibv_reg_mr of node %s", node_name(n));
debug(LOG_IB | 3, "Registered send pool with ibv_reg_mr");
}
@ -463,7 +438,7 @@ int ib_parse(struct node *n, json_t *cfg)
//Check if node is a source and connect to target
if (remote) {
debug(LOG_IB | 3, "Node %s is set up to be able to send data (source and target)", node_name(n));
debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n));
ib->is_source = 1;
@ -476,13 +451,13 @@ int ib_parse(struct node *n, json_t *cfg)
error("Failed to resolve remote address '%s' of node %s: %s",
remote, node_name(n), gai_strerror(ret));
debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo in node %s", ip_adr, port, node_name(n));
debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port);
// Set correct Work Completion function
ib->poll.on_compl = ib_completion_source;
}
else {
debug(LOG_IB | 3, "Node %s is set up to be able to only receive data (target)", node_name(n));
debug(LOG_IB | 3, "Node %s is set up as target", node_name(n));
ib->is_source = 0;
@ -504,11 +479,11 @@ int ib_check(struct node *n)
int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
if (ib->qp_init.cap.max_send_wr != max_send_pow)
warn("Max nr. of send WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
warn("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
ib->qp_init.cap.max_send_wr, max_send_pow);
if (ib->qp_init.cap.max_recv_wr != max_recv_pow)
warn("Max nr. of recv WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
warn("Max nr. of recv WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
ib->qp_init.cap.max_recv_wr, max_recv_pow);
@ -603,7 +578,12 @@ void * ib_rdma_cm_event_thread(void *n)
case RDMA_CM_EVENT_DISCONNECTED:
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
ret = ib_cleanup(n);
node->state = STATE_STARTED;
ret = ib_disconnect(n);
break;
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
break;
default:
@ -612,7 +592,7 @@ void * ib_rdma_cm_event_thread(void *n)
rdma_ack_cm_event(event);
if (ret) //ToDo: Fix me
if (ret || ib->stopThreads)
break;
}
@ -635,8 +615,7 @@ int ib_start(struct node *n)
ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
if (ret)
error("Failed to create rdma_cm_id of node %s: %s",
node_name(n), gai_strerror(ret));
error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret));
debug(LOG_IB | 3, "Created rdma_cm_id");
@ -650,10 +629,7 @@ int ib_start(struct node *n)
if (ib->is_source) {
// Resolve address
ret = rdma_resolve_addr(ib->ctx.id,
NULL,
ib->conn.dst_addr->ai_addr,
ib->conn.timeout);
ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout);
if (ret)
error("Failed to resolve remote address after %ims of node %s: %s",
ib->conn.timeout, node_name(n), gai_strerror(ret));
@ -698,16 +674,41 @@ int ib_stop(struct node *n)
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
debug(LOG_IB | 1, "Called ib_stop");
ib->stopThreads = 1;
// Call RDMA disconnect function
// Will flush all outstanding WRs to the Completion Queue and
// will call RDMA_CM_EVENT_DISCONNECTED if that is done.
ret = rdma_disconnect(ib->ctx.id);
if (ret)
error("Error while calling rdma_disconnect in node %s: %s",
node_name(n), gai_strerror(ret));
node_name(n), gai_strerror(ret));
debug(LOG_IB | 3, "Called rdma_disconnect");
info("Disconnecting... Please give me a few seconds.");
// Wait for event thread to join
ret = pthread_join(ib->conn.rdma_cm_event_thread, NULL);
if (ret)
error("Error while joining rdma_cm_event_thread in node %s: %i", node_name(n), ret);
debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
// Destroy RDMA CM ID
rdma_destroy_id(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed rdma_cm_id");
// Dealloc Protection Domain
ibv_dealloc_pd(ib->ctx.pd);
debug(LOG_IB | 3, "Destroyed protection domain");
// Destroy event channel
rdma_destroy_event_channel(ib->ctx.ec);
debug(LOG_IB | 3, "Destroyed event channel");
info("Successfully stopped %s", node_name(n));
return 0;
}
@ -784,7 +785,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
for (int i = 0; i < ret; i++) {
if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it.");
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
ret = 0;
}