mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Refactored ib_write in the same way as ib_read (described in #153). Merged the separate completion queue polling into ib_write. Closes #167
This commit is contained in:
parent
ebb5446305
commit
746fd2f694
2 changed files with 32 additions and 111 deletions
|
@ -34,12 +34,6 @@
|
|||
#include <villas/io.h>
|
||||
#include <villas/queue_signalled.h>
|
||||
#include <rdma/rdma_cma.h>
|
||||
|
||||
/* Function pointer typedefs */
|
||||
typedef void (*ib_on_completion) (struct node*, struct ibv_wc*, int*);
|
||||
typedef void * (*ib_poll_function) (void*);
|
||||
typedef void * (*ib_event_function) (void*);
|
||||
|
||||
/* Enums */
|
||||
enum poll_mode_e {
|
||||
EVENT,
|
||||
|
@ -67,15 +61,6 @@ struct infiniband {
|
|||
/* Work Completion related */
|
||||
struct poll_s {
|
||||
enum poll_mode_e poll_mode;
|
||||
|
||||
/* On completion function */
|
||||
ib_on_completion on_compl;
|
||||
|
||||
/* Busy poll or Event function */
|
||||
ib_poll_function poll_func;
|
||||
|
||||
/* Poll thread */
|
||||
pthread_t cq_poller_thread;
|
||||
} poll;
|
||||
|
||||
int stopThreads;
|
||||
|
|
|
@ -57,7 +57,7 @@ int ib_disconnect(struct node *n)
|
|||
// Set available receive work requests to zero
|
||||
ib->conn.available_recv_wrs = 0;
|
||||
|
||||
return 0;
|
||||
return ib->stopThreads;
|
||||
}
|
||||
|
||||
int ib_post_recv_wrs(struct node *n)
|
||||
|
@ -84,66 +84,13 @@ int ib_post_recv_wrs(struct node *n)
|
|||
return ret;
|
||||
}
|
||||
|
||||
void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
|
||||
|
||||
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
|
||||
{
|
||||
for (int i = 0; i < *size; i++) {
|
||||
if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
|
||||
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
|
||||
node_name(n), wc[i].status);
|
||||
|
||||
sample_put((struct sample *) wc[i].wr_id);
|
||||
}
|
||||
}
|
||||
|
||||
void * ib_event_thread(void *n)
|
||||
{
|
||||
struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd;
|
||||
struct ibv_wc wc[ib->cq_size];
|
||||
int size;
|
||||
|
||||
debug(LOG_IB | 1, "Initialized event based poll thread of node %s", node_name(n));
|
||||
|
||||
while (1) {
|
||||
// Function blocks, until an event occurs
|
||||
ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.send_cq, NULL);
|
||||
|
||||
// Poll as long as WCs are available
|
||||
while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
|
||||
ib->poll.on_compl(n, wc, &size);
|
||||
|
||||
// Request a new event in the CQ and acknowledge event
|
||||
ibv_req_notify_cq(ib->ctx.send_cq, 0);
|
||||
ibv_ack_cq_events(ib->ctx.send_cq, 1);
|
||||
}
|
||||
}
|
||||
|
||||
void * ib_busy_poll_thread(void *n)
|
||||
{
|
||||
struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd;
|
||||
struct ibv_wc wc[ib->cq_size];
|
||||
int size;
|
||||
|
||||
debug(LOG_IB | 1, "Initialized busy poll thread of node %s", node_name(n));
|
||||
|
||||
while (1) {
|
||||
// Poll as long as WCs are available
|
||||
while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
|
||||
ib->poll.on_compl(n, wc, &size);
|
||||
|
||||
if (ib->stopThreads)
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void ib_init_wc_poll(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
ib->ctx.comp_channel = NULL;
|
||||
|
||||
debug(LOG_IB | 1, "Starting to initialize completion queues and threads");
|
||||
debug(LOG_IB | 1, "Starting to initialize completion queues");
|
||||
|
||||
if (ib->poll.poll_mode == EVENT) {
|
||||
// Create completion channel
|
||||
|
@ -176,14 +123,6 @@ static void ib_init_wc_poll(struct node *n)
|
|||
|
||||
debug(LOG_IB | 3, "Called ibv_req_notificy_cq on send Completion Queue");
|
||||
}
|
||||
|
||||
// Initialize polling pthread for source
|
||||
if (ib->is_source) {
|
||||
ret = pthread_create(&ib->poll.cq_poller_thread, NULL, ib->poll.poll_func, n);
|
||||
if (ret)
|
||||
error("Failed to create poll thread of node %s: %s",
|
||||
node_name(n), gai_strerror(ret));
|
||||
}
|
||||
}
|
||||
|
||||
static void ib_build_ibv(struct node *n)
|
||||
|
@ -385,14 +324,10 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
debug(LOG_IB | 4, "Set timeout to %i in node %s", timeout, node_name(n));
|
||||
|
||||
// Translate poll mode
|
||||
if (strcmp(poll_mode, "EVENT") == 0) {
|
||||
if (strcmp(poll_mode, "EVENT") == 0)
|
||||
ib->poll.poll_mode = EVENT;
|
||||
ib->poll.poll_func = ib_event_thread;
|
||||
}
|
||||
else if (strcmp(poll_mode, "BUSY") == 0) {
|
||||
else if (strcmp(poll_mode, "BUSY") == 0)
|
||||
ib->poll.poll_mode = BUSY;
|
||||
ib->poll.poll_func = ib_busy_poll_thread;
|
||||
}
|
||||
else
|
||||
error("Failed to translate poll_mode in node %s. %s is not a valid \
|
||||
poll mode!", node_name(n), poll_mode);
|
||||
|
@ -444,17 +379,11 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
remote, node_name(n), gai_strerror(ret));
|
||||
|
||||
debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port);
|
||||
|
||||
// Set correct Work Completion function
|
||||
ib->poll.on_compl = ib_completion_source;
|
||||
}
|
||||
else {
|
||||
debug(LOG_IB | 3, "Node %s is set up as target", node_name(n));
|
||||
|
||||
ib->is_source = 0;
|
||||
|
||||
// Set correct Work Completion function
|
||||
ib->poll.on_compl = ib_completion_target;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -508,7 +437,6 @@ void * ib_rdma_cm_event_thread(void *n)
|
|||
struct rdma_cm_event *event;
|
||||
int ret = 0;
|
||||
|
||||
|
||||
debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
|
||||
|
||||
// Wait until node is completely started
|
||||
|
@ -586,7 +514,7 @@ void * ib_rdma_cm_event_thread(void *n)
|
|||
|
||||
rdma_ack_cm_event(event);
|
||||
|
||||
if (ret || ib->stopThreads)
|
||||
if (ret)
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -695,14 +623,6 @@ int ib_stop(struct node *n)
|
|||
|
||||
debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
|
||||
|
||||
// Wait for polling thread to join
|
||||
if (ib->is_source) {
|
||||
ret = pthread_join(ib->poll.cq_poller_thread, NULL);
|
||||
if (ret)
|
||||
error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret);
|
||||
}
|
||||
|
||||
|
||||
// Destroy RDMA CM ID
|
||||
rdma_destroy_id(ib->ctx.id);
|
||||
debug(LOG_IB | 3, "Destroyed rdma_cm_id");
|
||||
|
@ -768,6 +688,8 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
|
|||
debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
|
||||
|
||||
wr[i].next = NULL;
|
||||
i++;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -801,8 +723,8 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
|
|||
ret = 0;
|
||||
}
|
||||
else if (wc[j].opcode & IBV_WC_RECV) {
|
||||
smps[j] = (struct sample*)(wc[j].wr_id);
|
||||
smps[j]->length = wc[i].byte_len/sizeof(double);
|
||||
smps[j] = (struct sample *) (wc[j].wr_id);
|
||||
smps[j]->length = wc[i].byte_len / sizeof(double);
|
||||
}
|
||||
else
|
||||
ret = 0;
|
||||
|
@ -835,14 +757,16 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt)
|
|||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
struct ibv_send_wr wr[*cnt], *bad_wr = NULL;
|
||||
struct ibv_sge sge[*cnt];
|
||||
struct ibv_wc wc[ib->cq_size];
|
||||
struct ibv_mr *mr;
|
||||
int ret;
|
||||
int i = 0; //Used for first loop: prepare work requests to post to send queue
|
||||
int j = 0; //Used for second loop: get values from Completion Queue
|
||||
|
||||
debug(LOG_IB | 10, "ib_write is called");
|
||||
|
||||
if (n->state == STATE_CONNECTED) {
|
||||
memset(&wr, 0, sizeof(wr));
|
||||
|
||||
// First, write
|
||||
//ToDo: Place this into configuration and create checks if settings are valid
|
||||
int send_inline = 1;
|
||||
|
||||
|
@ -851,17 +775,14 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt)
|
|||
// Get Memory Region
|
||||
mr = memory_ib_get_mr(smps[0]);
|
||||
|
||||
for (int i = 0; i < *cnt; i++) {
|
||||
// Increase refcnt of sample
|
||||
sample_get(smps[i]);
|
||||
|
||||
for (i = 0; i < *cnt; i++) {
|
||||
//Set Scatter/Gather element to data of sample
|
||||
sge[i].addr = (uint64_t)&smps[i]->data;
|
||||
sge[i].addr = (uint64_t) &smps[i]->data;
|
||||
sge[i].length = smps[i]->length*sizeof(double);
|
||||
sge[i].lkey = mr->lkey;
|
||||
|
||||
// Set Send Work Request
|
||||
wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC
|
||||
wr[i].wr_id = (uintptr_t) smps[i]; //This way the sample can be release in WC
|
||||
wr[i].sg_list = &sge[i];
|
||||
wr[i].num_sge = 1;
|
||||
|
||||
|
@ -887,9 +808,24 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt)
|
|||
}
|
||||
|
||||
debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
|
||||
|
||||
|
||||
// Subsequently, check if something is available in completion queue
|
||||
ret = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc);
|
||||
|
||||
for (j = 0; j < ret; j++) {
|
||||
if (wc[j].status != IBV_WC_SUCCESS && wc[j].status != IBV_WC_WR_FLUSH_ERR)
|
||||
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
|
||||
node_name(n), wc[j].status);
|
||||
|
||||
|
||||
smps[j] = (struct sample *) (wc[j].wr_id);
|
||||
}
|
||||
|
||||
*cnt = j;
|
||||
}
|
||||
|
||||
return *cnt;
|
||||
return i;
|
||||
}
|
||||
|
||||
int ib_fd(struct node *n)
|
||||
|
|
Loading…
Add table
Reference in a new issue