1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Changed all node_write() functions

This commit is contained in:
Dennis Potter 2018-07-07 17:48:07 +02:00
parent 4663f55e4b
commit 6150a36411
33 changed files with 73 additions and 72 deletions

View file

@ -156,7 +156,7 @@ int node_reverse(struct node *n);
int node_read(struct node *n, struct sample *smps[], int *cnt);
int node_write(struct node *n, struct sample *smps[], unsigned cnt);
int node_write(struct node *n, struct sample *smps[], int *cnt);
int node_fd(struct node *n);

View file

@ -150,7 +150,7 @@ struct node_type {
* @param cnt The number of messages which should be sent.
* @return The number of messages actually sent.
*/
int (*write)(struct node *n, struct sample *smps[], unsigned cnt);
int (*write)(struct node *n, struct sample *smps[], int *cnt);
/** Reverse source and destination of a node.
*

View file

@ -83,7 +83,7 @@ int amqp_stop(struct node *n);
int amqp_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int amqp_write(struct node *n, struct sample *smps[], unsigned cnt);
int amqp_write(struct node *n, struct sample *smps[], int *cnt);
#ifdef __cplusplus
}

View file

@ -86,7 +86,7 @@ int file_stop(struct node *n);
int file_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int file_write(struct node *n, struct sample *smps[], unsigned cnt);
int file_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -139,6 +139,6 @@ int ib_deinit();
int ib_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int infiniband_write(struct node *n, struct sample *smps[], unsigned cnt);
int infiniband_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -65,7 +65,7 @@ int influxdb_open(struct node *n);
int influxdb_close(struct node *n);
/** @see node_type::write */
int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt);
int influxdb_write(struct node *n, struct sample *smps[], int *cnt);
#ifdef __cplusplus
}

View file

@ -66,7 +66,7 @@ int loopback_close(struct node *n);
int loopback_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt);
int loopback_write(struct node *n, struct sample *smps[], int *cnt);
#ifdef __cplusplus
}

View file

@ -98,7 +98,7 @@ int mqtt_deinit();
int mqtt_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt);
int mqtt_write(struct node *n, struct sample *smps[], int *cnt);
#ifdef __cplusplus
}

View file

@ -74,7 +74,7 @@ int nanomsg_stop(struct node *n);
int nanomsg_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt);
int nanomsg_write(struct node *n, struct sample *smps[], int *cnt);
#ifdef __cplusplus
}

View file

@ -96,7 +96,7 @@ int ngsi_stop(struct node *n);
int ngsi_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt);
int ngsi_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -67,7 +67,7 @@ int shmem_stop(struct node *n);
int shmem_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt);
int shmem_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -119,7 +119,7 @@ int socket_start(struct node *n);
int socket_stop(struct node *n);
/** @see node_type::write */
int socket_write(struct node *n, struct sample *smps[], unsigned cnt);
int socket_write(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::read */
int socket_read(struct node *n, struct sample *smps[], int *cnt);

View file

@ -82,7 +82,7 @@ int test_rtt_stop(struct node *n);
int test_rtt_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt);
int test_rtt_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -115,7 +115,7 @@ int websocket_destroy(struct node *n);
int websocket_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt);
int websocket_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -105,7 +105,7 @@ int zeromq_stop(struct node *n);
int zeromq_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt);
int zeromq_write(struct node *n, struct sample *smps[], int *cnt);
/** @} */

View file

@ -471,7 +471,7 @@ int node_read(struct node *n, struct sample *smps[], int *cnt)
#endif /* WITH_HOOKS */
}
int node_write(struct node *n, struct sample *smps[], unsigned cnt)
int node_write(struct node *n, struct sample *smps[], int *cnt)
{
int sent, nsent = 0;
@ -480,15 +480,16 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt)
#ifdef WITH_HOOKS
/* Run write hooks */
cnt = hook_write_list(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
*cnt = hook_write_list(&n->out.hooks, smps, *cnt);
if (*cnt <= 0)
return *cnt;
#endif /* WITH_HOOKS */
/* Send in parts if vector not supported */
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nsent > 0) {
sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize));
if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) {
int cnt_vec_min = MIN(*cnt - nsent, n->_vt->vectorize);
while (*cnt - nsent > 0) {
sent = n->_vt->write(n, &smps[nsent], &cnt_vec_min);
if (sent < 0)
return sent;

View file

@ -319,14 +319,14 @@ int amqp_read(struct node *n, struct sample *smps[], int *cnt)
return ret;
}
int amqp_write(struct node *n, struct sample *smps[], unsigned cnt)
int amqp_write(struct node *n, struct sample *smps[], int *cnt)
{
int ret;
struct amqp *a = n->_vd;
char data[1500];
size_t wbytes;
ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt);
ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, *cnt);
if (ret <= 0)
return -1;
@ -344,7 +344,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt)
if (ret != AMQP_STATUS_OK)
return -1;
return cnt;
return *cnt;
}
int amqp_fd(struct node *n)

View file

@ -127,7 +127,7 @@ int cbuilder_read(struct node *n, struct sample *smps[], int *cnt)
return 1;
}
int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt)
int cbuilder_write(struct node *n, struct sample *smps[], int *cnt)
{
struct cbuilder *cb = (struct cbuilder *) n->_vd;
struct sample *smp = smps[0];

View file

@ -345,15 +345,15 @@ retry: ret = io_scan(&f->io, smps, *cnt);
return *cnt;
}
int file_write(struct node *n, struct sample *smps[], unsigned cnt)
int file_write(struct node *n, struct sample *smps[], int *cnt)
{
struct file *f = (struct file *) n->_vd;
assert(cnt == 1);
assert(*cnt == 1);
io_print(&f->io, smps, cnt);
io_print(&f->io, smps, *cnt);
return cnt;
return *cnt;
}
int file_fd(struct node *n)

View file

@ -818,11 +818,11 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt)
return ret;
}
int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
int ib_write(struct node *n, struct sample *smps[], int *cnt)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_send_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_send_wr wr[*cnt], *bad_wr = NULL;
struct ibv_sge sge[*cnt];
struct ibv_mr *mr;
int ret;
@ -839,7 +839,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
for (int i = 0; i < cnt; i++) {
for (int i = 0; i < *cnt; i++) {
// Increase refcnt of sample
sample_get(smps[i]);
@ -853,7 +853,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
if (i == (cnt-1)) {
if (i == (*cnt-1)) {
debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
wr[i].next = NULL;
}
@ -877,7 +877,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
}
return cnt;
return *cnt;
}
int ib_fd(struct node *n)

View file

@ -114,14 +114,14 @@ int influxdb_close(struct node *n)
return 0;
}
int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt)
int influxdb_write(struct node *n, struct sample *smps[], int *cnt)
{
struct influxdb *i = (struct influxdb *) n->_vd;
char *buf = NULL;
ssize_t sentlen, buflen;
for (int k = 0; k < cnt; k++) {
for (int k = 0; k < *cnt; k++) {
/* Key */
strcatf(&buf, "%s", i->key);
@ -156,7 +156,7 @@ int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt)
free(buf);
return cnt;
return *cnt;
}
char * influxdb_print(struct node *n)

View file

@ -86,15 +86,15 @@ int loopback_read(struct node *n, struct sample *smps[], int *cnt)
return avail;
}
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt)
int loopback_write(struct node *n, struct sample *smps[], int *cnt)
{
int copied;
struct loopback *l = (struct loopback *) n->_vd;
struct sample *copies[cnt];
struct sample *copies[*cnt];
copied = sample_alloc_many(&l->pool, copies, cnt);
if (copied < cnt)
copied = sample_alloc_many(&l->pool, copies, *cnt);
if (copied < *cnt)
warn("Pool underrun for node %s", node_name(n));
sample_copy_many(copies, smps, copied);

View file

@ -372,7 +372,7 @@ int mqtt_read(struct node *n, struct sample *smps[], int *cnt)
return pulled;
}
int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt)
int mqtt_write(struct node *n, struct sample *smps[], int *cnt)
{
int ret;
struct mqtt *m = (struct mqtt *) n->_vd;
@ -381,7 +381,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[1500];
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt);
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt);
if (ret < 0)
return ret;
@ -391,7 +391,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt)
return -abs(ret);
}
return cnt;
return *cnt;
}
int mqtt_fd(struct node *n)

View file

@ -242,7 +242,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], int *cnt)
return io_sscan(&m->io, data, bytes, NULL, smps, *cnt);
}
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
int nanomsg_write(struct node *n, struct sample *smps[], int *cnt)
{
int ret;
struct nanomsg *m = (struct nanomsg *) n->_vd;
@ -251,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[NANOMSG_MAX_PACKET_LEN];
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt);
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, *cnt);
if (ret <= 0)
return -1;
@ -259,7 +259,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
return cnt;
return *cnt;
}
int nanomsg_fd(struct node *n)

View file

@ -558,18 +558,18 @@ out: json_decref(entity);
return ret;
}
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt)
int ngsi_write(struct node *n, struct sample *smps[], int *cnt)
{
struct ngsi *i = (struct ngsi *) n->_vd;
int ret;
json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES);
json_t *entity = ngsi_build_entity(i, smps, *cnt, NGSI_ENTITY_VALUES);
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);
json_decref(entity);
return ret ? 0 : cnt;
return ret ? 0 : *cnt;
}
int ngsi_fd(struct node *n)

View file

@ -134,14 +134,14 @@ int shmem_read(struct node *n, struct sample *smps[], int *cnt)
return recv;
}
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
int shmem_write(struct node *n, struct sample *smps[], int *cnt)
{
struct shmem *shm = (struct shmem *) n->_vd;
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
struct sample *shared_smps[*cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, copied;
avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, *cnt);
if (avail != *cnt)
warn("Pool underrun for shmem node %s", shm->out_name);
copied = sample_copy_many(shared_smps, smps, avail);

View file

@ -401,7 +401,7 @@ out: free(buf);
return ret;
}
int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
int socket_write(struct node *n, struct sample *smps[], int *cnt)
{
struct socket *s = (struct socket *) n->_vd;
@ -416,7 +416,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
if (!buf)
return -1;
retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt);
retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, *cnt);
if (ret < 0)
goto out;

View file

@ -348,7 +348,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], int *cnt)
return i;
}
int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt)
int test_rtt_write(struct node *n, struct sample *smps[], int *cnt)
{
struct test_rtt *t = (struct test_rtt *) n->_vd;
@ -358,7 +358,7 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt)
struct test_rtt_case *c = (struct test_rtt_case *) list_at(&t->cases, t->current);
int i;
for (i = 0; i < cnt; i++) {
for (i = 0; i < *cnt; i++) {
if (smps[i]->length != c->values) {
warn("Discarding invalid sample due to mismatching length: expecting=%d, has=%d", c->values, smps[i]->length);
continue;

View file

@ -484,16 +484,16 @@ int websocket_read(struct node *n, struct sample *smps[], int *cnt)
return avail;
}
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
int websocket_write(struct node *n, struct sample *smps[], int *cnt)
{
int avail;
struct websocket *w = (struct websocket *) n->_vd;
struct sample *cpys[cnt];
struct sample *cpys[*cnt];
/* Make copies of all samples */
avail = sample_alloc_many(&w->pool, cpys, cnt);
if (avail < cnt)
avail = sample_alloc_many(&w->pool, cpys, *cnt);
if (avail < *cnt)
warn("Pool underrun for node %s: avail=%u", node_name(n), avail);
sample_copy_many(cpys, smps, avail);
@ -502,12 +502,12 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
if (c->node == n)
websocket_connection_write(c, cpys, cnt);
websocket_connection_write(c, cpys, *cnt);
}
sample_put_many(cpys, avail);
return cnt;
return *cnt;
}
int websocket_parse(struct node *n, json_t *cfg)

View file

@ -455,7 +455,7 @@ int zeromq_read(struct node *n, struct sample *smps[], int *cnt)
return recv;
}
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
int zeromq_write(struct node *n, struct sample *smps[], int *cnt)
{
int ret;
struct zeromq *z = (struct zeromq *) n->_vd;
@ -465,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[4096];
ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt);
ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, *cnt);
if (ret <= 0)
return -1;
@ -497,7 +497,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
return cnt;
return *cnt;
fail:
zmq_msg_close(&m);

View file

@ -211,7 +211,7 @@ static void path_destination_write(struct path_destination *pd, struct path *p)
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p));
sent = node_write(pd->node, smps, available);
sent = node_write(pd->node, smps, &available);
if (sent < 0)
error("Failed to sent %u samples to node %s", cnt, node_name(pd->node));
else if (sent < available)

View file

@ -161,7 +161,7 @@ static void * send_loop(void *ctx)
smps[i]->sequence = last_sequenceno++;
}
sent = node_write(node, smps, scanned);
sent = node_write(node, smps, &scanned);
if (sent < 0)
warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent);
else if (sent < scanned)

View file

@ -167,7 +167,7 @@ void test_rtt() {
clock_gettime(CLOCK_ID, &send);
int one = 1;
node_write(node, &smp_send, one); /* Ping */
node_write(node, &smp_send, &one); /* Ping */
node_read(node, &smp_recv, &one); /* Pong */
clock_gettime(CLOCK_ID, &recv);