1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Added meta data in transfer

ib_write() and ib_read() now point to the sequence, ts_origin, and format
members of struct sample in a separate scatter/gather element each.

ib_read() measures the time with time_now() (from villas/timing.h) and
sets all flags at receive side.
This commit is contained in:
Dennis Potter 2018-07-21 12:52:25 +02:00
parent 2c3ddfd0c2
commit 591f9f73bd

View file

@ -29,6 +29,7 @@
#include <villas/format_type.h>
#include <villas/memory.h>
#include <villas/memory/ib.h>
#include <villas/timing.h>
static int ib_disconnect(struct node *n)
{
@ -324,8 +325,8 @@ int ib_parse(struct node *n, json_t *cfg)
ib->conn.available_recv_wrs = 0;
// Set remaining QP attributes
ib->qp_init.cap.max_send_sge = 1;
ib->qp_init.cap.max_recv_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1;
ib->qp_init.cap.max_send_sge = 4;
ib->qp_init.cap.max_recv_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 4 : 5;
// Set number of bytes to be send inline
ib->qp_init.cap.max_inline_data = max_inline_data;
@ -704,8 +705,9 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[cnt];
struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt][(ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1];
struct ibv_sge sge[cnt][ib->qp_init.cap.max_recv_sge];
struct ibv_mr *mr;
struct timespec ts_receive;
int ret = 0, wcs = 0, read_values = 0, max_wr_post;
debug(LOG_IB | 15, "ib_read is called");
@ -722,6 +724,9 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
if (wcs) {
// Get time directly after something arrived in Completion Queue
ts_receive = time_now();
debug(LOG_IB | 10, "Received %i Work Completions", wcs);
read_values = wcs; // Value to return
@ -745,9 +750,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
mr = memory_ib_get_mr(smps[0]);
for (int i = 0; i < max_wr_post; i++) {
// Prepare receive Scatter/Gather element
int j = 0;
// Prepare receive Scatter/Gather element
// First 40 byte of UD data are GRH and unused in our case
if (ib->conn.port_space == RDMA_PS_UDP) {
sge[i][j].addr = (uint64_t) ib->conn.grh_ptr;
sge[i][j].length = 40;
@ -756,6 +763,27 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
j++;
}
// Sequence
sge[i][j].addr = (uint64_t) &smps[i]->sequence;
sge[i][j].length = sizeof(smps[i]->sequence);
sge[i][j].lkey = mr->lkey;
j++;
// Format
sge[i][j].addr = (uint64_t) &smps[i]->format;
sge[i][j].length = sizeof(smps[i]->format);
sge[i][j].lkey = mr->lkey;
j++;
// Timespec origin
sge[i][j].addr = (uint64_t) &smps[i]->ts.origin;
sge[i][j].length = sizeof(smps[i]->ts.origin);
sge[i][j].lkey = mr->lkey;
j++;
sge[i][j].addr = (uint64_t) &smps[i]->data;
sge[i][j].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
sge[i][j].lkey = mr->lkey;
@ -799,6 +827,9 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
smps[j] = (struct sample *) (wc[j].wr_id);
smps[j]->length = (wc[j].byte_len - correction) / sizeof(double);
smps[j]->ts.received = ts_receive;
smps[j]->flags = (SAMPLE_HAS_ORIGIN | SAMPLE_HAS_RECEIVED | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_FORMAT);
}
}
return read_values;
@ -808,7 +839,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_send_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_sge sge[cnt][ib->qp_init.cap.max_recv_sge];
struct ibv_wc wc[cnt];
struct ibv_mr *mr;
@ -826,10 +857,36 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
mr = memory_ib_get_mr(smps[0]);
for (sent = 0; sent < cnt; sent++) {
int j = 0;
// Set Scatter/Gather element to data of sample
sge[sent].addr = (uint64_t) &smps[sent]->data;
sge[sent].length = smps[sent]->length*sizeof(double);
sge[sent].lkey = mr->lkey;
// Sequence
sge[sent][j].addr = (uint64_t) &smps[sent]->sequence;
sge[sent][j].length = sizeof(smps[sent]->sequence);
sge[sent][j].lkey = mr->lkey;
j++;
// Format
sge[sent][j].addr = (uint64_t) &smps[sent]->format;
sge[sent][j].length = sizeof(smps[sent]->format);
sge[sent][j].lkey = mr->lkey;
j++;
// Timespec origin
sge[sent][j].addr = (uint64_t) &smps[sent]->ts.origin;
sge[sent][j].length = sizeof(smps[sent]->ts.origin);
sge[sent][j].lkey = mr->lkey;
j++;
// Actual Payload
sge[sent][j].addr = (uint64_t) &smps[sent]->data;
sge[sent][j].length = smps[sent]->length*sizeof(double);
sge[sent][j].lkey = mr->lkey;
j++;
// Check if connection is connected or unconnected and set appropriate values
if (ib->conn.port_space == RDMA_PS_UDP) {
@ -839,7 +896,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
}
// Check if data can be send inline
int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ?
int send_inline = (sge[sent][j-1].length < ib->qp_init.cap.max_inline_data) ?
ib->conn.send_inline : 0;
@ -847,8 +904,8 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
// Set Send Work Request
wr[sent].wr_id = send_inline ? 0 : (uintptr_t) smps[sent]; // This way the sample can be release in WC
wr[sent].sg_list = &sge[sent];
wr[sent].num_sge = 1;
wr[sent].sg_list = sge[sent];
wr[sent].num_sge = j;
wr[sent].next = &wr[sent+1];
wr[sent].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);