diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index ebb443394..6f54f04f7 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -90,6 +90,8 @@ struct infiniband { pthread_t stop_thread; int rdma_disconnect_called; + + int used_recv_wrs; } conn; /* Memory related variables */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 4783403fe..416449e73 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include @@ -220,7 +221,8 @@ static void ib_build_ibv(struct node *n) if(ret) error("Failed to create Queue Pair in node %s.", node_name(n)); - info("Created Queue Pair."); + info("Created Queue Pair with %i receive and %i send elements.", + ib->qp_init.cap.max_recv_wr, ib->qp_init.cap.max_send_wr); // Allocate memory ib->mem.p_recv.state = STATE_DESTROYED; @@ -314,8 +316,6 @@ static int ib_addr_resolved(struct node *n) if(ret) error("Failed to resolve route in node %s.", node_name(n)); - //ToDo: create check if data can be send inline - return 0; } @@ -411,15 +411,15 @@ int ib_parse(struct node *n, json_t *cfg) struct infiniband *ib = (struct infiniband *) n->_vd; int ret; - const char *local = NULL; - const char *remote = NULL; + char *local = NULL; + char *remote = NULL; const char *port_space = "RDMA_PC_TCP"; const char *poll_mode = "BUSY"; const char *qp_type = "IBV_QPT_RC"; int timeout = 1000; - int cq_size = 10; - int max_send_wr = 100; - int max_recv_wr = 100; + int cq_size = 128; + int max_send_wr = 128; + int max_recv_wr = 128; json_error_t err; ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, \ @@ -438,8 +438,9 @@ int ib_parse(struct node *n, json_t *cfg) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); // Translate IP:PORT to a struct addrinfo - //ToDo: Fix fixed port - ret = getaddrinfo(local, (char *)"13337", NULL, &ib->conn.src_addr); + char* ip_adr = strtok(local, ":"); + char* port = strtok(NULL, ":"); + ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr); if(ret) { error("Failed to resolve local address '%s' of node %s: %s", @@ -490,10 +491,24 @@ int ib_parse(struct node *n, json_t *cfg) } // Set max. send and receive Work Requests - //ToDo: Set hint that max_*_wr can only be a value 1<< + // First check if the set value is a power of 2, and warn the user if this is not the case + int max_send_pow = (int) pow(2, ceil(log2(max_send_wr))); + int max_recv_pow = (int) pow(2, ceil(log2(max_recv_wr))); + + if(max_send_wr != max_send_pow) + warn("Max. number of send WRs (%i) is not a power of 2! The HCA will change this to a power of 2: %i", + max_send_wr, max_send_pow); + + if(max_recv_wr != max_recv_pow) + warn("Max. number of recv WRs (%i) is not a power of 2! The HCA will change this to a power of 2: %i", + max_recv_wr, max_recv_pow); + ib->qp_init.cap.max_send_wr = max_send_wr; ib->qp_init.cap.max_recv_wr = max_recv_wr; + // Set used receive Work Requests to 0 + ib->conn.used_recv_wrs = 0; + // Set remaining QP attributes ib->qp_init.cap.max_send_sge = 1; ib->qp_init.cap.max_recv_sge = 1; @@ -504,8 +519,9 @@ int ib_parse(struct node *n, json_t *cfg) ib->is_source = 1; // Translate address info - //ToDo: Fix fixed port - ret = getaddrinfo(remote, (char *)"13337", NULL, &ib->conn.dst_addr); + char* ip_adr = strtok(remote, ":"); + char* port = strtok(NULL, ":"); + ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.dst_addr); if(ret) { error("Failed to resolve remote address '%s' of node %s: %s", @@ -702,6 +718,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) if(ret) { + ib->conn.used_recv_wrs += ret; + data = malloc(ret*sizeof(double)); for(int i=0; imem.p_recv, (double*)(wc[i].wr_id)); + } } smps[0]->length = ret; smps[0]->capacity = cnt; memcpy(smps[0]->data, data, ret*sizeof(double)); } + else + { + //No data received? Put new receive Work Requests to Receive Queue + for(int i=0; iconn.used_recv_wrs; i++) + ib_post_recv_wrs(n); + + ib->conn.used_recv_wrs = 0; + } return ret; }