diff --git a/include/villas/node.h b/include/villas/node.h index cce336a90..a6f96f939 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -156,7 +156,7 @@ int node_reverse(struct node *n); int node_read(struct node *n, struct sample *smps[], int *cnt); -int node_write(struct node *n, struct sample *smps[], unsigned cnt); +int node_write(struct node *n, struct sample *smps[], int *cnt); int node_fd(struct node *n); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index 5d716aea4..8b5bb0e3b 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -150,7 +150,7 @@ struct node_type { * @param cnt The number of messages which should be sent. * @return The number of messages actually sent. */ - int (*write)(struct node *n, struct sample *smps[], unsigned cnt); + int (*write)(struct node *n, struct sample *smps[], int *cnt); /** Reverse source and destination of a node. * diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index 42e379946..7438ab1a7 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -83,7 +83,7 @@ int amqp_stop(struct node *n); int amqp_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int amqp_write(struct node *n, struct sample *smps[], unsigned cnt); +int amqp_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 4ccf2bcb2..c333e4a0b 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -86,7 +86,7 @@ int file_stop(struct node *n); int file_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int file_write(struct node *n, struct sample *smps[], unsigned cnt); +int file_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 6452fa661..c60baab38 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -139,6 +139,6 @@ int ib_deinit(); int ib_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int infiniband_write(struct node *n, struct sample *smps[], unsigned cnt); +int infiniband_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/influxdb.h b/include/villas/nodes/influxdb.h index 3abbde1f9..3d4560923 100644 --- a/include/villas/nodes/influxdb.h +++ b/include/villas/nodes/influxdb.h @@ -65,7 +65,7 @@ int influxdb_open(struct node *n); int influxdb_close(struct node *n); /** @see node_type::write */ -int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt); +int influxdb_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/loopback.h b/include/villas/nodes/loopback.h index 2c6556eb8..be442af3c 100644 --- a/include/villas/nodes/loopback.h +++ b/include/villas/nodes/loopback.h @@ -66,7 +66,7 @@ int loopback_close(struct node *n); int loopback_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int loopback_write(struct node *n, struct sample *smps[], unsigned cnt); +int loopback_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/mqtt.h b/include/villas/nodes/mqtt.h index d2f7061a2..d4a8a703c 100644 --- a/include/villas/nodes/mqtt.h +++ b/include/villas/nodes/mqtt.h @@ -98,7 +98,7 @@ int mqtt_deinit(); int mqtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt); +int mqtt_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index 715d21122..bfcd1b277 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -74,7 +74,7 @@ int nanomsg_stop(struct node *n); int nanomsg_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt); +int nanomsg_write(struct node *n, struct sample *smps[], int *cnt); #ifdef __cplusplus } diff --git a/include/villas/nodes/ngsi.h b/include/villas/nodes/ngsi.h index c43f813e3..107fd5520 100644 --- a/include/villas/nodes/ngsi.h +++ b/include/villas/nodes/ngsi.h @@ -96,7 +96,7 @@ int ngsi_stop(struct node *n); int ngsi_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt); +int ngsi_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index a37374111..29937cef0 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -67,7 +67,7 @@ int shmem_stop(struct node *n); int shmem_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int shmem_write(struct node *n, struct sample *smps[], unsigned cnt); +int shmem_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index cc80c6216..3059e3914 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -119,7 +119,7 @@ int socket_start(struct node *n); int socket_stop(struct node *n); /** @see node_type::write */ -int socket_write(struct node *n, struct sample *smps[], unsigned cnt); +int socket_write(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::read */ int socket_read(struct node *n, struct sample *smps[], int *cnt); diff --git a/include/villas/nodes/test_rtt.h b/include/villas/nodes/test_rtt.h index b65f92030..816327c1b 100644 --- a/include/villas/nodes/test_rtt.h +++ b/include/villas/nodes/test_rtt.h @@ -82,7 +82,7 @@ int test_rtt_stop(struct node *n); int test_rtt_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt); +int test_rtt_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 6d6c15ad7..5160cd8b1 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -115,7 +115,7 @@ int websocket_destroy(struct node *n); int websocket_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int websocket_write(struct node *n, struct sample *smps[], unsigned cnt); +int websocket_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 997388b45..72b7ac7a8 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -105,7 +105,7 @@ int zeromq_stop(struct node *n); int zeromq_read(struct node *n, struct sample *smps[], int *cnt); /** @see node_type::write */ -int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt); +int zeromq_write(struct node *n, struct sample *smps[], int *cnt); /** @} */ diff --git a/lib/node.c b/lib/node.c index 660a6ed34..6505d1d57 100644 --- a/lib/node.c +++ b/lib/node.c @@ -471,7 +471,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt) #endif /* WITH_HOOKS */ } -int node_write(struct node *n, struct sample *smps[], unsigned cnt) +int node_write(struct node *n, struct sample *smps[], int *cnt) { int sent, nsent = 0; @@ -480,15 +480,16 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt) #ifdef WITH_HOOKS /* Run write hooks */ - cnt = hook_write_list(&n->out.hooks, smps, cnt); - if (cnt <= 0) - return cnt; + *cnt = hook_write_list(&n->out.hooks, smps, *cnt); + if (*cnt <= 0) + return *cnt; #endif /* WITH_HOOKS */ /* Send in parts if vector not supported */ - if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { - while (cnt - nsent > 0) { - sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize)); + if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) { + int cnt_vec_min = MIN(*cnt - nsent, n->_vt->vectorize); + while (*cnt - nsent > 0) { + sent = n->_vt->write(n, &smps[nsent], &cnt_vec_min); if (sent < 0) return sent; diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 13ae172ac..03f8c8107 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -319,14 +319,14 @@ int amqp_read(struct node *n, struct sample *smps[], int *cnt) return ret; } -int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) +int amqp_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct amqp *a = n->_vd; char data[1500]; size_t wbytes; - ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret <= 0) return -1; @@ -344,7 +344,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) if (ret != AMQP_STATUS_OK) return -1; - return cnt; + return *cnt; } int amqp_fd(struct node *n) diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index acd58874b..ace12526a 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -127,7 +127,7 @@ int cbuilder_read(struct node *n, struct sample *smps[], int *cnt) return 1; } -int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt) +int cbuilder_write(struct node *n, struct sample *smps[], int *cnt) { struct cbuilder *cb = (struct cbuilder *) n->_vd; struct sample *smp = smps[0]; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index a9dcbe67d..4fcec4b60 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -345,15 +345,15 @@ retry: ret = io_scan(&f->io, smps, *cnt); return *cnt; } -int file_write(struct node *n, struct sample *smps[], unsigned cnt) +int file_write(struct node *n, struct sample *smps[], int *cnt) { struct file *f = (struct file *) n->_vd; - assert(cnt == 1); + assert(*cnt == 1); - io_print(&f->io, smps, cnt); + io_print(&f->io, smps, *cnt); - return cnt; + return *cnt; } int file_fd(struct node *n) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 1549642aa..718c5508b 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -818,11 +818,11 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) return ret; } -int ib_write(struct node *n, struct sample *smps[], unsigned cnt) +int ib_write(struct node *n, struct sample *smps[], int *cnt) { struct infiniband *ib = (struct infiniband *) n->_vd; - struct ibv_send_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt]; + struct ibv_send_wr wr[*cnt], *bad_wr = NULL; + struct ibv_sge sge[*cnt]; struct ibv_mr *mr; int ret; @@ -839,7 +839,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < *cnt; i++) { // Increase refcnt of sample sample_get(smps[i]); @@ -853,7 +853,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) wr[i].sg_list = &sge[i]; wr[i].num_sge = 1; - if (i == (cnt-1)) { + if (i == (*cnt-1)) { debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1)); wr[i].next = NULL; } @@ -877,7 +877,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); } - return cnt; + return *cnt; } int ib_fd(struct node *n) diff --git a/lib/nodes/influxdb.c b/lib/nodes/influxdb.c index b6fae2062..505141ff0 100644 --- a/lib/nodes/influxdb.c +++ b/lib/nodes/influxdb.c @@ -114,14 +114,14 @@ int influxdb_close(struct node *n) return 0; } -int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt) +int influxdb_write(struct node *n, struct sample *smps[], int *cnt) { struct influxdb *i = (struct influxdb *) n->_vd; char *buf = NULL; ssize_t sentlen, buflen; - for (int k = 0; k < cnt; k++) { + for (int k = 0; k < *cnt; k++) { /* Key */ strcatf(&buf, "%s", i->key); @@ -156,7 +156,7 @@ int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt) free(buf); - return cnt; + return *cnt; } char * influxdb_print(struct node *n) diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 2ab769d19..0c8c277a2 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -86,15 +86,15 @@ int loopback_read(struct node *n, struct sample *smps[], int *cnt) return avail; } -int loopback_write(struct node *n, struct sample *smps[], unsigned cnt) +int loopback_write(struct node *n, struct sample *smps[], int *cnt) { int copied; struct loopback *l = (struct loopback *) n->_vd; - struct sample *copies[cnt]; + struct sample *copies[*cnt]; - copied = sample_alloc_many(&l->pool, copies, cnt); - if (copied < cnt) + copied = sample_alloc_many(&l->pool, copies, *cnt); + if (copied < *cnt) warn("Pool underrun for node %s", node_name(n)); sample_copy_many(copies, smps, copied); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index ee0230d31..f6c9e1fc1 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -372,7 +372,7 @@ int mqtt_read(struct node *n, struct sample *smps[], int *cnt) return pulled; } -int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) +int mqtt_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct mqtt *m = (struct mqtt *) n->_vd; @@ -381,7 +381,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) char data[1500]; - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret < 0) return ret; @@ -391,7 +391,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) return -abs(ret); } - return cnt; + return *cnt; } int mqtt_fd(struct node *n) diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index f6f397d40..15ddd7154 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -242,7 +242,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], int *cnt) return io_sscan(&m->io, data, bytes, NULL, smps, *cnt); } -int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) +int nanomsg_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; @@ -251,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) char data[NANOMSG_MAX_PACKET_LEN]; - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret <= 0) return -1; @@ -259,7 +259,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - return cnt; + return *cnt; } int nanomsg_fd(struct node *n) diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 36a61cee6..5424f8ad0 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -558,18 +558,18 @@ out: json_decref(entity); return ret; } -int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt) +int ngsi_write(struct node *n, struct sample *smps[], int *cnt) { struct ngsi *i = (struct ngsi *) n->_vd; int ret; - json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES); + json_t *entity = ngsi_build_entity(i, smps, *cnt, NGSI_ENTITY_VALUES); ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity); json_decref(entity); - return ret ? 0 : cnt; + return ret ? 0 : *cnt; } int ngsi_fd(struct node *n) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 0461ab7ab..8cba0de87 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -134,14 +134,14 @@ int shmem_read(struct node *n, struct sample *smps[], int *cnt) return recv; } -int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) +int shmem_write(struct node *n, struct sample *smps[], int *cnt) { struct shmem *shm = (struct shmem *) n->_vd; - struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ + struct sample *shared_smps[*cnt]; /* Samples need to be copied to the shared pool first */ int avail, pushed, copied; - avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt); - if (avail != cnt) + avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, *cnt); + if (avail != *cnt) warn("Pool underrun for shmem node %s", shm->out_name); copied = sample_copy_many(shared_smps, smps, avail); diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 2b853a662..b5eccb4c0 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -401,7 +401,7 @@ out: free(buf); return ret; } -int socket_write(struct node *n, struct sample *smps[], unsigned cnt) +int socket_write(struct node *n, struct sample *smps[], int *cnt) { struct socket *s = (struct socket *) n->_vd; @@ -416,7 +416,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) if (!buf) return -1; -retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); +retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, *cnt); if (ret < 0) goto out; diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index a48229ce1..8cd801a0d 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -348,7 +348,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt) return i; } -int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt) +int test_rtt_write(struct node *n, struct sample *smps[], int *cnt) { struct test_rtt *t = (struct test_rtt *) n->_vd; @@ -358,7 +358,7 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt) struct test_rtt_case *c = (struct test_rtt_case *) list_at(&t->cases, t->current); int i; - for (i = 0; i < cnt; i++) { + for (i = 0; i < *cnt; i++) { if (smps[i]->length != c->values) { warn("Discarding invalid sample due to mismatching length: expecting=%d, has=%d", c->values, smps[i]->length); continue; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 29d996ce5..8eea85ba9 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -484,16 +484,16 @@ int websocket_read(struct node *n, struct sample *smps[], int *cnt) return avail; } -int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) +int websocket_write(struct node *n, struct sample *smps[], int *cnt) { int avail; struct websocket *w = (struct websocket *) n->_vd; - struct sample *cpys[cnt]; + struct sample *cpys[*cnt]; /* Make copies of all samples */ - avail = sample_alloc_many(&w->pool, cpys, cnt); - if (avail < cnt) + avail = sample_alloc_many(&w->pool, cpys, *cnt); + if (avail < *cnt) warn("Pool underrun for node %s: avail=%u", node_name(n), avail); sample_copy_many(cpys, smps, avail); @@ -502,12 +502,12 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); if (c->node == n) - websocket_connection_write(c, cpys, cnt); + websocket_connection_write(c, cpys, *cnt); } sample_put_many(cpys, avail); - return cnt; + return *cnt; } int websocket_parse(struct node *n, json_t *cfg) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index db8176c46..a5abf5c92 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -455,7 +455,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt) return recv; } -int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) +int zeromq_write(struct node *n, struct sample *smps[], int *cnt) { int ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -465,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) char data[4096]; - ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt); + ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, *cnt); if (ret <= 0) return -1; @@ -497,7 +497,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - return cnt; + return *cnt; fail: zmq_msg_close(&m); diff --git a/lib/path.c b/lib/path.c index 5fe35879f..870425f44 100644 --- a/lib/path.c +++ b/lib/path.c @@ -211,7 +211,7 @@ static void path_destination_write(struct path_destination *pd, struct path *p) debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); - sent = node_write(pd->node, smps, available); + sent = node_write(pd->node, smps, &available); if (sent < 0) error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); else if (sent < available) diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 82669d01d..517dcad95 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -161,7 +161,7 @@ static void * send_loop(void *ctx) smps[i]->sequence = last_sequenceno++; } - sent = node_write(node, smps, scanned); + sent = node_write(node, smps, &scanned); if (sent < 0) warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent); else if (sent < scanned) diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index 513bafef3..0f1dfa81f 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -167,7 +167,7 @@ void test_rtt() { clock_gettime(CLOCK_ID, &send); int one = 1; - node_write(node, &smp_send, one); /* Ping */ + node_write(node, &smp_send, &one); /* Ping */ node_read(node, &smp_recv, &one); /* Pong */ clock_gettime(CLOCK_ID, &recv);