mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Node is able to send messages. A lot is hardcoded and this was only meant to be a first setup of a working node.
This commit is contained in:
parent
4fef5d67e5
commit
9e5836001d
2 changed files with 112 additions and 46 deletions
|
@ -44,16 +44,13 @@ enum poll_mode_e
|
|||
BUSY
|
||||
};
|
||||
|
||||
struct payload_s {
|
||||
int data;
|
||||
};
|
||||
|
||||
struct r_addr_key_s {
|
||||
uint64_t remote_addr;
|
||||
uint32_t rkey;
|
||||
};
|
||||
|
||||
struct infiniband {
|
||||
struct rdma_cm_id *listen_id;
|
||||
struct rdma_cm_id *id;
|
||||
struct rdma_event_channel *ec;
|
||||
|
||||
|
|
|
@ -31,6 +31,30 @@
|
|||
|
||||
#include <rdma/rdma_cma.h>
|
||||
|
||||
int ib_post_recv_wrs(struct node *n)
|
||||
{
|
||||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
struct ibv_recv_wr wr, *bad_wr = NULL;
|
||||
int ret;
|
||||
struct ibv_sge sge;
|
||||
|
||||
// Prepare receive Scatter/Gather element
|
||||
sge.addr = (uintptr_t)pool_get(&ib->mem.p_recv);
|
||||
sge.length = ib->mem.p_recv.blocksz;
|
||||
sge.lkey = ib->mem.mr_recv->lkey;
|
||||
|
||||
// Prepare a receive Work Request
|
||||
wr.wr_id = (uintptr_t)sge.addr;
|
||||
wr.next = NULL;
|
||||
wr.sg_list = &sge;
|
||||
wr.num_sge = 1;
|
||||
|
||||
// Post Work Request
|
||||
ret = ibv_post_recv(ib->id->qp, &wr, &bad_wr);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void ib_create_busy_poll(struct node *n, struct rdma_cm_id *id)
|
||||
{
|
||||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
|
@ -103,7 +127,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id)
|
|||
if(ret)
|
||||
error("Failed to create Queue Pair in node %s.", node_name(n));
|
||||
|
||||
info("Successfully created Queue Pair.");
|
||||
info("Created Queue Pair.");
|
||||
|
||||
// Allocate memory
|
||||
ib->mem.p_recv.state = STATE_DESTROYED;
|
||||
|
@ -112,7 +136,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id)
|
|||
// Set pool size to maximum size of Receive Queue
|
||||
pool_init(&ib->mem.p_recv,
|
||||
ib->qp_init.cap.max_recv_wr,
|
||||
sizeof(struct payload_s),
|
||||
sizeof(double),
|
||||
&memtype_heap);
|
||||
if(ret) {
|
||||
error("Failed to init recv memory pool of node %s: %s",
|
||||
|
@ -142,7 +166,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id)
|
|||
// Set pool size to maximum size of Receive Queue
|
||||
pool_init(&ib->mem.p_send,
|
||||
ib->qp_init.cap.max_send_wr,
|
||||
sizeof(struct payload_s),
|
||||
sizeof(double),
|
||||
&memtype_heap);
|
||||
if(ret) {
|
||||
error("Failed to init send memory of node %s: %s",
|
||||
|
@ -163,7 +187,20 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id)
|
|||
node_name(n));
|
||||
}
|
||||
info("Allocated send memory.");
|
||||
|
||||
}
|
||||
|
||||
// Post Receive Work Requests to be able to receive data
|
||||
// Fill complete Receive Queue during initialization
|
||||
for(int i=0; i<ib->qp_init.cap.max_recv_wr; i++)
|
||||
{
|
||||
ret = ib_post_recv_wrs(n);
|
||||
if(ret) {
|
||||
error("Failed to post initial receive Work Requests of node %s.",
|
||||
node_name(n));
|
||||
}
|
||||
}
|
||||
info("Filled the complete Receive Queue.");
|
||||
}
|
||||
|
||||
static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id)
|
||||
|
@ -209,9 +246,11 @@ static int ib_route_resolved(struct node *n, struct rdma_cm_id *id)
|
|||
|
||||
static int ib_connect_request(struct node *n, struct rdma_cm_id *id)
|
||||
{
|
||||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
int ret;
|
||||
info("Received a connection request!");
|
||||
|
||||
ib->id = id;
|
||||
ib_build_ibv(n, id);
|
||||
|
||||
//ToDo: Post receive WRs
|
||||
|
@ -398,6 +437,9 @@ int ib_start(struct node *n)
|
|||
}
|
||||
info("Succesfully created rdma_cm_id.");
|
||||
|
||||
// The ID will be overwritten for the target
|
||||
ib->listen_id = ib->id;
|
||||
|
||||
// Bind rdma_cm_id to the HCA
|
||||
ret = rdma_bind_addr(ib->id, ib->conn.src_addr->ai_addr);
|
||||
if(ret) {
|
||||
|
@ -421,7 +463,7 @@ int ib_start(struct node *n)
|
|||
else
|
||||
{
|
||||
// Listen on rdma_cm_id for events
|
||||
ret = rdma_listen(ib->id, 10);
|
||||
ret = rdma_listen(ib->listen_id, 10);
|
||||
if(ret) {
|
||||
error("Failed to listen to rdma_cm_id on node %s", node_name(n));
|
||||
}
|
||||
|
@ -467,64 +509,80 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
//for testing purposes
|
||||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
int ret;
|
||||
struct ibv_wc wc[100];
|
||||
struct ibv_wc wc[cnt];
|
||||
union {
|
||||
double f;
|
||||
int64_t i;
|
||||
} *data;
|
||||
|
||||
ret = ibv_poll_cq(ib->ctx.cq, 100, wc);
|
||||
ret = ibv_poll_cq(ib->ctx.cq, cnt, wc);
|
||||
|
||||
if(ret)
|
||||
{
|
||||
data = malloc(ret*sizeof(double));
|
||||
|
||||
|
||||
for(int i=0; i<ret; i++)
|
||||
{
|
||||
if(wc[i].status != IBV_WC_SUCCESS)
|
||||
error("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
|
||||
|
||||
data[i].f = *(double*)(wc[i].wr_id);
|
||||
}
|
||||
smps[0]->length = ret;
|
||||
smps[0]->capacity = cnt;
|
||||
memcpy(smps[0]->data, data, ret*sizeof(double));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
/* Send pool is not used at this moment! */
|
||||
struct infiniband *ib = (struct infiniband *) n->_vd;
|
||||
int ret;
|
||||
struct ibv_send_wr wr;
|
||||
struct ibv_send_wr *bad_wr = NULL;
|
||||
struct ibv_sge sg_list;
|
||||
struct ibv_send_wr wr[smps[0]->length], *bad_wr = NULL;
|
||||
struct ibv_sge sge[smps[0]->length];
|
||||
|
||||
memset(&wr, 0, sizeof(wr));
|
||||
|
||||
struct payload_s *payl;
|
||||
payl = pool_get(&ib->mem.p_send);
|
||||
//ToDo: Place this into configuration and create checks if settings are valid
|
||||
int send_inline = 1;
|
||||
|
||||
payl->data = 1337;
|
||||
|
||||
// If data is send inline, it is not necessary to copy data to protected
|
||||
// memory region first.
|
||||
if(1)
|
||||
for(int i=0; i<smps[0]->length; i++)
|
||||
{
|
||||
//sg_list.addr = (uint64_t)smps[0]->data;
|
||||
//sg_list.length = smps[0]->length-1;
|
||||
sg_list.addr = (uintptr_t)payl;
|
||||
sg_list.length = 1;
|
||||
// lkey not necessary
|
||||
}
|
||||
else
|
||||
{
|
||||
//- copy value to send_region
|
||||
//- give pointer to start of array
|
||||
}
|
||||
// If data is send inline, it is not necessary to copy data to protected
|
||||
// memory region first.
|
||||
if(send_inline)
|
||||
{
|
||||
sge[i].addr = (uint64_t)&smps[0]->data[i].f;
|
||||
sge[i].length = sizeof(double);
|
||||
}
|
||||
else
|
||||
{
|
||||
//- copy value to send_region
|
||||
//- give pointer to start of array
|
||||
}
|
||||
|
||||
// Set Send Work Request
|
||||
wr.wr_id = 123; //ToDo: set this to a useful value
|
||||
wr.sg_list = &sg_list;
|
||||
wr.num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended
|
||||
//furthermore we should break the transaction up if inline mode
|
||||
//is selected
|
||||
wr.next = NULL;
|
||||
wr.send_flags = IBV_SEND_SIGNALED;
|
||||
wr.imm_data = htonl(0); //ToDo: set this to a useful value
|
||||
wr.opcode = IBV_WR_SEND_WITH_IMM;
|
||||
// Set Send Work Request
|
||||
wr[i].wr_id = 0; //ToDo: set this to a useful value
|
||||
wr[i].sg_list = &sge[i];
|
||||
wr[i].num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended
|
||||
//furthermore we should break the transaction up if inline mode
|
||||
//is selected
|
||||
|
||||
if(i == (smps[0]->length-1))
|
||||
wr[i].next = NULL;
|
||||
else
|
||||
wr[i].next = &wr[i+1];
|
||||
wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3);
|
||||
wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
|
||||
wr[i].opcode = IBV_WR_SEND_WITH_IMM;
|
||||
|
||||
for(int i = 0; i < smps[0]->length; i++)
|
||||
{
|
||||
printf("Sample %i: %f\n", i, smps[0]->data[i].f);
|
||||
}
|
||||
|
||||
ret = ibv_post_send(ib->id->qp, &wr, &bad_wr);
|
||||
//Send linked list of Work Requests
|
||||
ret = ibv_post_send(ib->id->qp, wr, &bad_wr);
|
||||
if(ret)
|
||||
{
|
||||
error("Failed to send message in node %s: %s",
|
||||
|
@ -532,6 +590,17 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
|
||||
return -ret;
|
||||
}
|
||||
|
||||
/* Debugging */
|
||||
struct ibv_wc wc[5];
|
||||
int size;
|
||||
while(1)
|
||||
{
|
||||
size = ibv_poll_cq(ib->ctx.cq, 5, wc);
|
||||
if(size)
|
||||
for(int j=0; j<size; j++)
|
||||
printf("Error: %i\n", wc[j].status);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue