1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

ib_write() now takes data directly from the super pool instead of copying it. ib_read() still copies data and the code needs cleanup after zero-copy is implemented

This commit is contained in:
Dennis Potter 2018-07-01 12:56:03 +02:00
parent c70dbe2263
commit c055010be1
2 changed files with 105 additions and 116 deletions

View file

@ -82,14 +82,3 @@ struct memtype * ib_memtype(struct node *n, struct memtype *parent)
return mt;
}
/* Ausserhalb von lib/nodes/infiniband.c */
/*
struct pool p = { .state = STATE_DESTROYED };
struct node *n = ..;
pool_init(&p, 100, 32, node_get_memtype(n));
*/

View file

@ -95,23 +95,30 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size)
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
{
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
struct sample* smpl;
for(int i=0; i<*size; i++)
{
//On disconnect, the QP set to error state and will be flushed
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
{
ib->poll.stopThread = 1;
return;
}
if(wc[i].status != IBV_WC_SUCCESS)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[i].status);
}
for(int i=0; i<*size; i++)
{
//On disconnect, the QP set to error state and will be flushed
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
{
ib->poll.stopThread = 1;
return;
}
if(wc[i].status != IBV_WC_SUCCESS)
{
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[i].status);
}
else
{
// Release sample
smpl = (struct sample*)wc[i].wr_id;
sample_put(smpl);
}
}
}
void * ib_event_thread(void *n)
@ -232,7 +239,7 @@ static void ib_build_ibv(struct node *n)
// Set pool size to maximum size of Receive Queue
pool_init(&ib->mem.p_recv,
ib->qp_init.cap.max_recv_wr,
sizeof(double),
64*sizeof(double),
&memtype_heap);
if(ret)
{
@ -705,115 +712,108 @@ int ib_deinit()
int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
{
//Create separate thread for polling! This impelemtation is just
//for testing purposes
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
struct ibv_wc wc[cnt];
union {
double f;
int64_t i;
} *data;
//Create separate thread for polling! This impelemtation is just
//for testing purposes
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
struct ibv_wc wc[cnt];
char *ptr;
ret = ibv_poll_cq(ib->ctx.cq, cnt, wc);
ret = ibv_poll_cq(ib->ctx.cq, cnt, wc);
if(ret)
{
ib->conn.used_recv_wrs += ret;
if(ret)
{
ib->conn.used_recv_wrs += ret;
data = malloc(ret*sizeof(double));
for(int i=0; i<ret; i++)
{
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
return 0;
for(int i=0; i<ret; i++)
{
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
return 0;
if(wc[i].status != IBV_WC_SUCCESS)
{
warn("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
ret--;
}
else
{
//Copy Data
ptr = (char*)(wc[i].wr_id);
if(wc[i].status != IBV_WC_SUCCESS)
{
warn("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
ret--;
}
else
{
//Copy Data
data[i].f = *(double*)(wc[i].wr_id);
//Release memory
pool_put(&ib->mem.p_recv, (double*)(wc[i].wr_id));
}
smps[i]->length = wc[i].byte_len/sizeof(double);
smps[i]->capacity = 64;
memcpy(smps[i]->data, ptr, wc[i].byte_len);
}
}
else
{
//No data received? Put new receive Work Requests to Receive Queue
for(int i=0; i<ib->conn.used_recv_wrs; i++)
ib_post_recv_wrs(n);
//Release memory
pool_put(&ib->mem.p_recv, (double*)(wc[i].wr_id));
}
}
smps[0]->length = ret;
smps[0]->capacity = cnt;
memcpy(smps[0]->data, data, ret*sizeof(double));
}
else
{
//No data received? Put new receive Work Requests to Receive Queue
for(int i=0; i<ib->conn.used_recv_wrs; i++)
ib_post_recv_wrs(n);
ib->conn.used_recv_wrs = 0;
}
ib->conn.used_recv_wrs = 0;
}
return ret;
return ret;
}
int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
{
/* Send pool is not used at this moment! */
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
struct ibv_send_wr wr[smps[0]->length], *bad_wr = NULL;
struct ibv_sge sge[smps[0]->length];
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_send_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct pool *p;
struct ibv_mr ** mr;
int ret;
memset(&wr, 0, sizeof(wr));
memset(&wr, 0, sizeof(wr));
//ToDo: Place this into configuration and create checks if settings are valid
int send_inline = 1;
//ToDo: Place this into configuration and create checks if settings are valid
int send_inline = 0;
for(int i=0; i<smps[0]->length; i++)
{
// If data is send inline, it is not necessary to copy data to protected
// memory region first.
if(send_inline)
{
sge[i].addr = (uint64_t)&smps[0]->data[i].f;
sge[i].length = sizeof(double);
}
else
{
//- copy value to send_region
//- give pointer to start of array
}
// Get Memory Region
p = sample_pool(smps[0]);
mr = (struct ibv_mr **)((char *)(p)+p->buffer_off-8);
// Set Send Work Request
wr[i].wr_id = 0; //ToDo: set this to a useful value
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended
//furthermore we should break the transaction up if inline mode
//is selected
for(int i=0; i<cnt; i++)
{
// Increase refcnt of sample
sample_get(smps[i]);
if(i == (smps[0]->length-1))
wr[i].next = NULL;
else
wr[i].next = &wr[i+1];
wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3);
wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
wr[i].opcode = IBV_WR_SEND_WITH_IMM;
//Set Scatter/Gather element to data of sample
sge[i].addr = (uint64_t)&smps[i]->data->f;
sge[i].length = smps[i]->length*sizeof(double);
sge[i].lkey = (*mr)->lkey;
}
// Set Send Work Request
wr[i].wr_id = (uint64_t)&smps[i]; //This way the sample can be release in WC
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
//Send linked list of Work Requests
ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
if(ret)
{
error("Failed to send message in node %s: %s",
node_name(n), gai_strerror(ret));
if(i == (cnt-1))
wr[i].next = NULL;
else
wr[i].next = &wr[i+1];
return -ret;
}
wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3);
wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
wr[i].opcode = IBV_WR_SEND_WITH_IMM;
}
return cnt;
//Send linked list of Work Requests
ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
if(ret)
{
error("Failed to send message in node %s: %s",
node_name(n), gai_strerror(ret));
return -ret;
}
return cnt;
}
int ib_fd(struct node *n)