diff --git a/include/villas/node.h b/include/villas/node.h index 09ca1a429..cce336a90 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -154,7 +154,7 @@ char * node_name_long(struct node *n); */ int node_reverse(struct node *n); -int node_read(struct node *n, struct sample *smps[], unsigned cnt); +int node_read(struct node *n, struct sample *smps[], int *cnt); int node_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index b9026b1d3..5d716aea4 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -136,7 +136,7 @@ struct node_type { * @param cnt The number of messages which should be received. * @return The number of messages actually received. */ - int (*read)(struct node *n, struct sample *smps[], unsigned cnt); + int (*read)(struct node *n, struct sample *smps[], int *cnt); /** Send multiple messages in a single datagram / packet. * diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index b343190c1..42e379946 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -80,7 +80,7 @@ int amqp_start(struct node *n); int amqp_stop(struct node *n); /** @see node_type::read */ -int amqp_read(struct node *n, struct sample *smps[], unsigned cnt); +int amqp_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 498afb0a4..4ccf2bcb2 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -83,7 +83,7 @@ int file_start(struct node *n); int file_stop(struct node *n); /** @see node_type::read */ -int file_read(struct node *n, struct sample *smps[], unsigned cnt); +int file_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int file_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index cf2cb286d..6452fa661 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -112,31 +112,31 @@ struct infiniband { }; /** @see node_type::reverse */ -int infiniband_reverse(struct node *n); +int ib_reverse(struct node *n); /** @see node_type::print */ -char * infiniband_print(struct node *n); +char * ib_print(struct node *n); /** @see node_type::parse */ -int infiniband_parse(struct node *n, json_t *cfg); +int ib_parse(struct node *n, json_t *cfg); /** @see node_type::open */ -int infiniband_start(struct node *n); +int ib_start(struct node *n); /** @see node_type::destroy */ -int infiniband_destroy(struct node *n); +int ib_destroy(struct node *n); /** @see node_type::close */ -int infiniband_stop(struct node *n); +int ib_stop(struct node *n); /** @see node_type::init */ -int infiniband_init(struct super_node *n); +int ib_init(struct super_node *n); /** @see node_type::deinit */ -int infiniband_deinit(); +int ib_deinit(); /** @see node_type::read */ -int infiniband_read(struct node *n, struct sample *smps[], unsigned cnt); +int ib_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int infiniband_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/loopback.h b/include/villas/nodes/loopback.h index 4cff73eb6..2c6556eb8 100644 --- a/include/villas/nodes/loopback.h +++ b/include/villas/nodes/loopback.h @@ -63,7 +63,7 @@ int loopback_open(struct node *n); int loopback_close(struct node *n); /** @see node_type::read */ -int loopback_read(struct node *n, struct sample *smps[], unsigned cnt); +int loopback_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int loopback_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/mqtt.h b/include/villas/nodes/mqtt.h index 40713c8b2..d2f7061a2 100644 --- a/include/villas/nodes/mqtt.h +++ b/include/villas/nodes/mqtt.h @@ -95,7 +95,7 @@ int mqtt_init(); int mqtt_deinit(); /** @see node_type::read */ -int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt); +int mqtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index 11b5d2a3e..715d21122 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -71,7 +71,7 @@ int nanomsg_start(struct node *n); int nanomsg_stop(struct node *n); /** @see node_type::read */ -int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt); +int nanomsg_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/ngsi.h b/include/villas/nodes/ngsi.h index 6160907a5..c43f813e3 100644 --- a/include/villas/nodes/ngsi.h +++ b/include/villas/nodes/ngsi.h @@ -93,7 +93,7 @@ int ngsi_start(struct node *n); int ngsi_stop(struct node *n); /** @see node_type::read */ -int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt); +int ngsi_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 0426645ce..a37374111 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -64,7 +64,7 @@ int shmem_start(struct node *n); int shmem_stop(struct node *n); /** @see node_type::read */ -int shmem_read(struct node *n, struct sample *smps[], unsigned cnt); +int shmem_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/signal_generator.h b/include/villas/nodes/signal_generator.h index 18d0c0b48..a43156af4 100644 --- a/include/villas/nodes/signal_generator.h +++ b/include/villas/nodes/signal_generator.h @@ -89,7 +89,7 @@ int signal_generator_start(struct node *n); int signal_generator_stop(struct node *n); /** @see node_type::read */ -int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt); +int signal_generator_read(struct node *n, struct sample *smps[], int *cnt); enum signal_generator_type signal_generator_lookup_type(const char *type); diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index ad2ed1372..cc80c6216 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -122,7 +122,7 @@ int socket_stop(struct node *n); int socket_write(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_type::read */ -int socket_read(struct node *n, struct sample *smps[], unsigned cnt); +int socket_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::parse */ int socket_parse(struct node *n, json_t *cfg); diff --git a/include/villas/nodes/stats.h b/include/villas/nodes/stats.h index 89a161656..eea8ca746 100644 --- a/include/villas/nodes/stats.h +++ b/include/villas/nodes/stats.h @@ -67,7 +67,7 @@ int stats_node_start(struct node *n); int stats_node_stop(struct node *n); /** @see node_type::read */ -int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt); +int stats_node_read(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/test_rtt.h b/include/villas/nodes/test_rtt.h index 20f204410..b65f92030 100644 --- a/include/villas/nodes/test_rtt.h +++ b/include/villas/nodes/test_rtt.h @@ -79,7 +79,7 @@ int test_rtt_start(struct node *n); int test_rtt_stop(struct node *n); /** @see node_type::read */ -int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt); +int test_rtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 0e5097a17..6d6c15ad7 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -112,7 +112,7 @@ int websocket_stop(struct node *n); int websocket_destroy(struct node *n); /** @see node_type::read */ -int websocket_read(struct node *n, struct sample *smps[], unsigned cnt); +int websocket_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 4f31de46e..997388b45 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -102,7 +102,7 @@ int zeromq_start(struct node *n); int zeromq_stop(struct node *n); /** @see node_type::read */ -int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt); +int zeromq_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt); diff --git a/include/villas/stats.h b/include/villas/stats.h index ac440a5f5..8f628ff10 100644 --- a/include/villas/stats.h +++ b/include/villas/stats.h @@ -72,7 +72,7 @@ int stats_destroy(struct stats *s); void stats_update(struct stats *s, enum stats_id id, double val); -void stats_collect(struct stats *s, struct sample *smps[], size_t cnt); +void stats_collect(struct stats *s, struct sample *smps[], size_t *cnt); int stats_commit(struct stats *s); diff --git a/lib/node.c b/lib/node.c index c7d93b84b..660a6ed34 100644 --- a/lib/node.c +++ b/lib/node.c @@ -408,7 +408,7 @@ int node_destroy(struct node *n) return 0; } -int node_read(struct node *n, struct sample *smps[], unsigned cnt) +int node_read(struct node *n, struct sample *smps[], int *cnt) { int readd, nread = 0; @@ -416,9 +416,10 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) return -1; /* Send in parts if vector not supported */ - if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { - while (cnt - nread > 0) { - readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize)); + if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) { + int cnt_vec_min = MIN(*cnt - nread, n->_vt->vectorize); + while (*cnt - nread > 0) { + readd = n->_vt->read(n, &smps[nread], &cnt_vec_min); if (readd < 0) return readd; diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 2e9472c2a..13ae172ac 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -301,7 +301,7 @@ int amqp_stop(struct node *n) return 0; } -int amqp_read(struct node *n, struct sample *smps[], unsigned cnt) +int amqp_read(struct node *n, struct sample *smps[], int *cnt) { int ret; struct amqp *a = n->_vd; @@ -312,7 +312,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt) if (rep.reply_type != AMQP_RESPONSE_NORMAL) return -1; - ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt); + ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, *cnt); amqp_destroy_envelope(&env); diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index bf0ecaf2e..acd58874b 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -94,7 +94,7 @@ int cbuilder_stop(struct node *n) return 0; } -int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt) +int cbuilder_read(struct node *n, struct sample *smps[], int *cnt) { struct cbuilder *cb = (struct cbuilder *) n->_vd; struct sample *smp = smps[0]; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 674c0bd0b..a9dcbe67d 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -278,15 +278,15 @@ int file_destroy(struct node *n) return 0; } -int file_read(struct node *n, struct sample *smps[], unsigned cnt) +int file_read(struct node *n, struct sample *smps[], int *cnt) { struct file *f = (struct file *) n->_vd; int ret; uint64_t steps; - assert(cnt == 1); + assert(*cnt == 1); -retry: ret = io_scan(&f->io, smps, cnt); +retry: ret = io_scan(&f->io, smps, *cnt); if (ret <= 0) { if (io_eof(&f->io)) { switch (f->eof) { @@ -322,7 +322,7 @@ retry: ret = io_scan(&f->io, smps, cnt); /* We dont wait in FILE_EPOCH_ORIGINAL mode */ if (f->epoch_mode == FILE_EPOCH_ORIGINAL) - return cnt; + return *cnt; if (f->rate) { steps = task_wait(&f->task); @@ -342,7 +342,7 @@ retry: ret = io_scan(&f->io, smps, cnt); else if (steps != 1) warn("Missed steps: %" PRIu64, steps - 1); - return cnt; + return *cnt; } int file_write(struct node *n, struct sample *smps[], unsigned cnt) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 5b103b71e..1549642aa 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -730,12 +730,12 @@ int ib_deinit() return 0; } -int ib_read(struct node *n, struct sample *smps[], unsigned cnt) +int ib_read(struct node *n, struct sample *smps[], int *cnt) { struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_wc wc[n->in.vectorize]; - struct ibv_recv_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt]; + struct ibv_recv_wr wr[*cnt], *bad_wr = NULL; + struct ibv_sge sge[*cnt]; struct ibv_mr *mr; int ret = 0; @@ -743,11 +743,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) if (n->state == STATE_CONNECTED) { - if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) { + if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && *cnt) { // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < *cnt; i++) { // Increase refcnt of sample sample_get(smps[i]); @@ -764,7 +764,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) ib->conn.available_recv_wrs++; - if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { + if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(*cnt-1)) { debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); wr[i].next = NULL; diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 62b11519c..2ab769d19 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -69,14 +69,14 @@ int loopback_close(struct node *n) return queue_signalled_destroy(&l->queue); } -int loopback_read(struct node *n, struct sample *smps[], unsigned cnt) +int loopback_read(struct node *n, struct sample *smps[], int *cnt) { int avail; struct loopback *l = (struct loopback *) n->_vd; - struct sample *cpys[cnt]; + struct sample *cpys[*cnt]; - avail = queue_signalled_pull_many(&l->queue, (void **) cpys, cnt); + avail = queue_signalled_pull_many(&l->queue, (void **) cpys, *cnt); for (int i = 0; i < avail; i++) { sample_copy(smps[i], cpys[i]); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 893d10d8d..ee0230d31 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -358,13 +358,13 @@ int mqtt_deinit() return 0; } -int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt) +int mqtt_read(struct node *n, struct sample *smps[], int *cnt) { int pulled; struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smpt[cnt]; + struct sample *smpt[*cnt]; - pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); + pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, *cnt); sample_copy_many(smps, smpt, pulled); sample_put_many(smpt, pulled); diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 432734803..f6f397d40 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -228,7 +228,7 @@ int nanomsg_deinit() return 0; } -int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) +int nanomsg_read(struct node *n, struct sample *smps[], int *cnt) { struct nanomsg *m = (struct nanomsg *) n->_vd; int bytes; @@ -239,7 +239,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) if (bytes < 0) return -1; - return io_sscan(&m->io, data, bytes, NULL, smps, cnt); + return io_sscan(&m->io, data, bytes, NULL, smps, *cnt); } int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 8e5618e43..36a61cee6 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -533,7 +533,7 @@ int ngsi_stop(struct node *n) return ret; } -int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt) +int ngsi_read(struct node *n, struct sample *smps[], int *cnt) { struct ngsi *i = (struct ngsi *) n->_vd; int ret; @@ -548,7 +548,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret) goto out; - ret = ngsi_parse_entity(rentity, i, smps, cnt); + ret = ngsi_parse_entity(rentity, i, smps, *cnt); if (ret) goto out2; diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 177b548be..0461ab7ab 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -110,14 +110,14 @@ int shmem_stop(struct node *n) return shmem_int_close(&shm->intf); } -int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) +int shmem_read(struct node *n, struct sample *smps[], int *cnt) { struct shmem *shm = (struct shmem *) n->_vd; int recv; - struct sample *shared_smps[cnt]; + struct sample *shared_smps[*cnt]; do { - recv = shmem_int_read(&shm->intf, shared_smps, cnt); + recv = shmem_int_read(&shm->intf, shared_smps, *cnt); } while (recv == 0); if (recv < 0) { diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 13a26dba6..8d038b7de 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -212,7 +212,7 @@ int signal_generator_stop(struct node *n) return 0; } -int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt) +int signal_generator_read(struct node *n, struct sample *smps[], int *cnt) { struct signal_generator *s = (struct signal_generator *) n->_vd; struct sample *t = smps[0]; @@ -220,7 +220,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt) struct timespec ts; int steps; - assert(cnt == 1); + assert(*cnt == 1); /* Throttle output if desired */ if (s->rt) { diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 61c7fa2a1..2b853a662 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -336,7 +336,7 @@ int socket_destroy(struct node *n) return 0; } -int socket_read(struct node *n, struct sample *smps[], unsigned cnt) +int socket_read(struct node *n, struct sample *smps[], int *cnt) { int ret; struct socket *s = (struct socket *) n->_vd; @@ -391,7 +391,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) goto out; } - ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt); + ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, *cnt); if (ret < 0 || bytes != rbytes) warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index 4a5e0abd9..146e6fbf5 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -119,12 +119,12 @@ int stats_node_destroy(struct node *n) return 0; } -int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt) +int stats_node_read(struct node *n, struct sample *smps[], int *cnt) { struct stats_node *sn = (struct stats_node *) n->_vd; struct stats *s = sn->node->stats; - if (!cnt) + if (!*cnt) return 0; if (!sn->node->stats) diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 2de5d887f..a48229ce1 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -277,7 +277,7 @@ int test_rtt_stop(struct node *n) return 0; } -int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt) +int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) { int i, ret, values; uint64_t steps; @@ -320,7 +320,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt) struct timespec now = time_now(); /* Prepare samples */ - for (i = 0; i < cnt; i++) { + for (i = 0; i < *cnt; i++) { values = c->values; if (smps[i]->capacity < values) { values = smps[i]->capacity; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index a81b8b2e8..29d996ce5 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -467,14 +467,14 @@ int websocket_destroy(struct node *n) return 0; } -int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) +int websocket_read(struct node *n, struct sample *smps[], int *cnt) { int avail; struct websocket *w = (struct websocket *) n->_vd; - struct sample *cpys[cnt]; + struct sample *cpys[*cnt]; - avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt); + avail = queue_signalled_pull_many(&w->queue, (void **) cpys, *cnt); if (avail < 0) return avail; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 930e45bf9..db8176c46 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -419,7 +419,7 @@ int zeromq_destroy(struct node *n) return 0; } -int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) +int zeromq_read(struct node *n, struct sample *smps[], int *cnt) { int recv, ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -446,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt); + recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, *cnt); ret = zmq_msg_close(&m); if (ret) diff --git a/lib/path.c b/lib/path.c index 48048d7e8..5fe35879f 100644 --- a/lib/path.c +++ b/lib/path.c @@ -84,7 +84,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) warn("Pool underrun for path source %s", node_name(ps->node)); /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, read_smps, ready); + recv = node_read(ps->node, read_smps, &ready); if (recv == 0) goto out2; else if (recv < 0) diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 0133454e2..82669d01d 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -210,7 +210,7 @@ static void * recv_loop(void *ctx) else if (ready < node->in.vectorize) warn("Receive pool underrun"); - recv = node_read(node, smps, ready); + recv = node_read(node, smps, &ready); if (recv < 0) warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv); @@ -239,7 +239,7 @@ int main(int argc, char *argv[]) sendd.enabled = true; sendd.limit = -1; - + recvv.enabled = true; recvv.limit = -1; diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 2330d83e3..3e3a05ffb 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -170,8 +170,9 @@ int main(int argc, char *argv[]) for (;;) { t = sample_alloc(&q); - node_read(&n, &t, 1); - io_print(&io, &t, 1); + int one = 1; + node_read(&n, &t, &one); + io_print(&io, &t, one); sample_put(t); } diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index c591c853e..513bafef3 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -166,8 +166,9 @@ void test_rtt() { while (running && (count < 0 || count--)) { clock_gettime(CLOCK_ID, &send); - node_write(node, &smp_send, 1); /* Ping */ - node_read(node, &smp_recv, 1); /* Pong */ + int one = 1; + node_write(node, &smp_send, one); /* Ping */ + node_read(node, &smp_recv, &one); /* Pong */ clock_gettime(CLOCK_ID, &recv);