From 4663f55e4bcab5cba032a012c2151e739919d886 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 7 Jul 2018 17:07:45 +0200 Subject: [PATCH 1/6] Changed all node_read() functions to support a *cnt instead of cnt --- include/villas/node.h | 2 +- include/villas/node_type.h | 2 +- include/villas/nodes/amqp.h | 2 +- include/villas/nodes/file.h | 2 +- include/villas/nodes/infiniband.h | 18 +++++++++--------- include/villas/nodes/loopback.h | 2 +- include/villas/nodes/mqtt.h | 2 +- include/villas/nodes/nanomsg.h | 2 +- include/villas/nodes/ngsi.h | 2 +- include/villas/nodes/shmem.h | 2 +- include/villas/nodes/signal_generator.h | 2 +- include/villas/nodes/socket.h | 2 +- include/villas/nodes/stats.h | 2 +- include/villas/nodes/test_rtt.h | 2 +- include/villas/nodes/websocket.h | 2 +- include/villas/nodes/zeromq.h | 2 +- include/villas/stats.h | 2 +- lib/node.c | 9 +++++---- lib/nodes/amqp.c | 4 ++-- lib/nodes/cbuilder.c | 2 +- lib/nodes/file.c | 10 +++++----- lib/nodes/infiniband.c | 12 ++++++------ lib/nodes/loopback.c | 6 +++--- lib/nodes/mqtt.c | 6 +++--- lib/nodes/nanomsg.c | 4 ++-- lib/nodes/ngsi.c | 4 ++-- lib/nodes/shmem.c | 6 +++--- lib/nodes/signal_generator.c | 4 ++-- lib/nodes/socket.c | 4 ++-- lib/nodes/stats.c | 4 ++-- lib/nodes/test_rtt.c | 4 ++-- lib/nodes/websocket.c | 6 +++--- lib/nodes/zeromq.c | 4 ++-- lib/path.c | 2 +- src/villas-pipe.cpp | 4 ++-- src/villas-signal.cpp | 5 +++-- src/villas-test-rtt.cpp | 5 +++-- 37 files changed, 79 insertions(+), 76 deletions(-) diff --git a/include/villas/node.h b/include/villas/node.h index 09ca1a429..cce336a90 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -154,7 +154,7 @@ char * node_name_long(struct node *n); */ int node_reverse(struct node *n); -int node_read(struct node *n, struct sample *smps[], unsigned cnt); +int node_read(struct node *n, struct sample *smps[], int *cnt); int node_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index b9026b1d3..5d716aea4 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -136,7 +136,7 @@ struct node_type { * @param cnt The number of messages which should be received. * @return The number of messages actually received. */ - int (*read)(struct node *n, struct sample *smps[], unsigned cnt); + int (*read)(struct node *n, struct sample *smps[], int *cnt); /** Send multiple messages in a single datagram / packet. * diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index b343190c1..42e379946 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -80,7 +80,7 @@ int amqp_start(struct node *n); int amqp_stop(struct node *n); /** @see node_type::read */ -int amqp_read(struct node *n, struct sample *smps[], unsigned cnt); +int amqp_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 498afb0a4..4ccf2bcb2 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -83,7 +83,7 @@ int file_start(struct node *n); int file_stop(struct node *n); /** @see node_type::read */ -int file_read(struct node *n, struct sample *smps[], unsigned cnt); +int file_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int file_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index cf2cb286d..6452fa661 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -112,31 +112,31 @@ struct infiniband { }; /** @see node_type::reverse */ -int infiniband_reverse(struct node *n); +int ib_reverse(struct node *n); /** @see node_type::print */ -char * infiniband_print(struct node *n); +char * ib_print(struct node *n); /** @see node_type::parse */ -int infiniband_parse(struct node *n, json_t *cfg); +int ib_parse(struct node *n, json_t *cfg); /** @see node_type::open */ -int infiniband_start(struct node *n); +int ib_start(struct node *n); /** @see node_type::destroy */ -int infiniband_destroy(struct node *n); +int ib_destroy(struct node *n); /** @see node_type::close */ -int infiniband_stop(struct node *n); +int ib_stop(struct node *n); /** @see node_type::init */ -int infiniband_init(struct super_node *n); +int ib_init(struct super_node *n); /** @see node_type::deinit */ -int infiniband_deinit(); +int ib_deinit(); /** @see node_type::read */ -int infiniband_read(struct node *n, struct sample *smps[], unsigned cnt); +int ib_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int infiniband_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/loopback.h b/include/villas/nodes/loopback.h index 4cff73eb6..2c6556eb8 100644 --- a/include/villas/nodes/loopback.h +++ b/include/villas/nodes/loopback.h @@ -63,7 +63,7 @@ int loopback_open(struct node *n); int loopback_close(struct node *n); /** @see node_type::read */ -int loopback_read(struct node *n, struct sample *smps[], unsigned cnt); +int loopback_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int loopback_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/mqtt.h b/include/villas/nodes/mqtt.h index 40713c8b2..d2f7061a2 100644 --- a/include/villas/nodes/mqtt.h +++ b/include/villas/nodes/mqtt.h @@ -95,7 +95,7 @@ int mqtt_init(); int mqtt_deinit(); /** @see node_type::read */ -int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt); +int mqtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index 11b5d2a3e..715d21122 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -71,7 +71,7 @@ int nanomsg_start(struct node *n); int nanomsg_stop(struct node *n); /** @see node_type::read */ -int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt); +int nanomsg_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/ngsi.h b/include/villas/nodes/ngsi.h index 6160907a5..c43f813e3 100644 --- a/include/villas/nodes/ngsi.h +++ b/include/villas/nodes/ngsi.h @@ -93,7 +93,7 @@ int ngsi_start(struct node *n); int ngsi_stop(struct node *n); /** @see node_type::read */ -int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt); +int ngsi_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 0426645ce..a37374111 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -64,7 +64,7 @@ int shmem_start(struct node *n); int shmem_stop(struct node *n); /** @see node_type::read */ -int shmem_read(struct node *n, struct sample *smps[], unsigned cnt); +int shmem_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/signal_generator.h b/include/villas/nodes/signal_generator.h index 18d0c0b48..a43156af4 100644 --- a/include/villas/nodes/signal_generator.h +++ b/include/villas/nodes/signal_generator.h @@ -89,7 +89,7 @@ int signal_generator_start(struct node *n); int signal_generator_stop(struct node *n); /** @see node_type::read */ -int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt); +int signal_generator_read(struct node *n, struct sample *smps[], int *cnt); enum signal_generator_type signal_generator_lookup_type(const char *type); diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index ad2ed1372..cc80c6216 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -122,7 +122,7 @@ int socket_stop(struct node *n); int socket_write(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_type::read */ -int socket_read(struct node *n, struct sample *smps[], unsigned cnt); +int socket_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::parse */ int socket_parse(struct node *n, json_t *cfg); diff --git a/include/villas/nodes/stats.h b/include/villas/nodes/stats.h index 89a161656..eea8ca746 100644 --- a/include/villas/nodes/stats.h +++ b/include/villas/nodes/stats.h @@ -67,7 +67,7 @@ int stats_node_start(struct node *n); int stats_node_stop(struct node *n); /** @see node_type::read */ -int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt); +int stats_node_read(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/test_rtt.h b/include/villas/nodes/test_rtt.h index 20f204410..b65f92030 100644 --- a/include/villas/nodes/test_rtt.h +++ b/include/villas/nodes/test_rtt.h @@ -79,7 +79,7 @@ int test_rtt_start(struct node *n); int test_rtt_stop(struct node *n); /** @see node_type::read */ -int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt); +int test_rtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 0e5097a17..6d6c15ad7 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -112,7 +112,7 @@ int websocket_stop(struct node *n); int websocket_destroy(struct node *n); /** @see node_type::read */ -int websocket_read(struct node *n, struct sample *smps[], unsigned cnt); +int websocket_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 4f31de46e..997388b45 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -102,7 +102,7 @@ int zeromq_start(struct node *n); int zeromq_stop(struct node *n); /** @see node_type::read */ -int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt); +int zeromq_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/stats.h b/include/villas/stats.h index ac440a5f5..8f628ff10 100644 --- a/include/villas/stats.h +++ b/include/villas/stats.h @@ -72,7 +72,7 @@ int stats_destroy(struct stats *s); void stats_update(struct stats *s, enum stats_id id, double val); -void stats_collect(struct stats *s, struct sample *smps[], size_t cnt); +void stats_collect(struct stats *s, struct sample *smps[], size_t *cnt); int stats_commit(struct stats *s); diff --git a/lib/node.c b/lib/node.c index c7d93b84b..660a6ed34 100644 --- a/lib/node.c +++ b/lib/node.c @@ -408,7 +408,7 @@ int node_destroy(struct node *n) return 0; } -int node_read(struct node *n, struct sample *smps[], unsigned cnt) +int node_read(struct node *n, struct sample *smps[], int *cnt) { int readd, nread = 0; @@ -416,9 +416,10 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) return -1; /* Send in parts if vector not supported */ - if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { - while (cnt - nread > 0) { - readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize)); + if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) { + int cnt_vec_min = MIN(*cnt - nread, n->_vt->vectorize); + while (*cnt - nread > 0) { + readd = n->_vt->read(n, &smps[nread], &cnt_vec_min); if (readd < 0) return readd; diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 2e9472c2a..13ae172ac 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -301,7 +301,7 @@ int amqp_stop(struct node *n) return 0; } -int amqp_read(struct node *n, struct sample *smps[], unsigned cnt) +int amqp_read(struct node *n, struct sample *smps[], int *cnt) { int ret; struct amqp *a = n->_vd; @@ -312,7 +312,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt) if (rep.reply_type != AMQP_RESPONSE_NORMAL) return -1; - ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt); + ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, *cnt); amqp_destroy_envelope(&env); diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index bf0ecaf2e..acd58874b 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -94,7 +94,7 @@ int cbuilder_stop(struct node *n) return 0; } -int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt) +int cbuilder_read(struct node *n, struct sample *smps[], int *cnt) { struct cbuilder *cb = (struct cbuilder *) n->_vd; struct sample *smp = smps[0]; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 674c0bd0b..a9dcbe67d 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -278,15 +278,15 @@ int file_destroy(struct node *n) return 0; } -int file_read(struct node *n, struct sample *smps[], unsigned cnt) +int file_read(struct node *n, struct sample *smps[], int *cnt) { struct file *f = (struct file *) n->_vd; int ret; uint64_t steps; - assert(cnt == 1); + assert(*cnt == 1); -retry: ret = io_scan(&f->io, smps, cnt); +retry: ret = io_scan(&f->io, smps, *cnt); if (ret <= 0) { if (io_eof(&f->io)) { switch (f->eof) { @@ -322,7 +322,7 @@ retry: ret = io_scan(&f->io, smps, cnt); /* We dont wait in FILE_EPOCH_ORIGINAL mode */ if (f->epoch_mode == FILE_EPOCH_ORIGINAL) - return cnt; + return *cnt; if (f->rate) { steps = task_wait(&f->task); @@ -342,7 +342,7 @@ retry: ret = io_scan(&f->io, smps, cnt); else if (steps != 1) warn("Missed steps: %" PRIu64, steps - 1); - return cnt; + return *cnt; } int file_write(struct node *n, struct sample *smps[], unsigned cnt) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 5b103b71e..1549642aa 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -730,12 +730,12 @@ int ib_deinit() return 0; } -int ib_read(struct node *n, struct sample *smps[], unsigned cnt) +int ib_read(struct node *n, struct sample *smps[], int *cnt) { struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_wc wc[n->in.vectorize]; - struct ibv_recv_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt]; + struct ibv_recv_wr wr[*cnt], *bad_wr = NULL; + struct ibv_sge sge[*cnt]; struct ibv_mr *mr; int ret = 0; @@ -743,11 +743,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) if (n->state == STATE_CONNECTED) { - if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) { + if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && *cnt) { // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < *cnt; i++) { // Increase refcnt of sample sample_get(smps[i]); @@ -764,7 +764,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) ib->conn.available_recv_wrs++; - if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { + if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(*cnt-1)) { debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); wr[i].next = NULL; diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 62b11519c..2ab769d19 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -69,14 +69,14 @@ int loopback_close(struct node *n) return queue_signalled_destroy(&l->queue); } -int loopback_read(struct node *n, struct sample *smps[], unsigned cnt) +int loopback_read(struct node *n, struct sample *smps[], int *cnt) { int avail; struct loopback *l = (struct loopback *) n->_vd; - struct sample *cpys[cnt]; + struct sample *cpys[*cnt]; - avail = queue_signalled_pull_many(&l->queue, (void **) cpys, cnt); + avail = queue_signalled_pull_many(&l->queue, (void **) cpys, *cnt); for (int i = 0; i < avail; i++) { sample_copy(smps[i], cpys[i]); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 893d10d8d..ee0230d31 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -358,13 +358,13 @@ int mqtt_deinit() return 0; } -int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt) +int mqtt_read(struct node *n, struct sample *smps[], int *cnt) { int pulled; struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smpt[cnt]; + struct sample *smpt[*cnt]; - pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); + pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, *cnt); sample_copy_many(smps, smpt, pulled); sample_put_many(smpt, pulled); diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 432734803..f6f397d40 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -228,7 +228,7 @@ int nanomsg_deinit() return 0; } -int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) +int nanomsg_read(struct node *n, struct sample *smps[], int *cnt) { struct nanomsg *m = (struct nanomsg *) n->_vd; int bytes; @@ -239,7 +239,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) if (bytes < 0) return -1; - return io_sscan(&m->io, data, bytes, NULL, smps, cnt); + return io_sscan(&m->io, data, bytes, NULL, smps, *cnt); } int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 8e5618e43..36a61cee6 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -533,7 +533,7 @@ int ngsi_stop(struct node *n) return ret; } -int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt) +int ngsi_read(struct node *n, struct sample *smps[], int *cnt) { struct ngsi *i = (struct ngsi *) n->_vd; int ret; @@ -548,7 +548,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret) goto out; - ret = ngsi_parse_entity(rentity, i, smps, cnt); + ret = ngsi_parse_entity(rentity, i, smps, *cnt); if (ret) goto out2; diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 177b548be..0461ab7ab 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -110,14 +110,14 @@ int shmem_stop(struct node *n) return shmem_int_close(&shm->intf); } -int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) +int shmem_read(struct node *n, struct sample *smps[], int *cnt) { struct shmem *shm = (struct shmem *) n->_vd; int recv; - struct sample *shared_smps[cnt]; + struct sample *shared_smps[*cnt]; do { - recv = shmem_int_read(&shm->intf, shared_smps, cnt); + recv = shmem_int_read(&shm->intf, shared_smps, *cnt); } while (recv == 0); if (recv < 0) { diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 13a26dba6..8d038b7de 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -212,7 +212,7 @@ int signal_generator_stop(struct node *n) return 0; } -int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt) +int signal_generator_read(struct node *n, struct sample *smps[], int *cnt) { struct signal_generator *s = (struct signal_generator *) n->_vd; struct sample *t = smps[0]; @@ -220,7 +220,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt) struct timespec ts; int steps; - assert(cnt == 1); + assert(*cnt == 1); /* Throttle output if desired */ if (s->rt) { diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 61c7fa2a1..2b853a662 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -336,7 +336,7 @@ int socket_destroy(struct node *n) return 0; } -int socket_read(struct node *n, struct sample *smps[], unsigned cnt) +int socket_read(struct node *n, struct sample *smps[], int *cnt) { int ret; struct socket *s = (struct socket *) n->_vd; @@ -391,7 +391,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) goto out; } - ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt); + ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, *cnt); if (ret < 0 || bytes != rbytes) warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index 4a5e0abd9..146e6fbf5 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -119,12 +119,12 @@ int stats_node_destroy(struct node *n) return 0; } -int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt) +int stats_node_read(struct node *n, struct sample *smps[], int *cnt) { struct stats_node *sn = (struct stats_node *) n->_vd; struct stats *s = sn->node->stats; - if (!cnt) + if (!*cnt) return 0; if (!sn->node->stats) diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 2de5d887f..a48229ce1 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -277,7 +277,7 @@ int test_rtt_stop(struct node *n) return 0; } -int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt) +int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) { int i, ret, values; uint64_t steps; @@ -320,7 +320,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt) struct timespec now = time_now(); /* Prepare samples */ - for (i = 0; i < cnt; i++) { + for (i = 0; i < *cnt; i++) { values = c->values; if (smps[i]->capacity < values) { values = smps[i]->capacity; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index a81b8b2e8..29d996ce5 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -467,14 +467,14 @@ int websocket_destroy(struct node *n) return 0; } -int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) +int websocket_read(struct node *n, struct sample *smps[], int *cnt) { int avail; struct websocket *w = (struct websocket *) n->_vd; - struct sample *cpys[cnt]; + struct sample *cpys[*cnt]; - avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt); + avail = queue_signalled_pull_many(&w->queue, (void **) cpys, *cnt); if (avail < 0) return avail; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 930e45bf9..db8176c46 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -419,7 +419,7 @@ int zeromq_destroy(struct node *n) return 0; } -int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) +int zeromq_read(struct node *n, struct sample *smps[], int *cnt) { int recv, ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -446,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt); + recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, *cnt); ret = zmq_msg_close(&m); if (ret) diff --git a/lib/path.c b/lib/path.c index 48048d7e8..5fe35879f 100644 --- a/lib/path.c +++ b/lib/path.c @@ -84,7 +84,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) warn("Pool underrun for path source %s", node_name(ps->node)); /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, read_smps, ready); + recv = node_read(ps->node, read_smps, &ready); if (recv == 0) goto out2; else if (recv < 0) diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 0133454e2..82669d01d 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -210,7 +210,7 @@ static void * recv_loop(void *ctx) else if (ready < node->in.vectorize) warn("Receive pool underrun"); - recv = node_read(node, smps, ready); + recv = node_read(node, smps, &ready); if (recv < 0) warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv); @@ -239,7 +239,7 @@ int main(int argc, char *argv[]) sendd.enabled = true; sendd.limit = -1; - + recvv.enabled = true; recvv.limit = -1; diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 2330d83e3..3e3a05ffb 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -170,8 +170,9 @@ int main(int argc, char *argv[]) for (;;) { t = sample_alloc(&q); - node_read(&n, &t, 1); - io_print(&io, &t, 1); + int one = 1; + node_read(&n, &t, &one); + io_print(&io, &t, one); sample_put(t); } diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index c591c853e..513bafef3 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -166,8 +166,9 @@ void test_rtt() { while (running && (count < 0 || count--)) { clock_gettime(CLOCK_ID, &send); - node_write(node, &smp_send, 1); /* Ping */ - node_read(node, &smp_recv, 1); /* Pong */ + int one = 1; + node_write(node, &smp_send, one); /* Ping */ + node_read(node, &smp_recv, &one); /* Pong */ clock_gettime(CLOCK_ID, &recv); From 6150a364113f783a7d18b9e0138015c56cf7ceb6 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 7 Jul 2018 17:48:07 +0200 Subject: [PATCH 2/6] Changed all node_write() functions --- include/villas/node.h | 2 +- include/villas/node_type.h | 2 +- include/villas/nodes/amqp.h | 2 +- include/villas/nodes/file.h | 2 +- include/villas/nodes/infiniband.h | 2 +- include/villas/nodes/influxdb.h | 2 +- include/villas/nodes/loopback.h | 2 +- include/villas/nodes/mqtt.h | 2 +- include/villas/nodes/nanomsg.h | 2 +- include/villas/nodes/ngsi.h | 2 +- include/villas/nodes/shmem.h | 2 +- include/villas/nodes/socket.h | 2 +- include/villas/nodes/test_rtt.h | 2 +- include/villas/nodes/websocket.h | 2 +- include/villas/nodes/zeromq.h | 2 +- lib/node.c | 15 ++++++++------- lib/nodes/amqp.c | 6 +++--- lib/nodes/cbuilder.c | 2 +- lib/nodes/file.c | 8 ++++---- lib/nodes/infiniband.c | 12 ++++++------ lib/nodes/influxdb.c | 6 +++--- lib/nodes/loopback.c | 8 ++++---- lib/nodes/mqtt.c | 6 +++--- lib/nodes/nanomsg.c | 6 +++--- lib/nodes/ngsi.c | 6 +++--- lib/nodes/shmem.c | 8 ++++---- lib/nodes/socket.c | 4 ++-- lib/nodes/test_rtt.c | 4 ++-- lib/nodes/websocket.c | 12 ++++++------ lib/nodes/zeromq.c | 6 +++--- lib/path.c | 2 +- src/villas-pipe.cpp | 2 +- src/villas-test-rtt.cpp | 2 +- 33 files changed, 73 insertions(+), 72 deletions(-) diff --git a/include/villas/node.h b/include/villas/node.h index cce336a90..a6f96f939 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -156,7 +156,7 @@ int node_reverse(struct node *n); int node_read(struct node *n, struct sample *smps[], int *cnt); -int node_write(struct node *n, struct sample *smps[], unsigned cnt); +int node_write(struct node *n, struct sample *smps[], int *cnt); int node_fd(struct node *n); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index 5d716aea4..8b5bb0e3b 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -150,7 +150,7 @@ struct node_type { * @param cnt The number of messages which should be sent. * @return The number of messages actually sent. */ - int (*write)(struct node *n, struct sample *smps[], unsigned cnt); + int (*write)(struct node *n, struct sample *smps[], int *cnt); /** Reverse source and destination of a node. * diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index 42e379946..7438ab1a7 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -83,7 +83,7 @@ int amqp_stop(struct node *n); int amqp_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int amqp_write(struct node *n, struct sample *smps[], unsigned cnt); +int amqp_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 4ccf2bcb2..c333e4a0b 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -86,7 +86,7 @@ int file_stop(struct node *n); int file_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int file_write(struct node *n, struct sample *smps[], unsigned cnt); +int file_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 6452fa661..c60baab38 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -139,6 +139,6 @@ int ib_deinit(); int ib_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int infiniband_write(struct node *n, struct sample *smps[], unsigned cnt); +int infiniband_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/influxdb.h b/include/villas/nodes/influxdb.h index 3abbde1f9..3d4560923 100644 --- a/include/villas/nodes/influxdb.h +++ b/include/villas/nodes/influxdb.h @@ -65,7 +65,7 @@ int influxdb_open(struct node *n); int influxdb_close(struct node *n); /** @see node_type::write */ -int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt); +int influxdb_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/loopback.h b/include/villas/nodes/loopback.h index 2c6556eb8..be442af3c 100644 --- a/include/villas/nodes/loopback.h +++ b/include/villas/nodes/loopback.h @@ -66,7 +66,7 @@ int loopback_close(struct node *n); int loopback_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int loopback_write(struct node *n, struct sample *smps[], unsigned cnt); +int loopback_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/mqtt.h b/include/villas/nodes/mqtt.h index d2f7061a2..d4a8a703c 100644 --- a/include/villas/nodes/mqtt.h +++ b/include/villas/nodes/mqtt.h @@ -98,7 +98,7 @@ int mqtt_deinit(); int mqtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt); +int mqtt_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index 715d21122..bfcd1b277 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -74,7 +74,7 @@ int nanomsg_stop(struct node *n); int nanomsg_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt); +int nanomsg_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/ngsi.h b/include/villas/nodes/ngsi.h index c43f813e3..107fd5520 100644 --- a/include/villas/nodes/ngsi.h +++ b/include/villas/nodes/ngsi.h @@ -96,7 +96,7 @@ int ngsi_stop(struct node *n); int ngsi_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt); +int ngsi_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index a37374111..29937cef0 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -67,7 +67,7 @@ int shmem_stop(struct node *n); int shmem_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int shmem_write(struct node *n, struct sample *smps[], unsigned cnt); +int shmem_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index cc80c6216..3059e3914 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -119,7 +119,7 @@ int socket_start(struct node *n); int socket_stop(struct node *n); /** @see node_type::write */ -int socket_write(struct node *n, struct sample *smps[], unsigned cnt); +int socket_write(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::read */ int socket_read(struct node *n, struct sample *smps[], int *cnt); diff --git a/include/villas/nodes/test_rtt.h b/include/villas/nodes/test_rtt.h index b65f92030..816327c1b 100644 --- a/include/villas/nodes/test_rtt.h +++ b/include/villas/nodes/test_rtt.h @@ -82,7 +82,7 @@ int test_rtt_stop(struct node *n); int test_rtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt); +int test_rtt_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 6d6c15ad7..5160cd8b1 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -115,7 +115,7 @@ int websocket_destroy(struct node *n); int websocket_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int websocket_write(struct node *n, struct sample *smps[], unsigned cnt); +int websocket_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 997388b45..72b7ac7a8 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -105,7 +105,7 @@ int zeromq_stop(struct node *n); int zeromq_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt); +int zeromq_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/lib/node.c b/lib/node.c index 660a6ed34..6505d1d57 100644 --- a/lib/node.c +++ b/lib/node.c @@ -471,7 +471,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt) #endif /* WITH_HOOKS */ } -int node_write(struct node *n, struct sample *smps[], unsigned cnt) +int node_write(struct node *n, struct sample *smps[], int *cnt) { int sent, nsent = 0; @@ -480,15 +480,16 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt) #ifdef WITH_HOOKS /* Run write hooks */ - cnt = hook_write_list(&n->out.hooks, smps, cnt); - if (cnt <= 0) - return cnt; + *cnt = hook_write_list(&n->out.hooks, smps, *cnt); + if (*cnt <= 0) + return *cnt; #endif /* WITH_HOOKS */ /* Send in parts if vector not supported */ - if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { - while (cnt - nsent > 0) { - sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize)); + if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) { + int cnt_vec_min = MIN(*cnt - nsent, n->_vt->vectorize); + while (*cnt - nsent > 0) { + sent = n->_vt->write(n, &smps[nsent], &cnt_vec_min); if (sent < 0) return sent; diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 13ae172ac..03f8c8107 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -319,14 +319,14 @@ int amqp_read(struct node *n, struct sample *smps[], int *cnt) return ret; } -int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) +int amqp_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct amqp *a = n->_vd; char data[1500]; size_t wbytes; - ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret <= 0) return -1; @@ -344,7 +344,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) if (ret != AMQP_STATUS_OK) return -1; - return cnt; + return *cnt; } int amqp_fd(struct node *n) diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index acd58874b..ace12526a 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -127,7 +127,7 @@ int cbuilder_read(struct node *n, struct sample *smps[], int *cnt) return 1; } -int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt) +int cbuilder_write(struct node *n, struct sample *smps[], int *cnt) { struct cbuilder *cb = (struct cbuilder *) n->_vd; struct sample *smp = smps[0]; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index a9dcbe67d..4fcec4b60 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -345,15 +345,15 @@ retry: ret = io_scan(&f->io, smps, *cnt); return *cnt; } -int file_write(struct node *n, struct sample *smps[], unsigned cnt) +int file_write(struct node *n, struct sample *smps[], int *cnt) { struct file *f = (struct file *) n->_vd; - assert(cnt == 1); + assert(*cnt == 1); - io_print(&f->io, smps, cnt); + io_print(&f->io, smps, *cnt); - return cnt; + return *cnt; } int file_fd(struct node *n) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 1549642aa..718c5508b 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -818,11 +818,11 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) return ret; } -int ib_write(struct node *n, struct sample *smps[], unsigned cnt) +int ib_write(struct node *n, struct sample *smps[], int *cnt) { struct infiniband *ib = (struct infiniband *) n->_vd; - struct ibv_send_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt]; + struct ibv_send_wr wr[*cnt], *bad_wr = NULL; + struct ibv_sge sge[*cnt]; struct ibv_mr *mr; int ret; @@ -839,7 +839,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < *cnt; i++) { // Increase refcnt of sample sample_get(smps[i]); @@ -853,7 +853,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; - if (i == (cnt-1)) { + if (i == (*cnt-1)) { debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1)); wr[i].next = NULL; } @@ -877,7 +877,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); } - return cnt; + return *cnt; } int ib_fd(struct node *n) diff --git a/lib/nodes/influxdb.c b/lib/nodes/influxdb.c index b6fae2062..505141ff0 100644 --- a/lib/nodes/influxdb.c +++ b/lib/nodes/influxdb.c @@ -114,14 +114,14 @@ int influxdb_close(struct node *n) return 0; } -int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt) +int influxdb_write(struct node *n, struct sample *smps[], int *cnt) { struct influxdb *i = (struct influxdb *) n->_vd; char *buf = NULL; ssize_t sentlen, buflen; - for (int k = 0; k < cnt; k++) { + for (int k = 0; k < *cnt; k++) { /* Key */ strcatf(&buf, "%s", i->key); @@ -156,7 +156,7 @@ int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt) free(buf); - return cnt; + return *cnt; } char * influxdb_print(struct node *n) diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 2ab769d19..0c8c277a2 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -86,15 +86,15 @@ int loopback_read(struct node *n, struct sample *smps[], int *cnt) return avail; } -int loopback_write(struct node *n, struct sample *smps[], unsigned cnt) +int loopback_write(struct node *n, struct sample *smps[], int *cnt) { int copied; struct loopback *l = (struct loopback *) n->_vd; - struct sample *copies[cnt]; + struct sample *copies[*cnt]; - copied = sample_alloc_many(&l->pool, copies, cnt); - if (copied < cnt) + copied = sample_alloc_many(&l->pool, copies, *cnt); + if (copied < *cnt) warn("Pool underrun for node %s", node_name(n)); sample_copy_many(copies, smps, copied); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index ee0230d31..f6c9e1fc1 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -372,7 +372,7 @@ int mqtt_read(struct node *n, struct sample *smps[], int *cnt) return pulled; } -int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) +int mqtt_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct mqtt *m = (struct mqtt *) n->_vd; @@ -381,7 +381,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) char data[1500]; - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret < 0) return ret; @@ -391,7 +391,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) return -abs(ret); } - return cnt; + return *cnt; } int mqtt_fd(struct node *n) diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index f6f397d40..15ddd7154 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -242,7 +242,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], int *cnt) return io_sscan(&m->io, data, bytes, NULL, smps, *cnt); } -int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) +int nanomsg_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; @@ -251,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) char data[NANOMSG_MAX_PACKET_LEN]; - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret <= 0) return -1; @@ -259,7 +259,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - return cnt; + return *cnt; } int nanomsg_fd(struct node *n) diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 36a61cee6..5424f8ad0 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -558,18 +558,18 @@ out: json_decref(entity); return ret; } -int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt) +int ngsi_write(struct node *n, struct sample *smps[], int *cnt) { struct ngsi *i = (struct ngsi *) n->_vd; int ret; - json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES); + json_t *entity = ngsi_build_entity(i, smps, *cnt, NGSI_ENTITY_VALUES); ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity); json_decref(entity); - return ret ? 0 : cnt; + return ret ? 0 : *cnt; } int ngsi_fd(struct node *n) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 0461ab7ab..8cba0de87 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -134,14 +134,14 @@ int shmem_read(struct node *n, struct sample *smps[], int *cnt) return recv; } -int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) +int shmem_write(struct node *n, struct sample *smps[], int *cnt) { struct shmem *shm = (struct shmem *) n->_vd; - struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ + struct sample *shared_smps[*cnt]; /* Samples need to be copied to the shared pool first */ int avail, pushed, copied; - avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt); - if (avail != cnt) + avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, *cnt); + if (avail != *cnt) warn("Pool underrun for shmem node %s", shm->out_name); copied = sample_copy_many(shared_smps, smps, avail); diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 2b853a662..b5eccb4c0 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -401,7 +401,7 @@ out: free(buf); return ret; } -int socket_write(struct node *n, struct sample *smps[], unsigned cnt) +int socket_write(struct node *n, struct sample *smps[], int *cnt) { struct socket *s = (struct socket *) n->_vd; @@ -416,7 +416,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) if (!buf) return -1; -retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); +retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, *cnt); if (ret < 0) goto out; diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index a48229ce1..8cd801a0d 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -348,7 +348,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) return i; } -int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt) +int test_rtt_write(struct node *n, struct sample *smps[], int *cnt) { struct test_rtt *t = (struct test_rtt *) n->_vd; @@ -358,7 +358,7 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt) struct test_rtt_case *c = (struct test_rtt_case *) list_at(&t->cases, t->current); int i; - for (i = 0; i < cnt; i++) { + for (i = 0; i < *cnt; i++) { if (smps[i]->length != c->values) { warn("Discarding invalid sample due to mismatching length: expecting=%d, has=%d", c->values, smps[i]->length); continue; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 29d996ce5..8eea85ba9 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -484,16 +484,16 @@ int websocket_read(struct node *n, struct sample *smps[], int *cnt) return avail; } -int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) +int websocket_write(struct node *n, struct sample *smps[], int *cnt) { int avail; struct websocket *w = (struct websocket *) n->_vd; - struct sample *cpys[cnt]; + struct sample *cpys[*cnt]; /* Make copies of all samples */ - avail = sample_alloc_many(&w->pool, cpys, cnt); - if (avail < cnt) + avail = sample_alloc_many(&w->pool, cpys, *cnt); + if (avail < *cnt) warn("Pool underrun for node %s: avail=%u", node_name(n), avail); sample_copy_many(cpys, smps, avail); @@ -502,12 +502,12 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); if (c->node == n) - websocket_connection_write(c, cpys, cnt); + websocket_connection_write(c, cpys, *cnt); } sample_put_many(cpys, avail); - return cnt; + return *cnt; } int websocket_parse(struct node *n, json_t *cfg) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index db8176c46..a5abf5c92 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -455,7 +455,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt) return recv; } -int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) +int zeromq_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -465,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) char data[4096]; - ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret <= 0) return -1; @@ -497,7 +497,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - return cnt; + return *cnt; fail: zmq_msg_close(&m); diff --git a/lib/path.c b/lib/path.c index 5fe35879f..870425f44 100644 --- a/lib/path.c +++ b/lib/path.c @@ -211,7 +211,7 @@ static void path_destination_write(struct path_destination *pd, struct path *p) debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); - sent = node_write(pd->node, smps, available); + sent = node_write(pd->node, smps, &available); if (sent < 0) error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); else if (sent < available) diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 82669d01d..517dcad95 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -161,7 +161,7 @@ static void * send_loop(void *ctx) smps[i]->sequence = last_sequenceno++; } - sent = node_write(node, smps, scanned); + sent = node_write(node, smps, &scanned); if (sent < 0) warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent); else if (sent < scanned) diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index 513bafef3..0f1dfa81f 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -167,7 +167,7 @@ void test_rtt() { clock_gettime(CLOCK_ID, &send); int one = 1; - node_write(node, &smp_send, one); /* Ping */ + node_write(node, &smp_send, &one); /* Ping */ node_read(node, &smp_recv, &one); /* Pong */ clock_gettime(CLOCK_ID, &recv); From ebb54463053f4a5301b281b62abf0a9042834e3e Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sun, 8 Jul 2018 14:05:48 +0200 Subject: [PATCH 3/6] Refactored the way ib_read() handles the refence counts for the samples it uses. This is based on the algorithm described in issue #153 --- lib/nodes/infiniband.c | 46 ++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 718c5508b..af93cbaf1 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -89,7 +89,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { for (int i = 0; i < *size; i++) { - if (wc[i].status != IBV_WC_SUCCESS) + if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR) warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", node_name(n), wc[i].status); @@ -738,6 +738,9 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) struct ibv_sge sge[*cnt]; struct ibv_mr *mr; int ret = 0; + int i = 0; //Used for first loop: post receive Work Requests + int j = 0; //Used for second loop: get values from Completion Queue + int k = 0; //Used for third loop: reorder list debug(LOG_IB | 15, "ib_read is called"); @@ -747,10 +750,7 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < *cnt; i++) { - // Increase refcnt of sample - sample_get(smps[i]); - + for (i = 0; i < *cnt; i++) { // Prepare receive Scatter/Gather element sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); @@ -779,7 +779,6 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) node_name(n), ret, bad_wr->wr_id); debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); - } // Poll Completion Queue @@ -790,31 +789,44 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) ib->conn.available_recv_wrs -= ret; - for (int i = 0; i < ret; i++) { - if (wc[i].status == IBV_WC_WR_FLUSH_ERR) { + for (j = 0; j < ret; j++) { + if (wc[j].status == IBV_WC_WR_FLUSH_ERR) { debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); ret = 0; } - else if (wc[i].status != IBV_WC_SUCCESS) { + else if (wc[j].status != IBV_WC_SUCCESS) { warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[i].status); + node_name(n), wc[j].status); ret = 0; } - else if (wc[i].opcode & IBV_WC_RECV) { - smps[i] = (struct sample*)(wc[i].wr_id); - smps[i]->length = wc[i].byte_len/sizeof(double); + else if (wc[j].opcode & IBV_WC_RECV) { + smps[j] = (struct sample*)(wc[j].wr_id); + smps[j]->length = wc[i].byte_len/sizeof(double); } else ret = 0; - //Release sample - sample_put((struct sample *) (wc[i].wr_id)); - debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id)); } } - } + // Reorder list + // The first part of the list is fine. It's filled with received values. Now append + // unused values. + // * j ==> Values from completion queue + // * i ==> Values posted to receive queue + // * (*cnt) ==> Available values to post to receive queue + // + // Thus: + // * (*cnt - i) ==> number of unused values + int l; + for (k = j, l = 0; k < j + (*cnt - i); k++, l++) { + smps[k] = smps[i + l]; + } + + + *cnt = k; + } return ret; } From 746fd2f694588c979029cc64c342ecfad990e3c4 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sun, 8 Jul 2018 15:00:47 +0200 Subject: [PATCH 4/6] Refactored ib_write in the same way as ib_read (described in #153). Merged separate completion queue polls to ib_write. Closes #167 --- include/villas/nodes/infiniband.h | 15 ---- lib/nodes/infiniband.c | 128 ++++++++---------------------- 2 files changed, 32 insertions(+), 111 deletions(-) diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index c60baab38..e6f55d197 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -34,12 +34,6 @@ #include #include #include - -/* Function pointer typedefs */ -typedef void (*ib_on_completion) (struct node*, struct ibv_wc*, int*); -typedef void * (*ib_poll_function) (void*); -typedef void * (*ib_event_function) (void*); - /* Enums */ enum poll_mode_e { EVENT, @@ -67,15 +61,6 @@ struct infiniband { /* Work Completion related */ struct poll_s { enum poll_mode_e poll_mode; - - /* On completion function */ - ib_on_completion on_compl; - - /* Busy poll or Event function */ - ib_poll_function poll_func; - - /* Poll thread */ - pthread_t cq_poller_thread; } poll; int stopThreads; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index af93cbaf1..b4b7bcc7d 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -57,7 +57,7 @@ int ib_disconnect(struct node *n) // Set available receive work requests to zero ib->conn.available_recv_wrs = 0; - return 0; + return ib->stopThreads; } int ib_post_recv_wrs(struct node *n) @@ -84,66 +84,13 @@ int ib_post_recv_wrs(struct node *n) return ret; } -void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} - -void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) -{ - for (int i = 0; i < *size; i++) { - if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR) - warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[i].status); - - sample_put((struct sample *) wc[i].wr_id); - } -} - -void * ib_event_thread(void *n) -{ - struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; - struct ibv_wc wc[ib->cq_size]; - int size; - - debug(LOG_IB | 1, "Initialized event based poll thread of node %s", node_name(n)); - - while (1) { - // Function blocks, until an event occurs - ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.send_cq, NULL); - - // Poll as long as WCs are available - while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) - ib->poll.on_compl(n, wc, &size); - - // Request a new event in the CQ and acknowledge event - ibv_req_notify_cq(ib->ctx.send_cq, 0); - ibv_ack_cq_events(ib->ctx.send_cq, 1); - } -} - -void * ib_busy_poll_thread(void *n) -{ - struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; - struct ibv_wc wc[ib->cq_size]; - int size; - - debug(LOG_IB | 1, "Initialized busy poll thread of node %s", node_name(n)); - - while (1) { - // Poll as long as WCs are available - while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) - ib->poll.on_compl(n, wc, &size); - - if (ib->stopThreads) - return NULL; - } -} - static void ib_init_wc_poll(struct node *n) { int ret; struct infiniband *ib = (struct infiniband *) n->_vd; ib->ctx.comp_channel = NULL; - debug(LOG_IB | 1, "Starting to initialize completion queues and threads"); + debug(LOG_IB | 1, "Starting to initialize completion queues"); if (ib->poll.poll_mode == EVENT) { // Create completion channel @@ -176,14 +123,6 @@ static void ib_init_wc_poll(struct node *n) debug(LOG_IB | 3, "Called ibv_req_notificy_cq on send Completion Queue"); } - - // Initialize polling pthread for source - if (ib->is_source) { - ret = pthread_create(&ib->poll.cq_poller_thread, NULL, ib->poll.poll_func, n); - if (ret) - error("Failed to create poll thread of node %s: %s", - node_name(n), gai_strerror(ret)); - } } static void ib_build_ibv(struct node *n) @@ -385,14 +324,10 @@ int ib_parse(struct node *n, json_t *cfg) debug(LOG_IB | 4, "Set timeout to %i in node %s", timeout, node_name(n)); // Translate poll mode - if (strcmp(poll_mode, "EVENT") == 0) { + if (strcmp(poll_mode, "EVENT") == 0) ib->poll.poll_mode = EVENT; - ib->poll.poll_func = ib_event_thread; - } - else if (strcmp(poll_mode, "BUSY") == 0) { + else if (strcmp(poll_mode, "BUSY") == 0) ib->poll.poll_mode = BUSY; - ib->poll.poll_func = ib_busy_poll_thread; - } else error("Failed to translate poll_mode in node %s. %s is not a valid \ poll mode!", node_name(n), poll_mode); @@ -444,17 +379,11 @@ int ib_parse(struct node *n, json_t *cfg) remote, node_name(n), gai_strerror(ret)); debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port); - - // Set correct Work Completion function - ib->poll.on_compl = ib_completion_source; } else { debug(LOG_IB | 3, "Node %s is set up as target", node_name(n)); ib->is_source = 0; - - // Set correct Work Completion function - ib->poll.on_compl = ib_completion_target; } return 0; @@ -508,7 +437,6 @@ void * ib_rdma_cm_event_thread(void *n) struct rdma_cm_event *event; int ret = 0; - debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node)); // Wait until node is completely started @@ -586,7 +514,7 @@ void * ib_rdma_cm_event_thread(void *n) rdma_ack_cm_event(event); - if (ret || ib->stopThreads) + if (ret) break; } @@ -695,14 +623,6 @@ int ib_stop(struct node *n) debug(LOG_IB | 3, "Joined rdma_cm_event_thread"); - // Wait for polling thread to join - if (ib->is_source) { - ret = pthread_join(ib->poll.cq_poller_thread, NULL); - if (ret) - error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret); - } - - // Destroy RDMA CM ID rdma_destroy_id(ib->ctx.id); debug(LOG_IB | 3, "Destroyed rdma_cm_id"); @@ -768,6 +688,8 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); wr[i].next = NULL; + i++; + break; } } @@ -801,8 +723,8 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) ret = 0; } else if (wc[j].opcode & IBV_WC_RECV) { - smps[j] = (struct sample*)(wc[j].wr_id); - smps[j]->length = wc[i].byte_len/sizeof(double); + smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = wc[i].byte_len / sizeof(double); } else ret = 0; @@ -835,14 +757,16 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_send_wr wr[*cnt], *bad_wr = NULL; struct ibv_sge sge[*cnt]; + struct ibv_wc wc[ib->cq_size]; struct ibv_mr *mr; int ret; + int i = 0; //Used for first loop: prepare work requests to post to send queue + int j = 0; //Used for second loop: get values from Completion Queue debug(LOG_IB | 10, "ib_write is called"); if (n->state == STATE_CONNECTED) { - memset(&wr, 0, sizeof(wr)); - + // First, write //ToDo: Place this into configuration and create checks if settings are valid int send_inline = 1; @@ -851,17 +775,14 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < *cnt; i++) { - // Increase refcnt of sample - sample_get(smps[i]); - + for (i = 0; i < *cnt; i++) { //Set Scatter/Gather element to data of sample - sge[i].addr = (uint64_t)&smps[i]->data; + sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = smps[i]->length*sizeof(double); sge[i].lkey = mr->lkey; // Set Send Work Request - wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC + wr[i].wr_id = (uintptr_t) smps[i]; //This way the sample can be release in WC wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; @@ -887,9 +808,24 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) } debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); + + + // Subsequently, check if something is available in completion queue + ret = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc); + + for (j = 0; j < ret; j++) { + if (wc[j].status != IBV_WC_SUCCESS && wc[j].status != IBV_WC_WR_FLUSH_ERR) + warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", + node_name(n), wc[j].status); + + + smps[j] = (struct sample *) (wc[j].wr_id); + } + + *cnt = j; } - return *cnt; + return i; } int ib_fd(struct node *n) From 29ff75fad33b17f7629032e34536a97a56d686c5 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sun, 8 Jul 2018 15:32:28 +0200 Subject: [PATCH 5/6] Removed some obsolete code for completion queues --- lib/nodes/infiniband.c | 58 ++++++++++-------------------------------- 1 file changed, 13 insertions(+), 45 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index b4b7bcc7d..62a028076 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -84,47 +84,6 @@ int ib_post_recv_wrs(struct node *n) return ret; } -static void ib_init_wc_poll(struct node *n) -{ - int ret; - struct infiniband *ib = (struct infiniband *) n->_vd; - ib->ctx.comp_channel = NULL; - - debug(LOG_IB | 1, "Starting to initialize completion queues"); - - if (ib->poll.poll_mode == EVENT) { - // Create completion channel - ib->ctx.comp_channel = ibv_create_comp_channel(ib->ctx.id->verbs); - if (!ib->ctx.comp_channel) - error("Could not create completion channel in node %s", node_name(n)); - - debug(LOG_IB | 3, "Created Completion channel"); - } - - // Create completion queues and bind to channel (or NULL) - ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0); - if (!ib->ctx.recv_cq) - error("Could not create receive completion queue in node %s", node_name(n)); - - debug(LOG_IB | 3, "Created receive Completion Queue"); - - ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, ib->ctx.comp_channel, 0); - if (!ib->ctx.send_cq) - error("Could not create send completion queue in node %s", node_name(n)); - - debug(LOG_IB | 3, "Created send Completion Queue"); - - if (ib->poll.poll_mode == EVENT) { - // Request notifications from completion queue - ret = ibv_req_notify_cq(ib->ctx.send_cq, 0); - if (ret) - error("Failed to request notifiy CQ in node %s: %s", - node_name(n), gai_strerror(ret)); - - debug(LOG_IB | 3, "Called ibv_req_notificy_cq on send Completion Queue"); - } -} - static void ib_build_ibv(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; @@ -132,8 +91,18 @@ static void ib_build_ibv(struct node *n) debug(LOG_IB | 1, "Starting to build IBV components"); - // Initiate poll mode - ib_init_wc_poll(n); + // Create completion queues. No completion channel!) + ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0); + if (!ib->ctx.recv_cq) + error("Could not create receive completion queue in node %s", node_name(n)); + + debug(LOG_IB | 3, "Created receive Completion Queue"); + + ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0); + if (!ib->ctx.send_cq) + error("Could not create send completion queue in node %s", node_name(n)); + + debug(LOG_IB | 3, "Created send Completion Queue"); // Prepare remaining Queue Pair (QP) attributes ib->qp_init.send_cq = ib->ctx.send_cq; @@ -604,11 +573,10 @@ int ib_stop(struct node *n) // Call RDMA disconnect function // Will flush all outstanding WRs to the Completion Queue and // will call RDMA_CM_EVENT_DISCONNECTED if that is done. - if(! ib->is_source && n->state == STATE_CONNECTED) + if (n->state == STATE_CONNECTED) ret = rdma_disconnect(ib->ctx.id); else ret = rdma_disconnect(ib->ctx.listen_id); - if (ret) error("Error while calling rdma_disconnect in node %s: %s", node_name(n), gai_strerror(ret)); From 72e627b327236782839fcabded1744e3100bb945 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Wed, 11 Jul 2018 18:14:29 +0200 Subject: [PATCH 6/6] Fixes #166, all node interfaces are modified The functions now look like this int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); This commit enables nodes to control how many samples will be released by the framework through *release --- include/villas/node.h | 4 +-- include/villas/node_type.h | 4 +-- include/villas/nodes/amqp.h | 4 +-- include/villas/nodes/file.h | 4 +-- include/villas/nodes/infiniband.h | 4 +-- include/villas/nodes/influxdb.h | 2 +- include/villas/nodes/loopback.h | 4 +-- include/villas/nodes/mqtt.h | 4 +-- include/villas/nodes/nanomsg.h | 4 +-- include/villas/nodes/ngsi.h | 4 +-- include/villas/nodes/shmem.h | 4 +-- include/villas/nodes/signal_generator.h | 2 +- include/villas/nodes/socket.h | 4 +-- include/villas/nodes/stats.h | 2 +- include/villas/nodes/test_rtt.h | 4 +-- include/villas/nodes/websocket.h | 4 +-- include/villas/nodes/zeromq.h | 4 +-- lib/node.c | 28 ++++++++--------- lib/nodes/amqp.c | 10 +++---- lib/nodes/cbuilder.c | 4 +-- lib/nodes/file.c | 18 +++++------ lib/nodes/infiniband.c | 33 ++++++++++---------- lib/nodes/influxdb.c | 6 ++-- lib/nodes/loopback.c | 14 ++++----- lib/nodes/mqtt.c | 12 ++++---- lib/nodes/nanomsg.c | 10 +++---- lib/nodes/ngsi.c | 12 ++++---- lib/nodes/shmem.c | 14 ++++----- lib/nodes/signal_generator.c | 4 +-- lib/nodes/socket.c | 8 ++--- lib/nodes/stats.c | 4 +-- lib/nodes/test_rtt.c | 8 ++--- lib/nodes/websocket.c | 18 +++++------ lib/nodes/zeromq.c | 10 +++---- lib/path.c | 40 ++++++++++++++----------- src/villas-pipe.cpp | 31 +++++++++++-------- src/villas-signal.cpp | 7 +++-- src/villas-test-rtt.cpp | 10 +++++-- 38 files changed, 188 insertions(+), 175 deletions(-) diff --git a/include/villas/node.h b/include/villas/node.h index a6f96f939..6f5503c92 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -154,9 +154,9 @@ char * node_name_long(struct node *n); */ int node_reverse(struct node *n); -int node_read(struct node *n, struct sample *smps[], int *cnt); +int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); -int node_write(struct node *n, struct sample *smps[], int *cnt); +int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); int node_fd(struct node *n); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index 8b5bb0e3b..dee935d32 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -136,7 +136,7 @@ struct node_type { * @param cnt The number of messages which should be received. * @return The number of messages actually received. */ - int (*read)(struct node *n, struct sample *smps[], int *cnt); + int (*read)(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** Send multiple messages in a single datagram / packet. * @@ -150,7 +150,7 @@ struct node_type { * @param cnt The number of messages which should be sent. * @return The number of messages actually sent. */ - int (*write)(struct node *n, struct sample *smps[], int *cnt); + int (*write)(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** Reverse source and destination of a node. * diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index 7438ab1a7..6c46a70d6 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -80,10 +80,10 @@ int amqp_start(struct node *n); int amqp_stop(struct node *n); /** @see node_type::read */ -int amqp_read(struct node *n, struct sample *smps[], int *cnt); +int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int amqp_write(struct node *n, struct sample *smps[], int *cnt); +int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); #ifdef __cplusplus } diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index c333e4a0b..d8e24e5e1 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -83,10 +83,10 @@ int file_start(struct node *n); int file_stop(struct node *n); /** @see node_type::read */ -int file_read(struct node *n, struct sample *smps[], int *cnt); +int file_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int file_write(struct node *n, struct sample *smps[], int *cnt); +int file_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index e6f55d197..67289a4dc 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -121,9 +121,9 @@ int ib_init(struct super_node *n); int ib_deinit(); /** @see node_type::read */ -int ib_read(struct node *n, struct sample *smps[], int *cnt); +int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int infiniband_write(struct node *n, struct sample *smps[], int *cnt); +int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/include/villas/nodes/influxdb.h b/include/villas/nodes/influxdb.h index 3d4560923..2d9167961 100644 --- a/include/villas/nodes/influxdb.h +++ b/include/villas/nodes/influxdb.h @@ -65,7 +65,7 @@ int influxdb_open(struct node *n); int influxdb_close(struct node *n); /** @see node_type::write */ -int influxdb_write(struct node *n, struct sample *smps[], int *cnt); +int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); #ifdef __cplusplus } diff --git a/include/villas/nodes/loopback.h b/include/villas/nodes/loopback.h index be442af3c..c80939440 100644 --- a/include/villas/nodes/loopback.h +++ b/include/villas/nodes/loopback.h @@ -63,10 +63,10 @@ int loopback_open(struct node *n); int loopback_close(struct node *n); /** @see node_type::read */ -int loopback_read(struct node *n, struct sample *smps[], int *cnt); +int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int loopback_write(struct node *n, struct sample *smps[], int *cnt); +int loopback_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); #ifdef __cplusplus } diff --git a/include/villas/nodes/mqtt.h b/include/villas/nodes/mqtt.h index d4a8a703c..fb4b96563 100644 --- a/include/villas/nodes/mqtt.h +++ b/include/villas/nodes/mqtt.h @@ -95,10 +95,10 @@ int mqtt_init(); int mqtt_deinit(); /** @see node_type::read */ -int mqtt_read(struct node *n, struct sample *smps[], int *cnt); +int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int mqtt_write(struct node *n, struct sample *smps[], int *cnt); +int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); #ifdef __cplusplus } diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index bfcd1b277..91d2496b2 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -71,10 +71,10 @@ int nanomsg_start(struct node *n); int nanomsg_stop(struct node *n); /** @see node_type::read */ -int nanomsg_read(struct node *n, struct sample *smps[], int *cnt); +int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int nanomsg_write(struct node *n, struct sample *smps[], int *cnt); +int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); #ifdef __cplusplus } diff --git a/include/villas/nodes/ngsi.h b/include/villas/nodes/ngsi.h index 107fd5520..b417317ca 100644 --- a/include/villas/nodes/ngsi.h +++ b/include/villas/nodes/ngsi.h @@ -93,10 +93,10 @@ int ngsi_start(struct node *n); int ngsi_stop(struct node *n); /** @see node_type::read */ -int ngsi_read(struct node *n, struct sample *smps[], int *cnt); +int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int ngsi_write(struct node *n, struct sample *smps[], int *cnt); +int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 29937cef0..b2091af9c 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -64,10 +64,10 @@ int shmem_start(struct node *n); int shmem_stop(struct node *n); /** @see node_type::read */ -int shmem_read(struct node *n, struct sample *smps[], int *cnt); +int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int shmem_write(struct node *n, struct sample *smps[], int *cnt); +int shmem_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/include/villas/nodes/signal_generator.h b/include/villas/nodes/signal_generator.h index a43156af4..ae227fdc0 100644 --- a/include/villas/nodes/signal_generator.h +++ b/include/villas/nodes/signal_generator.h @@ -89,7 +89,7 @@ int signal_generator_start(struct node *n); int signal_generator_stop(struct node *n); /** @see node_type::read */ -int signal_generator_read(struct node *n, struct sample *smps[], int *cnt); +int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); enum signal_generator_type signal_generator_lookup_type(const char *type); diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 3059e3914..5db551339 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -119,10 +119,10 @@ int socket_start(struct node *n); int socket_stop(struct node *n); /** @see node_type::write */ -int socket_write(struct node *n, struct sample *smps[], int *cnt); +int socket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::read */ -int socket_read(struct node *n, struct sample *smps[], int *cnt); +int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::parse */ int socket_parse(struct node *n, json_t *cfg); diff --git a/include/villas/nodes/stats.h b/include/villas/nodes/stats.h index eea8ca746..cc50d9148 100644 --- a/include/villas/nodes/stats.h +++ b/include/villas/nodes/stats.h @@ -67,7 +67,7 @@ int stats_node_start(struct node *n); int stats_node_stop(struct node *n); /** @see node_type::read */ -int stats_node_read(struct node *n, struct sample *smps[], int *cnt); +int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); #ifdef __cplusplus } diff --git a/include/villas/nodes/test_rtt.h b/include/villas/nodes/test_rtt.h index 816327c1b..6f0375232 100644 --- a/include/villas/nodes/test_rtt.h +++ b/include/villas/nodes/test_rtt.h @@ -79,10 +79,10 @@ int test_rtt_start(struct node *n); int test_rtt_stop(struct node *n); /** @see node_type::read */ -int test_rtt_read(struct node *n, struct sample *smps[], int *cnt); +int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int test_rtt_write(struct node *n, struct sample *smps[], int *cnt); +int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 5160cd8b1..232bcc04d 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -112,10 +112,10 @@ int websocket_stop(struct node *n); int websocket_destroy(struct node *n); /** @see node_type::read */ -int websocket_read(struct node *n, struct sample *smps[], int *cnt); +int websocket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int websocket_write(struct node *n, struct sample *smps[], int *cnt); +int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 72b7ac7a8..c5d62c4b2 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -102,10 +102,10 @@ int zeromq_start(struct node *n); int zeromq_stop(struct node *n); /** @see node_type::read */ -int zeromq_read(struct node *n, struct sample *smps[], int *cnt); +int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @see node_type::write */ -int zeromq_write(struct node *n, struct sample *smps[], int *cnt); +int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); /** @} */ diff --git a/lib/node.c b/lib/node.c index 6505d1d57..bcad35239 100644 --- a/lib/node.c +++ b/lib/node.c @@ -408,7 +408,7 @@ int node_destroy(struct node *n) return 0; } -int node_read(struct node *n, struct sample *smps[], int *cnt) +int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int readd, nread = 0; @@ -416,10 +416,9 @@ int node_read(struct node *n, struct sample *smps[], int *cnt) return -1; /* Send in parts if vector not supported */ - if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) { - int cnt_vec_min = MIN(*cnt - nread, n->_vt->vectorize); - while (*cnt - nread > 0) { - readd = n->_vt->read(n, &smps[nread], &cnt_vec_min); + if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { + while (cnt - nread > 0) { + readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize), release); if (readd < 0) return readd; @@ -427,7 +426,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt) } } else { - nread = n->_vt->read(n, smps, cnt); + nread = n->_vt->read(n, smps, cnt, release); if (nread < 0) return nread; } @@ -471,7 +470,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt) #endif /* WITH_HOOKS */ } -int node_write(struct node *n, struct sample *smps[], int *cnt) +int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int sent, nsent = 0; @@ -480,16 +479,15 @@ int node_write(struct node *n, struct sample *smps[], int *cnt) #ifdef WITH_HOOKS /* Run write hooks */ - *cnt = hook_write_list(&n->out.hooks, smps, *cnt); - if (*cnt <= 0) - return *cnt; + cnt = hook_write_list(&n->out.hooks, smps, cnt); + if (cnt <= 0) + return cnt; #endif /* WITH_HOOKS */ /* Send in parts if vector not supported */ - if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) { - int cnt_vec_min = MIN(*cnt - nsent, n->_vt->vectorize); - while (*cnt - nsent > 0) { - sent = n->_vt->write(n, &smps[nsent], &cnt_vec_min); + if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { + while (cnt - nsent > 0) { + sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize), release); if (sent < 0) return sent; @@ -498,7 +496,7 @@ int node_write(struct node *n, struct sample *smps[], int *cnt) } } else { - nsent = n->_vt->write(n, smps, cnt); + nsent = n->_vt->write(n, smps, cnt, release); if (nsent < 0) return nsent; diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 03f8c8107..cd512ca50 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -301,7 +301,7 @@ int amqp_stop(struct node *n) return 0; } -int amqp_read(struct node *n, struct sample *smps[], int *cnt) +int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; struct amqp *a = n->_vd; @@ -312,21 +312,21 @@ int amqp_read(struct node *n, struct sample *smps[], int *cnt) if (rep.reply_type != AMQP_RESPONSE_NORMAL) return -1; - ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, *cnt); + ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt); amqp_destroy_envelope(&env); return ret; } -int amqp_write(struct node *n, struct sample *smps[], int *cnt) +int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; struct amqp *a = n->_vd; char data[1500]; size_t wbytes; - ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, *cnt); + ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) return -1; @@ -344,7 +344,7 @@ int amqp_write(struct node *n, struct sample *smps[], int *cnt) if (ret != AMQP_STATUS_OK) return -1; - return *cnt; + return cnt; } int amqp_fd(struct node *n) diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index ace12526a..a2fb61dfe 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -94,7 +94,7 @@ int cbuilder_stop(struct node *n) return 0; } -int cbuilder_read(struct node *n, struct sample *smps[], int *cnt) +int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct cbuilder *cb = (struct cbuilder *) n->_vd; struct sample *smp = smps[0]; @@ -127,7 +127,7 @@ int cbuilder_read(struct node *n, struct sample *smps[], int *cnt) return 1; } -int cbuilder_write(struct node *n, struct sample *smps[], int *cnt) +int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct cbuilder *cb = (struct cbuilder *) n->_vd; struct sample *smp = smps[0]; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 4fcec4b60..95a061020 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -278,15 +278,15 @@ int file_destroy(struct node *n) return 0; } -int file_read(struct node *n, struct sample *smps[], int *cnt) +int file_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct file *f = (struct file *) n->_vd; int ret; uint64_t steps; - assert(*cnt == 1); + assert(cnt == 1); -retry: ret = io_scan(&f->io, smps, *cnt); +retry: ret = io_scan(&f->io, smps, cnt); if (ret <= 0) { if (io_eof(&f->io)) { switch (f->eof) { @@ -322,7 +322,7 @@ retry: ret = io_scan(&f->io, smps, *cnt); /* We dont wait in FILE_EPOCH_ORIGINAL mode */ if (f->epoch_mode == FILE_EPOCH_ORIGINAL) - return *cnt; + return cnt; if (f->rate) { steps = task_wait(&f->task); @@ -342,18 +342,18 @@ retry: ret = io_scan(&f->io, smps, *cnt); else if (steps != 1) warn("Missed steps: %" PRIu64, steps - 1); - return *cnt; + return cnt; } -int file_write(struct node *n, struct sample *smps[], int *cnt) +int file_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct file *f = (struct file *) n->_vd; - assert(*cnt == 1); + assert(cnt == 1); - io_print(&f->io, smps, *cnt); + io_print(&f->io, smps, cnt); - return *cnt; + return cnt; } int file_fd(struct node *n) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 62a028076..9cff766e5 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -618,12 +618,12 @@ int ib_deinit() return 0; } -int ib_read(struct node *n, struct sample *smps[], int *cnt) +int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_wc wc[n->in.vectorize]; - struct ibv_recv_wr wr[*cnt], *bad_wr = NULL; - struct ibv_sge sge[*cnt]; + struct ibv_recv_wr wr[cnt], *bad_wr = NULL; + struct ibv_sge sge[cnt]; struct ibv_mr *mr; int ret = 0; int i = 0; //Used for first loop: post receive Work Requests @@ -634,11 +634,11 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) if (n->state == STATE_CONNECTED) { - if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && *cnt) { + if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt) { // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (i = 0; i < *cnt; i++) { + for (i = 0; i < cnt; i++) { // Prepare receive Scatter/Gather element sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); @@ -652,7 +652,7 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) ib->conn.available_recv_wrs++; - if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(*cnt-1)) { + if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); wr[i].next = NULL; @@ -705,26 +705,26 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) // unused values. // * j ==> Values from completion queue // * i ==> Values posted to receive queue - // * (*cnt) ==> Available values to post to receive queue + // * (cnt) ==> Available values to post to receive queue // // Thus: - // * (*cnt - i) ==> number of unused values + // * (cnt - i) ==> number of unused values int l; - for (k = j, l = 0; k < j + (*cnt - i); k++, l++) { + for (k = j, l = 0; k < j + (cnt - i); k++, l++) { smps[k] = smps[i + l]; } - *cnt = k; + *release = k; } return ret; } -int ib_write(struct node *n, struct sample *smps[], int *cnt) +int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *ready) { struct infiniband *ib = (struct infiniband *) n->_vd; - struct ibv_send_wr wr[*cnt], *bad_wr = NULL; - struct ibv_sge sge[*cnt]; + struct ibv_send_wr wr[cnt], *bad_wr = NULL; + struct ibv_sge sge[cnt]; struct ibv_wc wc[ib->cq_size]; struct ibv_mr *mr; int ret; @@ -743,7 +743,7 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (i = 0; i < *cnt; i++) { + for (i = 0; i < cnt; i++) { //Set Scatter/Gather element to data of sample sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = smps[i]->length*sizeof(double); @@ -754,7 +754,7 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; - if (i == (*cnt-1)) { + if (i == (cnt-1)) { debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1)); wr[i].next = NULL; } @@ -786,11 +786,10 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt) warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", node_name(n), wc[j].status); - smps[j] = (struct sample *) (wc[j].wr_id); } - *cnt = j; + *ready = j; } return i; diff --git a/lib/nodes/influxdb.c b/lib/nodes/influxdb.c index 505141ff0..416469616 100644 --- a/lib/nodes/influxdb.c +++ b/lib/nodes/influxdb.c @@ -114,14 +114,14 @@ int influxdb_close(struct node *n) return 0; } -int influxdb_write(struct node *n, struct sample *smps[], int *cnt) +int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct influxdb *i = (struct influxdb *) n->_vd; char *buf = NULL; ssize_t sentlen, buflen; - for (int k = 0; k < *cnt; k++) { + for (int k = 0; k < cnt; k++) { /* Key */ strcatf(&buf, "%s", i->key); @@ -156,7 +156,7 @@ int influxdb_write(struct node *n, struct sample *smps[], int *cnt) free(buf); - return *cnt; + return cnt; } char * influxdb_print(struct node *n) diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 0c8c277a2..cf200e1d4 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -69,14 +69,14 @@ int loopback_close(struct node *n) return queue_signalled_destroy(&l->queue); } -int loopback_read(struct node *n, struct sample *smps[], int *cnt) +int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int avail; struct loopback *l = (struct loopback *) n->_vd; - struct sample *cpys[*cnt]; + struct sample *cpys[cnt]; - avail = queue_signalled_pull_many(&l->queue, (void **) cpys, *cnt); + avail = queue_signalled_pull_many(&l->queue, (void **) cpys, cnt); for (int i = 0; i < avail; i++) { sample_copy(smps[i], cpys[i]); @@ -86,15 +86,15 @@ int loopback_read(struct node *n, struct sample *smps[], int *cnt) return avail; } -int loopback_write(struct node *n, struct sample *smps[], int *cnt) +int loopback_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int copied; struct loopback *l = (struct loopback *) n->_vd; - struct sample *copies[*cnt]; + struct sample *copies[cnt]; - copied = sample_alloc_many(&l->pool, copies, *cnt); - if (copied < *cnt) + copied = sample_alloc_many(&l->pool, copies, cnt); + if (copied < cnt) warn("Pool underrun for node %s", node_name(n)); sample_copy_many(copies, smps, copied); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index f6c9e1fc1..296b61985 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -358,13 +358,13 @@ int mqtt_deinit() return 0; } -int mqtt_read(struct node *n, struct sample *smps[], int *cnt) +int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int pulled; struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smpt[*cnt]; + struct sample *smpt[cnt]; - pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, *cnt); + pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); sample_copy_many(smps, smpt, pulled); sample_put_many(smpt, pulled); @@ -372,7 +372,7 @@ int mqtt_read(struct node *n, struct sample *smps[], int *cnt) return pulled; } -int mqtt_write(struct node *n, struct sample *smps[], int *cnt) +int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; struct mqtt *m = (struct mqtt *) n->_vd; @@ -381,7 +381,7 @@ int mqtt_write(struct node *n, struct sample *smps[], int *cnt) char data[1500]; - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); if (ret < 0) return ret; @@ -391,7 +391,7 @@ int mqtt_write(struct node *n, struct sample *smps[], int *cnt) return -abs(ret); } - return *cnt; + return cnt; } int mqtt_fd(struct node *n) diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 15ddd7154..0ce59c31c 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -228,7 +228,7 @@ int nanomsg_deinit() return 0; } -int nanomsg_read(struct node *n, struct sample *smps[], int *cnt) +int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct nanomsg *m = (struct nanomsg *) n->_vd; int bytes; @@ -239,10 +239,10 @@ int nanomsg_read(struct node *n, struct sample *smps[], int *cnt) if (bytes < 0) return -1; - return io_sscan(&m->io, data, bytes, NULL, smps, *cnt); + return io_sscan(&m->io, data, bytes, NULL, smps, cnt); } -int nanomsg_write(struct node *n, struct sample *smps[], int *cnt) +int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; @@ -251,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], int *cnt) char data[NANOMSG_MAX_PACKET_LEN]; - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) return -1; @@ -259,7 +259,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], int *cnt) if (ret < 0) return ret; - return *cnt; + return cnt; } int nanomsg_fd(struct node *n) diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 5424f8ad0..59a8588e9 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -1,4 +1,4 @@ -/** Node type: OMA Next Generation Services Interface 10 (NGSI) (FIWARE context broker) +/** Node type: OMA Next Generation Services Interface 9 (NGSI) (FIWARE context broker) * * @author Steffen Vogel * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC @@ -533,7 +533,7 @@ int ngsi_stop(struct node *n) return ret; } -int ngsi_read(struct node *n, struct sample *smps[], int *cnt) +int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct ngsi *i = (struct ngsi *) n->_vd; int ret; @@ -548,7 +548,7 @@ int ngsi_read(struct node *n, struct sample *smps[], int *cnt) if (ret) goto out; - ret = ngsi_parse_entity(rentity, i, smps, *cnt); + ret = ngsi_parse_entity(rentity, i, smps, cnt); if (ret) goto out2; @@ -558,18 +558,18 @@ out: json_decref(entity); return ret; } -int ngsi_write(struct node *n, struct sample *smps[], int *cnt) +int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct ngsi *i = (struct ngsi *) n->_vd; int ret; - json_t *entity = ngsi_build_entity(i, smps, *cnt, NGSI_ENTITY_VALUES); + json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES); ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity); json_decref(entity); - return ret ? 0 : *cnt; + return ret ? 0 : cnt; } int ngsi_fd(struct node *n) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 8cba0de87..38bfbc249 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -110,14 +110,14 @@ int shmem_stop(struct node *n) return shmem_int_close(&shm->intf); } -int shmem_read(struct node *n, struct sample *smps[], int *cnt) +int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct shmem *shm = (struct shmem *) n->_vd; int recv; - struct sample *shared_smps[*cnt]; + struct sample *shared_smps[cnt]; do { - recv = shmem_int_read(&shm->intf, shared_smps, *cnt); + recv = shmem_int_read(&shm->intf, shared_smps, cnt); } while (recv == 0); if (recv < 0) { @@ -134,14 +134,14 @@ int shmem_read(struct node *n, struct sample *smps[], int *cnt) return recv; } -int shmem_write(struct node *n, struct sample *smps[], int *cnt) +int shmem_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct shmem *shm = (struct shmem *) n->_vd; - struct sample *shared_smps[*cnt]; /* Samples need to be copied to the shared pool first */ + struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ int avail, pushed, copied; - avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, *cnt); - if (avail != *cnt) + avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt); + if (avail != cnt) warn("Pool underrun for shmem node %s", shm->out_name); copied = sample_copy_many(shared_smps, smps, avail); diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 8d038b7de..8d733fdc8 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -212,7 +212,7 @@ int signal_generator_stop(struct node *n) return 0; } -int signal_generator_read(struct node *n, struct sample *smps[], int *cnt) +int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct signal_generator *s = (struct signal_generator *) n->_vd; struct sample *t = smps[0]; @@ -220,7 +220,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], int *cnt) struct timespec ts; int steps; - assert(*cnt == 1); + assert(cnt == 1); /* Throttle output if desired */ if (s->rt) { diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index b5eccb4c0..ec8d58bac 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -336,7 +336,7 @@ int socket_destroy(struct node *n) return 0; } -int socket_read(struct node *n, struct sample *smps[], int *cnt) +int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; struct socket *s = (struct socket *) n->_vd; @@ -391,7 +391,7 @@ int socket_read(struct node *n, struct sample *smps[], int *cnt) goto out; } - ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, *cnt); + ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt); if (ret < 0 || bytes != rbytes) warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); @@ -401,7 +401,7 @@ out: free(buf); return ret; } -int socket_write(struct node *n, struct sample *smps[], int *cnt) +int socket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct socket *s = (struct socket *) n->_vd; @@ -416,7 +416,7 @@ int socket_write(struct node *n, struct sample *smps[], int *cnt) if (!buf) return -1; -retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, *cnt); +retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); if (ret < 0) goto out; diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index 146e6fbf5..57dcfa2ab 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -119,12 +119,12 @@ int stats_node_destroy(struct node *n) return 0; } -int stats_node_read(struct node *n, struct sample *smps[], int *cnt) +int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct stats_node *sn = (struct stats_node *) n->_vd; struct stats *s = sn->node->stats; - if (!*cnt) + if (!cnt) return 0; if (!sn->node->stats) diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 8cd801a0d..8240c695d 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -277,7 +277,7 @@ int test_rtt_stop(struct node *n) return 0; } -int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) +int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int i, ret, values; uint64_t steps; @@ -320,7 +320,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) struct timespec now = time_now(); /* Prepare samples */ - for (i = 0; i < *cnt; i++) { + for (i = 0; i < cnt; i++) { values = c->values; if (smps[i]->capacity < values) { values = smps[i]->capacity; @@ -348,7 +348,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) return i; } -int test_rtt_write(struct node *n, struct sample *smps[], int *cnt) +int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct test_rtt *t = (struct test_rtt *) n->_vd; @@ -358,7 +358,7 @@ int test_rtt_write(struct node *n, struct sample *smps[], int *cnt) struct test_rtt_case *c = (struct test_rtt_case *) list_at(&t->cases, t->current); int i; - for (i = 0; i < *cnt; i++) { + for (i = 0; i < cnt; i++) { if (smps[i]->length != c->values) { warn("Discarding invalid sample due to mismatching length: expecting=%d, has=%d", c->values, smps[i]->length); continue; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 8eea85ba9..008dbfe11 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -467,14 +467,14 @@ int websocket_destroy(struct node *n) return 0; } -int websocket_read(struct node *n, struct sample *smps[], int *cnt) +int websocket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int avail; struct websocket *w = (struct websocket *) n->_vd; - struct sample *cpys[*cnt]; + struct sample *cpys[cnt]; - avail = queue_signalled_pull_many(&w->queue, (void **) cpys, *cnt); + avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt); if (avail < 0) return avail; @@ -484,16 +484,16 @@ int websocket_read(struct node *n, struct sample *smps[], int *cnt) return avail; } -int websocket_write(struct node *n, struct sample *smps[], int *cnt) +int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int avail; struct websocket *w = (struct websocket *) n->_vd; - struct sample *cpys[*cnt]; + struct sample *cpys[cnt]; /* Make copies of all samples */ - avail = sample_alloc_many(&w->pool, cpys, *cnt); - if (avail < *cnt) + avail = sample_alloc_many(&w->pool, cpys, cnt); + if (avail < cnt) warn("Pool underrun for node %s: avail=%u", node_name(n), avail); sample_copy_many(cpys, smps, avail); @@ -502,12 +502,12 @@ int websocket_write(struct node *n, struct sample *smps[], int *cnt) struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); if (c->node == n) - websocket_connection_write(c, cpys, *cnt); + websocket_connection_write(c, cpys, cnt); } sample_put_many(cpys, avail); - return *cnt; + return cnt; } int websocket_parse(struct node *n, json_t *cfg) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index a5abf5c92..db27d6f6b 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -419,7 +419,7 @@ int zeromq_destroy(struct node *n) return 0; } -int zeromq_read(struct node *n, struct sample *smps[], int *cnt) +int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int recv, ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -446,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt) if (ret < 0) return ret; - recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, *cnt); + recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt); ret = zmq_msg_close(&m); if (ret) @@ -455,7 +455,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt) return recv; } -int zeromq_write(struct node *n, struct sample *smps[], int *cnt) +int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -465,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], int *cnt) char data[4096]; - ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, *cnt); + ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) return -1; @@ -497,7 +497,7 @@ int zeromq_write(struct node *n, struct sample *smps[], int *cnt) if (ret < 0) return ret; - return *cnt; + return cnt; fail: zmq_msg_close(&m); diff --git a/lib/path.c b/lib/path.c index 870425f44..2878b5100 100644 --- a/lib/path.c +++ b/lib/path.c @@ -70,7 +70,8 @@ static int path_source_destroy(struct path_source *ps) static void path_source_read(struct path_source *ps, struct path *p, int i) { - int recv, tomux, ready, cnt; + int recv, tomux, allocated, cnt; + unsigned release; cnt = ps->node->in.vectorize; @@ -79,18 +80,20 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) struct sample **tomux_smps; /* Fill smps[] free sample blocks from the pool */ - ready = sample_alloc_many(&ps->pool, read_smps, cnt); - if (ready != cnt) + allocated = sample_alloc_many(&ps->pool, read_smps, cnt); + if (allocated != cnt) warn("Pool underrun for path source %s", node_name(ps->node)); /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, read_smps, &ready); + release = allocated; + + recv = node_read(ps->node, read_smps, allocated, &release); if (recv == 0) goto out2; else if (recv < 0) error("Failed to read samples from node %s", node_name(ps->node)); - else if (recv < ready) - warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); + else if (recv < allocated) + warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated); bitset_set(&p->received, i); @@ -141,7 +144,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) } sample_put_many(muxed_smps, tomux); -out2: sample_put_many(read_smps, ready); +out2: sample_put_many(read_smps, release); } static int path_destination_init(struct path_destination *pd, int queuelen) @@ -196,28 +199,31 @@ static void path_destination_write(struct path_destination *pd, struct path *p) { int cnt = pd->node->out.vectorize; int sent; - int available; int released; + int allocated; + unsigned release; struct sample *smps[cnt]; /* As long as there are still samples in the queue */ while (1) { - available = queue_pull_many(&pd->queue, (void **) smps, cnt); - if (available == 0) + allocated = queue_pull_many(&pd->queue, (void **) smps, cnt); + if (allocated == 0) break; - else if (available < cnt) - debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + else if (allocated < cnt) + debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt); - debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); + debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p)); - sent = node_write(pd->node, smps, &available); + release = allocated; + + sent = node_write(pd->node, smps, allocated, &release); if (sent < 0) error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); - else if (sent < available) - warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available); + else if (sent < allocated) + warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated); - released = sample_put_many(smps, sent); + released = sample_put_many(smps, release); debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); } diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 517dcad95..8b09fcfe7 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -128,8 +128,8 @@ static void usage() static void * send_loop(void *ctx) { - unsigned last_sequenceno = 0; - int ret, scanned, sent, ready, cnt = 0; + unsigned last_sequenceno = 0, release; + int ret, scanned, sent, allocated, cnt = 0; struct sample *smps[node->out.vectorize]; /* Initialize memory */ @@ -139,13 +139,13 @@ static void * send_loop(void *ctx) error("Failed to allocate memory for receive pool."); while (!io_eof(&io)) { - ready = sample_alloc_many(&sendd.pool, smps, node->out.vectorize); + allocated = sample_alloc_many(&sendd.pool, smps, node->out.vectorize); if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->out.vectorize, ret); - else if (ready < node->out.vectorize) + else if (allocated < node->out.vectorize) warn("Send pool underrun"); - scanned = io_scan(&io, smps, ready); + scanned = io_scan(&io, smps, allocated); if (scanned < 0) { continue; warn("Failed to read samples from stdin"); @@ -161,13 +161,15 @@ static void * send_loop(void *ctx) smps[i]->sequence = last_sequenceno++; } - sent = node_write(node, smps, &scanned); + release = allocated; + + sent = node_write(node, smps, scanned, &release); if (sent < 0) warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent); else if (sent < scanned) warn("Failed to sent %d out of %d samples to node %s", scanned-sent, scanned, node_name(node)); - sample_put_many(smps, ready); + sample_put_many(smps, release); cnt += sent; if (sendd.limit > 0 && cnt >= sendd.limit) @@ -194,7 +196,8 @@ leave: if (io_eof(&io)) { static void * recv_loop(void *ctx) { - int recv, ret, cnt = 0, ready = 0; + int recv, ret, cnt = 0, allocated = 0; + unsigned release; struct sample *smps[node->in.vectorize]; /* Initialize memory */ @@ -204,19 +207,21 @@ static void * recv_loop(void *ctx) error("Failed to allocate memory for receive pool."); for (;;) { - ready = sample_alloc_many(&recvv.pool, smps, node->in.vectorize); - if (ready < 0) + allocated = sample_alloc_many(&recvv.pool, smps, node->in.vectorize); + if (allocated < 0) error("Failed to allocate %u samples from receive pool.", node->in.vectorize); - else if (ready < node->in.vectorize) + else if (allocated < node->in.vectorize) warn("Receive pool underrun"); - recv = node_read(node, smps, &ready); + release = allocated; + + recv = node_read(node, smps, allocated, &release); if (recv < 0) warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv); io_print(&io, smps, recv); - sample_put_many(smps, ready); + sample_put_many(smps, release); cnt += recv; if (recvv.limit > 0 && cnt >= recvv.limit) diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 3e3a05ffb..508d6c641 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -170,9 +170,10 @@ int main(int argc, char *argv[]) for (;;) { t = sample_alloc(&q); - int one = 1; - node_read(&n, &t, &one); - io_print(&io, &t, one); + unsigned release = 1; // release = allocated + + node_read(&n, &t, 1, &release); + io_print(&io, &t, 1); sample_put(t); } diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index 0f1dfa81f..a91f33461 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -166,9 +166,13 @@ void test_rtt() { while (running && (count < 0 || count--)) { clock_gettime(CLOCK_ID, &send); - int one = 1; - node_write(node, &smp_send, &one); /* Ping */ - node_read(node, &smp_recv, &one); /* Pong */ + unsigned release; + + release = 1; // release = allocated + node_write(node, &smp_send, 1, &release); /* Ping */ + + release = 1; // release = allocated + node_read(node, &smp_recv, 1, &release); /* Pong */ clock_gettime(CLOCK_ID, &recv);