diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index b2a30f539..cafff88ff 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -76,7 +76,7 @@ struct infiniband { pthread_t rdma_cm_event_thread; - int inline_mode; + int send_inline; int available_recv_wrs; struct send_wc_stack_s { @@ -84,6 +84,8 @@ struct infiniband { unsigned top; } send_wc_stack; + int buffer_subtraction; + } conn; /* Memory related variables */ @@ -100,7 +102,8 @@ struct infiniband { /* Misc settings */ int is_source; - int cq_size; + int recv_cq_size; + int send_cq_size; }; /** @see node_type::reverse */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 113771f78..f674e4cea 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -96,13 +96,13 @@ static void ib_build_ibv(struct node *n) debug(LOG_IB | 1, "Starting to build IBV components"); // Create completion queues. No completion channel!) - ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0); + ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->recv_cq_size, NULL, NULL, 0); if (!ib->ctx.recv_cq) error("Could not create receive completion queue in node %s", node_name(n)); debug(LOG_IB | 3, "Created receive Completion Queue"); - ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0); + ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->send_cq_size, NULL, NULL, 0); if (!ib->ctx.send_cq) error("Could not create send completion queue in node %s", node_name(n)); @@ -120,7 +120,7 @@ static void ib_build_ibv(struct node *n) debug(LOG_IB | 3, "Created Queue Pair with %i receive and %i send elements", ib->qp_init.cap.max_recv_wr, ib->qp_init.cap.max_send_wr); - if (ib->conn.inline_mode) + if (ib->conn.send_inline) info("Maximum inline size is set to %i byte", ib->qp_init.cap.max_inline_data); // Allocate memory @@ -246,42 +246,84 @@ int ib_parse(struct node *n, json_t *cfg) int ret; char *local = NULL; char *remote = NULL; - const char *port_space = "RDMA_PC_TCP"; + const char *port_space = "RDMA_PS_TCP"; const char *poll_mode = "BUSY"; const char *qp_type = "IBV_QPT_RC"; int timeout = 1000; - int cq_size = 128; + int recv_cq_size = 128; + int send_cq_size = 128; int max_send_wr = 128; int max_recv_wr = 128; int max_inline_data = 0; - int inline_mode = 1; + int send_inline = 1; + int vectorize_in = 1; + int vectorize_out = 1; + int buffer_subtraction = 2; + + json_t *json_in = NULL; + json_t *json_out = NULL; json_error_t err; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, \ - s?: s, s?: i, s?: s, s?: i, s?: i, \ - s?: i, s?: i}", - "remote", &remote, - "local", &local, - "rdma_port_space", &port_space, - "resolution_timeout", &timeout, - "poll_mode", &poll_mode, - "cq_size", &cq_size, + + ret = json_unpack_ex(cfg, &err, 0, "{s?: o, s?: o, s?: s, s?: s}", + "in", &json_in, + "out", &json_out, "qp_type", &qp_type, - "max_send_wr", &max_send_wr, - "max_recv_wr", &max_recv_wr, - "max_inline_data", &max_inline_data, - "inline_mode", &inline_mode + "rdma_port_space", &port_space ); if (ret) - jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + jerror(&err, "Failed to parse in/out json blocks"); + + + if (json_in) { + ret = json_unpack_ex(json_in, &err, 0, "{ s?: s, s?: s, s?: i, s?: i, s?: i, s?: i}", + "address", &local, + "poll_mode", &poll_mode, + "cq_size", &recv_cq_size, + "max_wrs", &max_recv_wr, + "vectorize", &vectorize_in, + "buffer_subtraction", &buffer_subtraction + ); + if (ret) + jerror(&err, "Failed to parse input configuration of node %s", node_name(n)); + } + + if (json_out) { + ret = json_unpack_ex(json_out, &err, 0, "{ s?: s, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i,}", + "address", &remote, + "resolution_timeout", &timeout, + "cq_size", &send_cq_size, + "max_wrs", &max_send_wr, + "max_inline_data", &max_inline_data, + "send_inline", &send_inline, + "vectorize", &vectorize_out + ); + if (ret) + jerror(&err, "Failed to parse output configuration of node %s", node_name(n)); + + ib->is_source = 1; + + debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n)); + } + else { + ib->is_source = 0; + + debug(LOG_IB | 3, "Node %s is up as target", node_name(n)); + } + + // Set vectorize mode. Do not print, since framework will print this information + n->in.vectorize = vectorize_in; + n->out.vectorize = vectorize_out; + + // Set buffer subtraction + ib->conn.buffer_subtraction = buffer_subtraction; + + debug(LOG_IB | 4, "Set buffer subtraction to %i in node %s", buffer_subtraction, node_name(n)); // Translate IP:PORT to a struct addrinfo char* ip_adr = strtok(local, ":"); char* port = strtok(NULL, ":"); - //n->in.vectorize = 1024; - //n->out.vectorize = 1024; //ToDo: make configurable - ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr); if (ret) error("Failed to resolve local address '%s' of node %s: %s", @@ -317,9 +359,11 @@ int ib_parse(struct node *n, json_t *cfg) debug(LOG_IB | 4, "Set poll mode to %s in node %s", poll_mode, node_name(n)); // Set completion queue size - ib->cq_size = cq_size; + ib->recv_cq_size = recv_cq_size; + ib->send_cq_size = send_cq_size; - debug(LOG_IB | 4, "Set Completion Queue size to %i in node %s", cq_size, node_name(n)); + debug(LOG_IB | 4, "Set Completion Queue size to %i & %i (in & out) in node %s", + recv_cq_size, send_cq_size, node_name(n)); // Translate QP type if (strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC; @@ -332,9 +376,9 @@ int ib_parse(struct node *n, json_t *cfg) debug(LOG_IB | 4, "Set Queue Pair type to %s in node %s", qp_type, node_name(n)); // Translate inline mode - ib->conn.inline_mode = inline_mode; + ib->conn.send_inline = send_inline; - debug(LOG_IB | 4, "Set inline_modee to %i in node %s", inline_mode, node_name(n)); + debug(LOG_IB | 4, "Set send_inline to %i in node %s", send_inline, node_name(n)); // Set max. send and receive Work Requests ib->qp_init.cap.max_send_wr = max_send_wr; @@ -353,12 +397,7 @@ int ib_parse(struct node *n, json_t *cfg) // Set number of bytes to be send inline ib->qp_init.cap.max_inline_data = max_inline_data; - //Check if node is a source and connect to target - if (remote) { - debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n)); - - ib->is_source = 1; - + if (ib->is_source) { // Translate address info char* ip_adr = strtok(remote, ":"); char* port = strtok(NULL, ":"); @@ -370,11 +409,6 @@ int ib_parse(struct node *n, json_t *cfg) debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port); } - else { - debug(LOG_IB | 3, "Node %s is set up as target", node_name(n)); - - ib->is_source = 0; - } return 0; } @@ -385,6 +419,13 @@ int ib_check(struct node *n) info("Starting check of node %s", node_name(n)); + // Check if read substraction makes sense + if (ib->conn.buffer_subtraction < 2 * n->in.vectorize) + error("The buffer substraction value must be bigger than 2 * in.vectorize"); + + if (ib->conn.buffer_subtraction >= ib->qp_init.cap.max_recv_wr - n->in.vectorize) + error("The buffer substraction value cannot be smaller than in.max_wrs - in.vectorize"); + // Check if the set value is a power of 2, and warn the user if this is not the case int max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr))); int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr))); @@ -417,8 +458,8 @@ int ib_check(struct node *n) warn("You changed the default value of max_inline_data. This might influence the maximum number of outstanding Work Requests in the Queue Pair and can be a reason for the Queue Pair creation to fail"); // Check if inline mode is set to a valid value - if (ib->conn.inline_mode != 0 && ib->conn.inline_mode != 1) - error("inline_mode has to be set to either 0 or 1! %i is not a valid value", ib->conn.inline_mode); + if (ib->conn.send_inline != 0 && ib->conn.send_inline != 1) + error("send_inline has to be set to either 0 or 1! %i is not a valid value", ib->conn.send_inline); info("Finished check of node %s", node_name(n)); @@ -693,7 +734,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // Poll Completion Queue // If we've already posted enough receive WRs, try to pull cnt - if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) ) ) { //ToDo: Make configurable + if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) { while(1) { wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); if (wcs) { @@ -793,7 +834,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele // Check if data can be send inline int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ? - ib->conn.inline_mode : 0; + ib->conn.send_inline : 0; debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline);