mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Split configuration in an in and out part
This commit is contained in:
parent
6975854376
commit
6444a9e337
2 changed files with 88 additions and 44 deletions
|
@ -76,7 +76,7 @@ struct infiniband {
|
|||
|
||||
pthread_t rdma_cm_event_thread;
|
||||
|
||||
int inline_mode;
|
||||
int send_inline;
|
||||
|
||||
int available_recv_wrs;
|
||||
struct send_wc_stack_s {
|
||||
|
@ -84,6 +84,8 @@ struct infiniband {
|
|||
unsigned top;
|
||||
} send_wc_stack;
|
||||
|
||||
int buffer_subtraction;
|
||||
|
||||
} conn;
|
||||
|
||||
/* Memory related variables */
|
||||
|
@ -100,7 +102,8 @@ struct infiniband {
|
|||
|
||||
/* Misc settings */
|
||||
int is_source;
|
||||
int cq_size;
|
||||
int recv_cq_size;
|
||||
int send_cq_size;
|
||||
};
|
||||
|
||||
/** @see node_type::reverse */
|
||||
|
|
|
@ -96,13 +96,13 @@ static void ib_build_ibv(struct node *n)
|
|||
debug(LOG_IB | 1, "Starting to build IBV components");
|
||||
|
||||
// Create completion queues. No completion channel!)
|
||||
ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0);
|
||||
ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->recv_cq_size, NULL, NULL, 0);
|
||||
if (!ib->ctx.recv_cq)
|
||||
error("Could not create receive completion queue in node %s", node_name(n));
|
||||
|
||||
debug(LOG_IB | 3, "Created receive Completion Queue");
|
||||
|
||||
ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0);
|
||||
ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->send_cq_size, NULL, NULL, 0);
|
||||
if (!ib->ctx.send_cq)
|
||||
error("Could not create send completion queue in node %s", node_name(n));
|
||||
|
||||
|
@ -120,7 +120,7 @@ static void ib_build_ibv(struct node *n)
|
|||
debug(LOG_IB | 3, "Created Queue Pair with %i receive and %i send elements",
|
||||
ib->qp_init.cap.max_recv_wr, ib->qp_init.cap.max_send_wr);
|
||||
|
||||
if (ib->conn.inline_mode)
|
||||
if (ib->conn.send_inline)
|
||||
info("Maximum inline size is set to %i byte", ib->qp_init.cap.max_inline_data);
|
||||
|
||||
// Allocate memory
|
||||
|
@ -246,42 +246,84 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
int ret;
|
||||
char *local = NULL;
|
||||
char *remote = NULL;
|
||||
const char *port_space = "RDMA_PC_TCP";
|
||||
const char *port_space = "RDMA_PS_TCP";
|
||||
const char *poll_mode = "BUSY";
|
||||
const char *qp_type = "IBV_QPT_RC";
|
||||
int timeout = 1000;
|
||||
int cq_size = 128;
|
||||
int recv_cq_size = 128;
|
||||
int send_cq_size = 128;
|
||||
int max_send_wr = 128;
|
||||
int max_recv_wr = 128;
|
||||
int max_inline_data = 0;
|
||||
int inline_mode = 1;
|
||||
int send_inline = 1;
|
||||
int vectorize_in = 1;
|
||||
int vectorize_out = 1;
|
||||
int buffer_subtraction = 2;
|
||||
|
||||
|
||||
json_t *json_in = NULL;
|
||||
json_t *json_out = NULL;
|
||||
json_error_t err;
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, \
|
||||
s?: s, s?: i, s?: s, s?: i, s?: i, \
|
||||
s?: i, s?: i}",
|
||||
"remote", &remote,
|
||||
"local", &local,
|
||||
"rdma_port_space", &port_space,
|
||||
"resolution_timeout", &timeout,
|
||||
"poll_mode", &poll_mode,
|
||||
"cq_size", &cq_size,
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{s?: o, s?: o, s?: s, s?: s}",
|
||||
"in", &json_in,
|
||||
"out", &json_out,
|
||||
"qp_type", &qp_type,
|
||||
"max_send_wr", &max_send_wr,
|
||||
"max_recv_wr", &max_recv_wr,
|
||||
"max_inline_data", &max_inline_data,
|
||||
"inline_mode", &inline_mode
|
||||
"rdma_port_space", &port_space
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
|
||||
jerror(&err, "Failed to parse in/out json blocks");
|
||||
|
||||
|
||||
if (json_in) {
|
||||
ret = json_unpack_ex(json_in, &err, 0, "{ s?: s, s?: s, s?: i, s?: i, s?: i, s?: i}",
|
||||
"address", &local,
|
||||
"poll_mode", &poll_mode,
|
||||
"cq_size", &recv_cq_size,
|
||||
"max_wrs", &max_recv_wr,
|
||||
"vectorize", &vectorize_in,
|
||||
"buffer_subtraction", &buffer_subtraction
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse input configuration of node %s", node_name(n));
|
||||
}
|
||||
|
||||
if (json_out) {
|
||||
ret = json_unpack_ex(json_out, &err, 0, "{ s?: s, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i,}",
|
||||
"address", &remote,
|
||||
"resolution_timeout", &timeout,
|
||||
"cq_size", &send_cq_size,
|
||||
"max_wrs", &max_send_wr,
|
||||
"max_inline_data", &max_inline_data,
|
||||
"send_inline", &send_inline,
|
||||
"vectorize", &vectorize_out
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse output configuration of node %s", node_name(n));
|
||||
|
||||
ib->is_source = 1;
|
||||
|
||||
debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n));
|
||||
}
|
||||
else {
|
||||
ib->is_source = 0;
|
||||
|
||||
debug(LOG_IB | 3, "Node %s is up as target", node_name(n));
|
||||
}
|
||||
|
||||
// Set vectorize mode. Do not print, since framework will print this information
|
||||
n->in.vectorize = vectorize_in;
|
||||
n->out.vectorize = vectorize_out;
|
||||
|
||||
// Set buffer subtraction
|
||||
ib->conn.buffer_subtraction = buffer_subtraction;
|
||||
|
||||
debug(LOG_IB | 4, "Set buffer subtraction to %i in node %s", buffer_subtraction, node_name(n));
|
||||
|
||||
// Translate IP:PORT to a struct addrinfo
|
||||
char* ip_adr = strtok(local, ":");
|
||||
char* port = strtok(NULL, ":");
|
||||
|
||||
//n->in.vectorize = 1024;
|
||||
//n->out.vectorize = 1024; //ToDo: make configurable
|
||||
|
||||
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr);
|
||||
if (ret)
|
||||
error("Failed to resolve local address '%s' of node %s: %s",
|
||||
|
@ -317,9 +359,11 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
debug(LOG_IB | 4, "Set poll mode to %s in node %s", poll_mode, node_name(n));
|
||||
|
||||
// Set completion queue size
|
||||
ib->cq_size = cq_size;
|
||||
ib->recv_cq_size = recv_cq_size;
|
||||
ib->send_cq_size = send_cq_size;
|
||||
|
||||
debug(LOG_IB | 4, "Set Completion Queue size to %i in node %s", cq_size, node_name(n));
|
||||
debug(LOG_IB | 4, "Set Completion Queue size to %i & %i (in & out) in node %s",
|
||||
recv_cq_size, send_cq_size, node_name(n));
|
||||
|
||||
// Translate QP type
|
||||
if (strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC;
|
||||
|
@ -332,9 +376,9 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
debug(LOG_IB | 4, "Set Queue Pair type to %s in node %s", qp_type, node_name(n));
|
||||
|
||||
// Translate inline mode
|
||||
ib->conn.inline_mode = inline_mode;
|
||||
ib->conn.send_inline = send_inline;
|
||||
|
||||
debug(LOG_IB | 4, "Set inline_modee to %i in node %s", inline_mode, node_name(n));
|
||||
debug(LOG_IB | 4, "Set send_inline to %i in node %s", send_inline, node_name(n));
|
||||
|
||||
// Set max. send and receive Work Requests
|
||||
ib->qp_init.cap.max_send_wr = max_send_wr;
|
||||
|
@ -353,12 +397,7 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
// Set number of bytes to be send inline
|
||||
ib->qp_init.cap.max_inline_data = max_inline_data;
|
||||
|
||||
//Check if node is a source and connect to target
|
||||
if (remote) {
|
||||
debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n));
|
||||
|
||||
ib->is_source = 1;
|
||||
|
||||
if (ib->is_source) {
|
||||
// Translate address info
|
||||
char* ip_adr = strtok(remote, ":");
|
||||
char* port = strtok(NULL, ":");
|
||||
|
@ -370,11 +409,6 @@ int ib_parse(struct node *n, json_t *cfg)
|
|||
|
||||
debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port);
|
||||
}
|
||||
else {
|
||||
debug(LOG_IB | 3, "Node %s is set up as target", node_name(n));
|
||||
|
||||
ib->is_source = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -385,6 +419,13 @@ int ib_check(struct node *n)
|
|||
|
||||
info("Starting check of node %s", node_name(n));
|
||||
|
||||
// Check if read substraction makes sense
|
||||
if (ib->conn.buffer_subtraction < 2 * n->in.vectorize)
|
||||
error("The buffer substraction value must be bigger than 2 * in.vectorize");
|
||||
|
||||
if (ib->conn.buffer_subtraction >= ib->qp_init.cap.max_recv_wr - n->in.vectorize)
|
||||
error("The buffer substraction value cannot be smaller than in.max_wrs - in.vectorize");
|
||||
|
||||
// Check if the set value is a power of 2, and warn the user if this is not the case
|
||||
int max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr)));
|
||||
int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
|
||||
|
@ -417,8 +458,8 @@ int ib_check(struct node *n)
|
|||
warn("You changed the default value of max_inline_data. This might influence the maximum number of outstanding Work Requests in the Queue Pair and can be a reason for the Queue Pair creation to fail");
|
||||
|
||||
// Check if inline mode is set to a valid value
|
||||
if (ib->conn.inline_mode != 0 && ib->conn.inline_mode != 1)
|
||||
error("inline_mode has to be set to either 0 or 1! %i is not a valid value", ib->conn.inline_mode);
|
||||
if (ib->conn.send_inline != 0 && ib->conn.send_inline != 1)
|
||||
error("send_inline has to be set to either 0 or 1! %i is not a valid value", ib->conn.send_inline);
|
||||
|
||||
info("Finished check of node %s", node_name(n));
|
||||
|
||||
|
@ -693,7 +734,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
|
|||
|
||||
// Poll Completion Queue
|
||||
// If we've already posted enough receive WRs, try to pull cnt
|
||||
if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) ) ) { //ToDo: Make configurable
|
||||
if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) {
|
||||
while(1) {
|
||||
wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc);
|
||||
if (wcs) {
|
||||
|
@ -793,7 +834,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
|
|||
|
||||
// Check if data can be send inline
|
||||
int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ?
|
||||
ib->conn.inline_mode : 0;
|
||||
ib->conn.send_inline : 0;
|
||||
|
||||
debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue