1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Basic implementation of zero-copying is done. The is still a problem with rread = hook_read_list. It doesn't return anything after the fourth read

This commit is contained in:
Dennis Potter 2018-07-02 16:03:16 +02:00
parent 97e25fb2b2
commit 669d75a666
3 changed files with 80 additions and 69 deletions

View file

@ -40,7 +40,7 @@ typedef void (*ib_on_completion)(struct node*, struct ibv_wc*, int*);
typedef void* (*ib_poll_function)(void*);
/* Enums */
enum poll_mode_e
enum poll_mode_e
{
EVENT,
BUSY
@ -91,7 +91,7 @@ struct infiniband {
pthread_t stop_thread;
int rdma_disconnect_called;
int used_recv_wrs;
int available_recv_wrs;
} conn;
/* Memory related variables */

View file

@ -88,15 +88,11 @@ int ib_post_recv_wrs(struct node *n)
return ret;
}
void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size)
{
//ToDo: No implementation yet. This is still handled in ib_read
}
void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
{
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
struct sample* smpl;
for(int i=0; i<*size; i++)
{
@ -115,8 +111,7 @@ void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
else
{
// Release sample
smpl = (struct sample*)wc[i].wr_id;
sample_put(smpl);
sample_put((struct sample*)(wc[i].wr_id));
}
}
}
@ -144,19 +139,19 @@ void * ib_event_thread(void *n)
void * ib_busy_poll_thread(void *n)
{
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
struct ibv_wc wc[ib->cq_size];
int size;
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
struct ibv_wc wc[ib->cq_size];
int size;
while(1)
{
// Poll as long as WCs are available
while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc)))
ib->poll.on_compl(n, wc, &size);
while(1)
{
// Poll as long as WCs are available
while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc)))
ib->poll.on_compl(n, wc, &size);
if(ib->poll.stopThread)
return NULL;
}
if(ib->poll.stopThread)
return NULL;
}
}
static void ib_init_wc_poll(struct node *n)
@ -239,7 +234,7 @@ static void ib_build_ibv(struct node *n)
// Set pool size to maximum size of Receive Queue
pool_init(&ib->mem.p_recv,
ib->qp_init.cap.max_recv_wr,
64*sizeof(double),
SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN),
&memtype_heap);
if(ret)
{
@ -297,16 +292,16 @@ static void ib_build_ibv(struct node *n)
// Post Receive Work Requests to be able to receive data
// Fill complete Receive Queue during initialization
for(int i=0; i<ib->qp_init.cap.max_recv_wr; i++)
{
ret = ib_post_recv_wrs(n);
if(ret)
{
error("Failed to post initial receive Work Requests of node %s.",
node_name(n));
}
}
info("Filled the complete Receive Queue.");
//for(int i=0; i<ib->qp_init.cap.max_recv_wr; i++)
//{
// ret = ib_post_recv_wrs(n);
// if(ret)
// {
// error("Failed to post initial receive Work Requests of node %s.",
// node_name(n));
// }
//}
//info("Filled the complete Receive Queue.");
}
static int ib_addr_resolved(struct node *n)
@ -514,8 +509,8 @@ int ib_parse(struct node *n, json_t *cfg)
ib->qp_init.cap.max_send_wr = max_send_wr;
ib->qp_init.cap.max_recv_wr = max_recv_wr;
// Set used receive Work Requests to 0
ib->conn.used_recv_wrs = 0;
// Set available receive Work Requests to 0
ib->conn.available_recv_wrs = 0;
// Set remaining QP attributes
ib->qp_init.cap.max_send_sge = 1;
@ -712,49 +707,65 @@ int ib_deinit()
int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
{
//Create separate thread for polling! This impelemtation is just
//for testing purposes
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[n->in.vectorize];
struct ibv_recv_wr wr, *bad_wr = NULL;
struct ibv_sge sge;
struct ibv_mr ** mr;
struct pool *p;
int ret;
struct ibv_wc wc[cnt];
char *ptr;
ret = ibv_poll_cq(ib->ctx.cq, cnt, wc);
ret = ibv_poll_cq(ib->ctx.cq, n->in.vectorize, wc);
if(ret)
{
ib->conn.used_recv_wrs += ret;
ib->conn.available_recv_wrs -= ret;
for(int i=0; i<ret; i++)
{
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
return 0;
for(int i=0; i<ret; i++)
{
if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
return 0;
if(wc[i].status != IBV_WC_SUCCESS)
{
warn("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
ret--;
}
else
{
//Copy Data
ptr = (char*)(wc[i].wr_id);
if(wc[i].status != IBV_WC_SUCCESS)
error("Work Completion status was not IBV_WC_SUCCES in node %s", node_name(n));
else if(wc[i].opcode & IBV_WC_RECV)
{
//Update address of smps argument
smps[i] = (struct sample*)(wc[i].wr_id);
smps[i]->length = wc[i].byte_len/sizeof(double);
smps[i]->capacity = DEFAULT_SAMPLELEN;
//Release memory
pool_put(&ib->mem.p_recv, (double*)(wc[i].wr_id));
}
smps[i]->length = wc[i].byte_len/sizeof(double);
smps[i]->capacity = 64;
memcpy(smps[i]->data, ptr, wc[i].byte_len);
}
//Release sample
sample_put(smps[i]);
}
}
}
else
else if(ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr)
{
//No data received? Put new receive Work Requests to Receive Queue
for(int i=0; i<ib->conn.used_recv_wrs; i++)
ib_post_recv_wrs(n);
// No data received? Put new receive Work Requests to Receive Queue
// Get Memory Region
p = sample_pool(smps[0]);
mr = (struct ibv_mr **)((char *)(p)+p->buffer_off-8);
ib->conn.used_recv_wrs = 0;
// Increase refcnt of sample
sample_get(smps[0]);
// Prepare receive Scatter/Gather element
sge.addr = (uint64_t)&smps[0]->data;
sge.length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
sge.lkey = (*mr)->lkey;
// Prepare a receive Work Request
wr.wr_id = (uintptr_t)smps[0];
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
// Post Work Request
ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr);
ib->conn.available_recv_wrs++;
}
return ret;
@ -765,8 +776,8 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_send_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct pool *p;
struct ibv_mr ** mr;
struct pool *p;
int ret;
memset(&wr, 0, sizeof(wr));
@ -784,12 +795,12 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
sample_get(smps[i]);
//Set Scatter/Gather element to data of sample
sge[i].addr = (uint64_t)&smps[i]->data->f;
sge[i].addr = (uint64_t)&smps[i]->data;
sge[i].length = smps[i]->length*sizeof(double);
sge[i].lkey = (*mr)->lkey;
// Set Send Work Request
wr[i].wr_id = (uint64_t)&smps[i]; //This way the sample can be release in WC
wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;

View file

@ -204,8 +204,8 @@ static void * recv_loop(void *ctx)
ready = sample_alloc_many(&recvv.pool, smps, node->in.vectorize);
if (ready < 0)
error("Failed to allocate %u samples from receive pool.", node->in.vectorize);
else if (ready < node->in.vectorize)
warn("Receive pool underrun");
// else if (ready < node->in.vectorize)
// warn("Receive pool underrun");
recv = node_read(node, smps, ready);
if (recv < 0)