1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Fixes #166, all node interfaces are modified

The functions now look like this

int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);

This commit enables nodes to control how many samples will
be released by the framework through *release
This commit is contained in:
Dennis Potter 2018-07-11 18:14:29 +02:00
parent 29ff75fad3
commit 72e627b327
38 changed files with 188 additions and 175 deletions

View file

@ -154,9 +154,9 @@ char * node_name_long(struct node *n);
*/
int node_reverse(struct node *n);
int node_read(struct node *n, struct sample *smps[], int *cnt);
int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
int node_write(struct node *n, struct sample *smps[], int *cnt);
int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
int node_fd(struct node *n);

View file

@ -136,7 +136,7 @@ struct node_type {
* @param cnt The number of messages which should be received.
* @return The number of messages actually received.
*/
int (*read)(struct node *n, struct sample *smps[], int *cnt);
int (*read)(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** Send multiple messages in a single datagram / packet.
*
@ -150,7 +150,7 @@ struct node_type {
* @param cnt The number of messages which should be sent.
* @return The number of messages actually sent.
*/
int (*write)(struct node *n, struct sample *smps[], int *cnt);
int (*write)(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** Reverse source and destination of a node.
*

View file

@ -80,10 +80,10 @@ int amqp_start(struct node *n);
int amqp_stop(struct node *n);
/** @see node_type::read */
int amqp_read(struct node *n, struct sample *smps[], int *cnt);
int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int amqp_write(struct node *n, struct sample *smps[], int *cnt);
int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
#ifdef __cplusplus
}

View file

@ -83,10 +83,10 @@ int file_start(struct node *n);
int file_stop(struct node *n);
/** @see node_type::read */
int file_read(struct node *n, struct sample *smps[], int *cnt);
int file_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int file_write(struct node *n, struct sample *smps[], int *cnt);
int file_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -121,9 +121,9 @@ int ib_init(struct super_node *n);
int ib_deinit();
/** @see node_type::read */
int ib_read(struct node *n, struct sample *smps[], int *cnt);
int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int infiniband_write(struct node *n, struct sample *smps[], int *cnt);
int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -65,7 +65,7 @@ int influxdb_open(struct node *n);
int influxdb_close(struct node *n);
/** @see node_type::write */
int influxdb_write(struct node *n, struct sample *smps[], int *cnt);
int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
#ifdef __cplusplus
}

View file

@ -63,10 +63,10 @@ int loopback_open(struct node *n);
int loopback_close(struct node *n);
/** @see node_type::read */
int loopback_read(struct node *n, struct sample *smps[], int *cnt);
int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int loopback_write(struct node *n, struct sample *smps[], int *cnt);
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
#ifdef __cplusplus
}

View file

@ -95,10 +95,10 @@ int mqtt_init();
int mqtt_deinit();
/** @see node_type::read */
int mqtt_read(struct node *n, struct sample *smps[], int *cnt);
int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int mqtt_write(struct node *n, struct sample *smps[], int *cnt);
int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
#ifdef __cplusplus
}

View file

@ -71,10 +71,10 @@ int nanomsg_start(struct node *n);
int nanomsg_stop(struct node *n);
/** @see node_type::read */
int nanomsg_read(struct node *n, struct sample *smps[], int *cnt);
int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int nanomsg_write(struct node *n, struct sample *smps[], int *cnt);
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
#ifdef __cplusplus
}

View file

@ -93,10 +93,10 @@ int ngsi_start(struct node *n);
int ngsi_stop(struct node *n);
/** @see node_type::read */
int ngsi_read(struct node *n, struct sample *smps[], int *cnt);
int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int ngsi_write(struct node *n, struct sample *smps[], int *cnt);
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -64,10 +64,10 @@ int shmem_start(struct node *n);
int shmem_stop(struct node *n);
/** @see node_type::read */
int shmem_read(struct node *n, struct sample *smps[], int *cnt);
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int shmem_write(struct node *n, struct sample *smps[], int *cnt);
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -89,7 +89,7 @@ int signal_generator_start(struct node *n);
int signal_generator_stop(struct node *n);
/** @see node_type::read */
int signal_generator_read(struct node *n, struct sample *smps[], int *cnt);
int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
enum signal_generator_type signal_generator_lookup_type(const char *type);

View file

@ -119,10 +119,10 @@ int socket_start(struct node *n);
int socket_stop(struct node *n);
/** @see node_type::write */
int socket_write(struct node *n, struct sample *smps[], int *cnt);
int socket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::read */
int socket_read(struct node *n, struct sample *smps[], int *cnt);
int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::parse */
int socket_parse(struct node *n, json_t *cfg);

View file

@ -67,7 +67,7 @@ int stats_node_start(struct node *n);
int stats_node_stop(struct node *n);
/** @see node_type::read */
int stats_node_read(struct node *n, struct sample *smps[], int *cnt);
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
#ifdef __cplusplus
}

View file

@ -79,10 +79,10 @@ int test_rtt_start(struct node *n);
int test_rtt_stop(struct node *n);
/** @see node_type::read */
int test_rtt_read(struct node *n, struct sample *smps[], int *cnt);
int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int test_rtt_write(struct node *n, struct sample *smps[], int *cnt);
int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -112,10 +112,10 @@ int websocket_stop(struct node *n);
int websocket_destroy(struct node *n);
/** @see node_type::read */
int websocket_read(struct node *n, struct sample *smps[], int *cnt);
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int websocket_write(struct node *n, struct sample *smps[], int *cnt);
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -102,10 +102,10 @@ int zeromq_start(struct node *n);
int zeromq_stop(struct node *n);
/** @see node_type::read */
int zeromq_read(struct node *n, struct sample *smps[], int *cnt);
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::write */
int zeromq_write(struct node *n, struct sample *smps[], int *cnt);
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @} */

View file

@ -408,7 +408,7 @@ int node_destroy(struct node *n)
return 0;
}
int node_read(struct node *n, struct sample *smps[], int *cnt)
int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int readd, nread = 0;
@ -416,10 +416,9 @@ int node_read(struct node *n, struct sample *smps[], int *cnt)
return -1;
/* Send in parts if vector not supported */
if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) {
int cnt_vec_min = MIN(*cnt - nread, n->_vt->vectorize);
while (*cnt - nread > 0) {
readd = n->_vt->read(n, &smps[nread], &cnt_vec_min);
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nread > 0) {
readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize), release);
if (readd < 0)
return readd;
@ -427,7 +426,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt)
}
}
else {
nread = n->_vt->read(n, smps, cnt);
nread = n->_vt->read(n, smps, cnt, release);
if (nread < 0)
return nread;
}
@ -471,7 +470,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt)
#endif /* WITH_HOOKS */
}
int node_write(struct node *n, struct sample *smps[], int *cnt)
int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int sent, nsent = 0;
@ -480,16 +479,15 @@ int node_write(struct node *n, struct sample *smps[], int *cnt)
#ifdef WITH_HOOKS
/* Run write hooks */
*cnt = hook_write_list(&n->out.hooks, smps, *cnt);
if (*cnt <= 0)
return *cnt;
cnt = hook_write_list(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
#endif /* WITH_HOOKS */
/* Send in parts if vector not supported */
if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) {
int cnt_vec_min = MIN(*cnt - nsent, n->_vt->vectorize);
while (*cnt - nsent > 0) {
sent = n->_vt->write(n, &smps[nsent], &cnt_vec_min);
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nsent > 0) {
sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize), release);
if (sent < 0)
return sent;
@ -498,7 +496,7 @@ int node_write(struct node *n, struct sample *smps[], int *cnt)
}
}
else {
nsent = n->_vt->write(n, smps, cnt);
nsent = n->_vt->write(n, smps, cnt, release);
if (nsent < 0)
return nsent;

View file

@ -301,7 +301,7 @@ int amqp_stop(struct node *n)
return 0;
}
int amqp_read(struct node *n, struct sample *smps[], int *cnt)
int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct amqp *a = n->_vd;
@ -312,21 +312,21 @@ int amqp_read(struct node *n, struct sample *smps[], int *cnt)
if (rep.reply_type != AMQP_RESPONSE_NORMAL)
return -1;
ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, *cnt);
ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt);
amqp_destroy_envelope(&env);
return ret;
}
int amqp_write(struct node *n, struct sample *smps[], int *cnt)
int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct amqp *a = n->_vd;
char data[1500];
size_t wbytes;
ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, *cnt);
ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret <= 0)
return -1;
@ -344,7 +344,7 @@ int amqp_write(struct node *n, struct sample *smps[], int *cnt)
if (ret != AMQP_STATUS_OK)
return -1;
return *cnt;
return cnt;
}
int amqp_fd(struct node *n)

View file

@ -94,7 +94,7 @@ int cbuilder_stop(struct node *n)
return 0;
}
int cbuilder_read(struct node *n, struct sample *smps[], int *cnt)
int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct cbuilder *cb = (struct cbuilder *) n->_vd;
struct sample *smp = smps[0];
@ -127,7 +127,7 @@ int cbuilder_read(struct node *n, struct sample *smps[], int *cnt)
return 1;
}
int cbuilder_write(struct node *n, struct sample *smps[], int *cnt)
int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct cbuilder *cb = (struct cbuilder *) n->_vd;
struct sample *smp = smps[0];

View file

@ -278,15 +278,15 @@ int file_destroy(struct node *n)
return 0;
}
int file_read(struct node *n, struct sample *smps[], int *cnt)
int file_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct file *f = (struct file *) n->_vd;
int ret;
uint64_t steps;
assert(*cnt == 1);
assert(cnt == 1);
retry: ret = io_scan(&f->io, smps, *cnt);
retry: ret = io_scan(&f->io, smps, cnt);
if (ret <= 0) {
if (io_eof(&f->io)) {
switch (f->eof) {
@ -322,7 +322,7 @@ retry: ret = io_scan(&f->io, smps, *cnt);
/* We dont wait in FILE_EPOCH_ORIGINAL mode */
if (f->epoch_mode == FILE_EPOCH_ORIGINAL)
return *cnt;
return cnt;
if (f->rate) {
steps = task_wait(&f->task);
@ -342,18 +342,18 @@ retry: ret = io_scan(&f->io, smps, *cnt);
else if (steps != 1)
warn("Missed steps: %" PRIu64, steps - 1);
return *cnt;
return cnt;
}
int file_write(struct node *n, struct sample *smps[], int *cnt)
int file_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct file *f = (struct file *) n->_vd;
assert(*cnt == 1);
assert(cnt == 1);
io_print(&f->io, smps, *cnt);
io_print(&f->io, smps, cnt);
return *cnt;
return cnt;
}
int file_fd(struct node *n)

View file

@ -618,12 +618,12 @@ int ib_deinit()
return 0;
}
int ib_read(struct node *n, struct sample *smps[], int *cnt)
int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[n->in.vectorize];
struct ibv_recv_wr wr[*cnt], *bad_wr = NULL;
struct ibv_sge sge[*cnt];
struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_mr *mr;
int ret = 0;
int i = 0; //Used for first loop: post receive Work Requests
@ -634,11 +634,11 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
if (n->state == STATE_CONNECTED) {
if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && *cnt) {
if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt) {
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
for (i = 0; i < *cnt; i++) {
for (i = 0; i < cnt; i++) {
// Prepare receive Scatter/Gather element
sge[i].addr = (uint64_t) &smps[i]->data;
sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
@ -652,7 +652,7 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
ib->conn.available_recv_wrs++;
if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(*cnt-1)) {
if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
wr[i].next = NULL;
@ -705,26 +705,26 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
// unused values.
// * j ==> Values from completion queue
// * i ==> Values posted to receive queue
// * (*cnt) ==> Available values to post to receive queue
// * (cnt) ==> Available values to post to receive queue
//
// Thus:
// * (*cnt - i) ==> number of unused values
// * (cnt - i) ==> number of unused values
int l;
for (k = j, l = 0; k < j + (*cnt - i); k++, l++) {
for (k = j, l = 0; k < j + (cnt - i); k++, l++) {
smps[k] = smps[i + l];
}
*cnt = k;
*release = k;
}
return ret;
}
int ib_write(struct node *n, struct sample *smps[], int *cnt)
int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *ready)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_send_wr wr[*cnt], *bad_wr = NULL;
struct ibv_sge sge[*cnt];
struct ibv_send_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_wc wc[ib->cq_size];
struct ibv_mr *mr;
int ret;
@ -743,7 +743,7 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt)
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
for (i = 0; i < *cnt; i++) {
for (i = 0; i < cnt; i++) {
//Set Scatter/Gather element to data of sample
sge[i].addr = (uint64_t) &smps[i]->data;
sge[i].length = smps[i]->length*sizeof(double);
@ -754,7 +754,7 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt)
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
if (i == (*cnt-1)) {
if (i == (cnt-1)) {
debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
wr[i].next = NULL;
}
@ -786,11 +786,10 @@ int ib_write(struct node *n, struct sample *smps[], int *cnt)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[j].status);
smps[j] = (struct sample *) (wc[j].wr_id);
}
*cnt = j;
*ready = j;
}
return i;

View file

@ -114,14 +114,14 @@ int influxdb_close(struct node *n)
return 0;
}
int influxdb_write(struct node *n, struct sample *smps[], int *cnt)
int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct influxdb *i = (struct influxdb *) n->_vd;
char *buf = NULL;
ssize_t sentlen, buflen;
for (int k = 0; k < *cnt; k++) {
for (int k = 0; k < cnt; k++) {
/* Key */
strcatf(&buf, "%s", i->key);
@ -156,7 +156,7 @@ int influxdb_write(struct node *n, struct sample *smps[], int *cnt)
free(buf);
return *cnt;
return cnt;
}
char * influxdb_print(struct node *n)

View file

@ -69,14 +69,14 @@ int loopback_close(struct node *n)
return queue_signalled_destroy(&l->queue);
}
int loopback_read(struct node *n, struct sample *smps[], int *cnt)
int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int avail;
struct loopback *l = (struct loopback *) n->_vd;
struct sample *cpys[*cnt];
struct sample *cpys[cnt];
avail = queue_signalled_pull_many(&l->queue, (void **) cpys, *cnt);
avail = queue_signalled_pull_many(&l->queue, (void **) cpys, cnt);
for (int i = 0; i < avail; i++) {
sample_copy(smps[i], cpys[i]);
@ -86,15 +86,15 @@ int loopback_read(struct node *n, struct sample *smps[], int *cnt)
return avail;
}
int loopback_write(struct node *n, struct sample *smps[], int *cnt)
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int copied;
struct loopback *l = (struct loopback *) n->_vd;
struct sample *copies[*cnt];
struct sample *copies[cnt];
copied = sample_alloc_many(&l->pool, copies, *cnt);
if (copied < *cnt)
copied = sample_alloc_many(&l->pool, copies, cnt);
if (copied < cnt)
warn("Pool underrun for node %s", node_name(n));
sample_copy_many(copies, smps, copied);

View file

@ -358,13 +358,13 @@ int mqtt_deinit()
return 0;
}
int mqtt_read(struct node *n, struct sample *smps[], int *cnt)
int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int pulled;
struct mqtt *m = (struct mqtt *) n->_vd;
struct sample *smpt[*cnt];
struct sample *smpt[cnt];
pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, *cnt);
pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt);
sample_copy_many(smps, smpt, pulled);
sample_put_many(smpt, pulled);
@ -372,7 +372,7 @@ int mqtt_read(struct node *n, struct sample *smps[], int *cnt)
return pulled;
}
int mqtt_write(struct node *n, struct sample *smps[], int *cnt)
int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct mqtt *m = (struct mqtt *) n->_vd;
@ -381,7 +381,7 @@ int mqtt_write(struct node *n, struct sample *smps[], int *cnt)
char data[1500];
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt);
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret < 0)
return ret;
@ -391,7 +391,7 @@ int mqtt_write(struct node *n, struct sample *smps[], int *cnt)
return -abs(ret);
}
return *cnt;
return cnt;
}
int mqtt_fd(struct node *n)

View file

@ -228,7 +228,7 @@ int nanomsg_deinit()
return 0;
}
int nanomsg_read(struct node *n, struct sample *smps[], int *cnt)
int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct nanomsg *m = (struct nanomsg *) n->_vd;
int bytes;
@ -239,10 +239,10 @@ int nanomsg_read(struct node *n, struct sample *smps[], int *cnt)
if (bytes < 0)
return -1;
return io_sscan(&m->io, data, bytes, NULL, smps, *cnt);
return io_sscan(&m->io, data, bytes, NULL, smps, cnt);
}
int nanomsg_write(struct node *n, struct sample *smps[], int *cnt)
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct nanomsg *m = (struct nanomsg *) n->_vd;
@ -251,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], int *cnt)
char data[NANOMSG_MAX_PACKET_LEN];
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt);
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret <= 0)
return -1;
@ -259,7 +259,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], int *cnt)
if (ret < 0)
return ret;
return *cnt;
return cnt;
}
int nanomsg_fd(struct node *n)

View file

@ -1,4 +1,4 @@
/** Node type: OMA Next Generation Services Interface 10 (NGSI) (FIWARE context broker)
/** Node type: OMA Next Generation Services Interface 9 (NGSI) (FIWARE context broker)
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
@ -533,7 +533,7 @@ int ngsi_stop(struct node *n)
return ret;
}
int ngsi_read(struct node *n, struct sample *smps[], int *cnt)
int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct ngsi *i = (struct ngsi *) n->_vd;
int ret;
@ -548,7 +548,7 @@ int ngsi_read(struct node *n, struct sample *smps[], int *cnt)
if (ret)
goto out;
ret = ngsi_parse_entity(rentity, i, smps, *cnt);
ret = ngsi_parse_entity(rentity, i, smps, cnt);
if (ret)
goto out2;
@ -558,18 +558,18 @@ out: json_decref(entity);
return ret;
}
int ngsi_write(struct node *n, struct sample *smps[], int *cnt)
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct ngsi *i = (struct ngsi *) n->_vd;
int ret;
json_t *entity = ngsi_build_entity(i, smps, *cnt, NGSI_ENTITY_VALUES);
json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES);
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);
json_decref(entity);
return ret ? 0 : *cnt;
return ret ? 0 : cnt;
}
int ngsi_fd(struct node *n)

View file

@ -110,14 +110,14 @@ int shmem_stop(struct node *n)
return shmem_int_close(&shm->intf);
}
int shmem_read(struct node *n, struct sample *smps[], int *cnt)
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct shmem *shm = (struct shmem *) n->_vd;
int recv;
struct sample *shared_smps[*cnt];
struct sample *shared_smps[cnt];
do {
recv = shmem_int_read(&shm->intf, shared_smps, *cnt);
recv = shmem_int_read(&shm->intf, shared_smps, cnt);
} while (recv == 0);
if (recv < 0) {
@ -134,14 +134,14 @@ int shmem_read(struct node *n, struct sample *smps[], int *cnt)
return recv;
}
int shmem_write(struct node *n, struct sample *smps[], int *cnt)
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct shmem *shm = (struct shmem *) n->_vd;
struct sample *shared_smps[*cnt]; /* Samples need to be copied to the shared pool first */
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, copied;
avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, *cnt);
if (avail != *cnt)
avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->out_name);
copied = sample_copy_many(shared_smps, smps, avail);

View file

@ -212,7 +212,7 @@ int signal_generator_stop(struct node *n)
return 0;
}
int signal_generator_read(struct node *n, struct sample *smps[], int *cnt)
int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct signal_generator *s = (struct signal_generator *) n->_vd;
struct sample *t = smps[0];
@ -220,7 +220,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], int *cnt)
struct timespec ts;
int steps;
assert(*cnt == 1);
assert(cnt == 1);
/* Throttle output if desired */
if (s->rt) {

View file

@ -336,7 +336,7 @@ int socket_destroy(struct node *n)
return 0;
}
int socket_read(struct node *n, struct sample *smps[], int *cnt)
int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct socket *s = (struct socket *) n->_vd;
@ -391,7 +391,7 @@ int socket_read(struct node *n, struct sample *smps[], int *cnt)
goto out;
}
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, *cnt);
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt);
if (ret < 0 || bytes != rbytes)
warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes);
@ -401,7 +401,7 @@ out: free(buf);
return ret;
}
int socket_write(struct node *n, struct sample *smps[], int *cnt)
int socket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct socket *s = (struct socket *) n->_vd;
@ -416,7 +416,7 @@ int socket_write(struct node *n, struct sample *smps[], int *cnt)
if (!buf)
return -1;
retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, *cnt);
retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt);
if (ret < 0)
goto out;

View file

@ -119,12 +119,12 @@ int stats_node_destroy(struct node *n)
return 0;
}
int stats_node_read(struct node *n, struct sample *smps[], int *cnt)
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct stats_node *sn = (struct stats_node *) n->_vd;
struct stats *s = sn->node->stats;
if (!*cnt)
if (!cnt)
return 0;
if (!sn->node->stats)

View file

@ -277,7 +277,7 @@ int test_rtt_stop(struct node *n)
return 0;
}
int test_rtt_read(struct node *n, struct sample *smps[], int *cnt)
int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int i, ret, values;
uint64_t steps;
@ -320,7 +320,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt)
struct timespec now = time_now();
/* Prepare samples */
for (i = 0; i < *cnt; i++) {
for (i = 0; i < cnt; i++) {
values = c->values;
if (smps[i]->capacity < values) {
values = smps[i]->capacity;
@ -348,7 +348,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt)
return i;
}
int test_rtt_write(struct node *n, struct sample *smps[], int *cnt)
int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
struct test_rtt *t = (struct test_rtt *) n->_vd;
@ -358,7 +358,7 @@ int test_rtt_write(struct node *n, struct sample *smps[], int *cnt)
struct test_rtt_case *c = (struct test_rtt_case *) list_at(&t->cases, t->current);
int i;
for (i = 0; i < *cnt; i++) {
for (i = 0; i < cnt; i++) {
if (smps[i]->length != c->values) {
warn("Discarding invalid sample due to mismatching length: expecting=%d, has=%d", c->values, smps[i]->length);
continue;

View file

@ -467,14 +467,14 @@ int websocket_destroy(struct node *n)
return 0;
}
int websocket_read(struct node *n, struct sample *smps[], int *cnt)
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int avail;
struct websocket *w = (struct websocket *) n->_vd;
struct sample *cpys[*cnt];
struct sample *cpys[cnt];
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, *cnt);
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
if (avail < 0)
return avail;
@ -484,16 +484,16 @@ int websocket_read(struct node *n, struct sample *smps[], int *cnt)
return avail;
}
int websocket_write(struct node *n, struct sample *smps[], int *cnt)
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int avail;
struct websocket *w = (struct websocket *) n->_vd;
struct sample *cpys[*cnt];
struct sample *cpys[cnt];
/* Make copies of all samples */
avail = sample_alloc_many(&w->pool, cpys, *cnt);
if (avail < *cnt)
avail = sample_alloc_many(&w->pool, cpys, cnt);
if (avail < cnt)
warn("Pool underrun for node %s: avail=%u", node_name(n), avail);
sample_copy_many(cpys, smps, avail);
@ -502,12 +502,12 @@ int websocket_write(struct node *n, struct sample *smps[], int *cnt)
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
if (c->node == n)
websocket_connection_write(c, cpys, *cnt);
websocket_connection_write(c, cpys, cnt);
}
sample_put_many(cpys, avail);
return *cnt;
return cnt;
}
int websocket_parse(struct node *n, json_t *cfg)

View file

@ -419,7 +419,7 @@ int zeromq_destroy(struct node *n)
return 0;
}
int zeromq_read(struct node *n, struct sample *smps[], int *cnt)
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int recv, ret;
struct zeromq *z = (struct zeromq *) n->_vd;
@ -446,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt)
if (ret < 0)
return ret;
recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, *cnt);
recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt);
ret = zmq_msg_close(&m);
if (ret)
@ -455,7 +455,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt)
return recv;
}
int zeromq_write(struct node *n, struct sample *smps[], int *cnt)
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct zeromq *z = (struct zeromq *) n->_vd;
@ -465,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], int *cnt)
char data[4096];
ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, *cnt);
ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret <= 0)
return -1;
@ -497,7 +497,7 @@ int zeromq_write(struct node *n, struct sample *smps[], int *cnt)
if (ret < 0)
return ret;
return *cnt;
return cnt;
fail:
zmq_msg_close(&m);

View file

@ -70,7 +70,8 @@ static int path_source_destroy(struct path_source *ps)
static void path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, ready, cnt;
int recv, tomux, allocated, cnt;
unsigned release;
cnt = ps->node->in.vectorize;
@ -79,18 +80,20 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
ready = sample_alloc_many(&ps->pool, read_smps, cnt);
if (ready != cnt)
allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
if (allocated != cnt)
warn("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
recv = node_read(ps->node, read_smps, &ready);
release = allocated;
recv = node_read(ps->node, read_smps, allocated, &release);
if (recv == 0)
goto out2;
else if (recv < 0)
error("Failed to read samples from node %s", node_name(ps->node));
else if (recv < ready)
warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready);
else if (recv < allocated)
warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
bitset_set(&p->received, i);
@ -141,7 +144,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
}
sample_put_many(muxed_smps, tomux);
out2: sample_put_many(read_smps, ready);
out2: sample_put_many(read_smps, release);
}
static int path_destination_init(struct path_destination *pd, int queuelen)
@ -196,28 +199,31 @@ static void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->out.vectorize;
int sent;
int available;
int released;
int allocated;
unsigned release;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
available = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (available == 0)
allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (allocated == 0)
break;
else if (available < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
else if (allocated < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p));
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
sent = node_write(pd->node, smps, &available);
release = allocated;
sent = node_write(pd->node, smps, allocated, &release);
if (sent < 0)
error("Failed to sent %u samples to node %s", cnt, node_name(pd->node));
else if (sent < available)
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available);
else if (sent < allocated)
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
released = sample_put_many(smps, sent);
released = sample_put_many(smps, release);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}

View file

@ -128,8 +128,8 @@ static void usage()
static void * send_loop(void *ctx)
{
unsigned last_sequenceno = 0;
int ret, scanned, sent, ready, cnt = 0;
unsigned last_sequenceno = 0, release;
int ret, scanned, sent, allocated, cnt = 0;
struct sample *smps[node->out.vectorize];
/* Initialize memory */
@ -139,13 +139,13 @@ static void * send_loop(void *ctx)
error("Failed to allocate memory for receive pool.");
while (!io_eof(&io)) {
ready = sample_alloc_many(&sendd.pool, smps, node->out.vectorize);
allocated = sample_alloc_many(&sendd.pool, smps, node->out.vectorize);
if (ret < 0)
error("Failed to get %u samples out of send pool (%d).", node->out.vectorize, ret);
else if (ready < node->out.vectorize)
else if (allocated < node->out.vectorize)
warn("Send pool underrun");
scanned = io_scan(&io, smps, ready);
scanned = io_scan(&io, smps, allocated);
if (scanned < 0) {
continue;
warn("Failed to read samples from stdin");
@ -161,13 +161,15 @@ static void * send_loop(void *ctx)
smps[i]->sequence = last_sequenceno++;
}
sent = node_write(node, smps, &scanned);
release = allocated;
sent = node_write(node, smps, scanned, &release);
if (sent < 0)
warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent);
else if (sent < scanned)
warn("Failed to sent %d out of %d samples to node %s", scanned-sent, scanned, node_name(node));
sample_put_many(smps, ready);
sample_put_many(smps, release);
cnt += sent;
if (sendd.limit > 0 && cnt >= sendd.limit)
@ -194,7 +196,8 @@ leave: if (io_eof(&io)) {
static void * recv_loop(void *ctx)
{
int recv, ret, cnt = 0, ready = 0;
int recv, ret, cnt = 0, allocated = 0;
unsigned release;
struct sample *smps[node->in.vectorize];
/* Initialize memory */
@ -204,19 +207,21 @@ static void * recv_loop(void *ctx)
error("Failed to allocate memory for receive pool.");
for (;;) {
ready = sample_alloc_many(&recvv.pool, smps, node->in.vectorize);
if (ready < 0)
allocated = sample_alloc_many(&recvv.pool, smps, node->in.vectorize);
if (allocated < 0)
error("Failed to allocate %u samples from receive pool.", node->in.vectorize);
else if (ready < node->in.vectorize)
else if (allocated < node->in.vectorize)
warn("Receive pool underrun");
recv = node_read(node, smps, &ready);
release = allocated;
recv = node_read(node, smps, allocated, &release);
if (recv < 0)
warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv);
io_print(&io, smps, recv);
sample_put_many(smps, ready);
sample_put_many(smps, release);
cnt += recv;
if (recvv.limit > 0 && cnt >= recvv.limit)

View file

@ -170,9 +170,10 @@ int main(int argc, char *argv[])
for (;;) {
t = sample_alloc(&q);
int one = 1;
node_read(&n, &t, &one);
io_print(&io, &t, one);
unsigned release = 1; // release = allocated
node_read(&n, &t, 1, &release);
io_print(&io, &t, 1);
sample_put(t);
}

View file

@ -166,9 +166,13 @@ void test_rtt() {
while (running && (count < 0 || count--)) {
clock_gettime(CLOCK_ID, &send);
int one = 1;
node_write(node, &smp_send, &one); /* Ping */
node_read(node, &smp_recv, &one); /* Pong */
unsigned release;
release = 1; // release = allocated
node_write(node, &smp_send, 1, &release); /* Ping */
release = 1; // release = allocated
node_read(node, &smp_recv, 1, &release); /* Pong */
clock_gettime(CLOCK_ID, &recv);