1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Changed all node_read() functions to support a *cnt instead of cnt

This commit is contained in:
Dennis Potter 2018-07-07 17:07:45 +02:00
parent 8119c39c1a
commit 4663f55e4b
37 changed files with 79 additions and 76 deletions

View file

@ -154,7 +154,7 @@ char * node_name_long(struct node *n);
*/
int node_reverse(struct node *n);
int node_read(struct node *n, struct sample *smps[], unsigned cnt);
int node_read(struct node *n, struct sample *smps[], int *cnt);
int node_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -136,7 +136,7 @@ struct node_type {
* @param cnt The number of messages which should be received.
* @return The number of messages actually received.
*/
int (*read)(struct node *n, struct sample *smps[], unsigned cnt);
int (*read)(struct node *n, struct sample *smps[], int *cnt);
/** Send multiple messages in a single datagram / packet.
*

View file

@ -80,7 +80,7 @@ int amqp_start(struct node *n);
int amqp_stop(struct node *n);
/** @see node_type::read */
int amqp_read(struct node *n, struct sample *smps[], unsigned cnt);
int amqp_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int amqp_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -83,7 +83,7 @@ int file_start(struct node *n);
int file_stop(struct node *n);
/** @see node_type::read */
int file_read(struct node *n, struct sample *smps[], unsigned cnt);
int file_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int file_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -112,31 +112,31 @@ struct infiniband {
};
/** @see node_type::reverse */
int infiniband_reverse(struct node *n);
int ib_reverse(struct node *n);
/** @see node_type::print */
char * infiniband_print(struct node *n);
char * ib_print(struct node *n);
/** @see node_type::parse */
int infiniband_parse(struct node *n, json_t *cfg);
int ib_parse(struct node *n, json_t *cfg);
/** @see node_type::open */
int infiniband_start(struct node *n);
int ib_start(struct node *n);
/** @see node_type::destroy */
int infiniband_destroy(struct node *n);
int ib_destroy(struct node *n);
/** @see node_type::close */
int infiniband_stop(struct node *n);
int ib_stop(struct node *n);
/** @see node_type::init */
int infiniband_init(struct super_node *n);
int ib_init(struct super_node *n);
/** @see node_type::deinit */
int infiniband_deinit();
int ib_deinit();
/** @see node_type::read */
int infiniband_read(struct node *n, struct sample *smps[], unsigned cnt);
int ib_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int infiniband_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -63,7 +63,7 @@ int loopback_open(struct node *n);
int loopback_close(struct node *n);
/** @see node_type::read */
int loopback_read(struct node *n, struct sample *smps[], unsigned cnt);
int loopback_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -95,7 +95,7 @@ int mqtt_init();
int mqtt_deinit();
/** @see node_type::read */
int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt);
int mqtt_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -71,7 +71,7 @@ int nanomsg_start(struct node *n);
int nanomsg_stop(struct node *n);
/** @see node_type::read */
int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt);
int nanomsg_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -93,7 +93,7 @@ int ngsi_start(struct node *n);
int ngsi_stop(struct node *n);
/** @see node_type::read */
int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt);
int ngsi_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -64,7 +64,7 @@ int shmem_start(struct node *n);
int shmem_stop(struct node *n);
/** @see node_type::read */
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt);
int shmem_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -89,7 +89,7 @@ int signal_generator_start(struct node *n);
int signal_generator_stop(struct node *n);
/** @see node_type::read */
int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt);
int signal_generator_read(struct node *n, struct sample *smps[], int *cnt);
enum signal_generator_type signal_generator_lookup_type(const char *type);

View file

@ -122,7 +122,7 @@ int socket_stop(struct node *n);
int socket_write(struct node *n, struct sample *smps[], unsigned cnt);
/** @see node_type::read */
int socket_read(struct node *n, struct sample *smps[], unsigned cnt);
int socket_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::parse */
int socket_parse(struct node *n, json_t *cfg);

View file

@ -67,7 +67,7 @@ int stats_node_start(struct node *n);
int stats_node_stop(struct node *n);
/** @see node_type::read */
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt);
int stats_node_read(struct node *n, struct sample *smps[], int *cnt);
#ifdef __cplusplus
}

View file

@ -79,7 +79,7 @@ int test_rtt_start(struct node *n);
int test_rtt_stop(struct node *n);
/** @see node_type::read */
int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt);
int test_rtt_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -112,7 +112,7 @@ int websocket_stop(struct node *n);
int websocket_destroy(struct node *n);
/** @see node_type::read */
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt);
int websocket_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -102,7 +102,7 @@ int zeromq_start(struct node *n);
int zeromq_stop(struct node *n);
/** @see node_type::read */
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt);
int zeromq_read(struct node *n, struct sample *smps[], int *cnt);
/** @see node_type::write */
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt);

View file

@ -72,7 +72,7 @@ int stats_destroy(struct stats *s);
void stats_update(struct stats *s, enum stats_id id, double val);
void stats_collect(struct stats *s, struct sample *smps[], size_t cnt);
void stats_collect(struct stats *s, struct sample *smps[], size_t *cnt);
int stats_commit(struct stats *s);

View file

@ -408,7 +408,7 @@ int node_destroy(struct node *n)
return 0;
}
int node_read(struct node *n, struct sample *smps[], unsigned cnt)
int node_read(struct node *n, struct sample *smps[], int *cnt)
{
int readd, nread = 0;
@ -416,9 +416,10 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt)
return -1;
/* Send in parts if vector not supported */
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nread > 0) {
readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize));
if (n->_vt->vectorize > 0 && n->_vt->vectorize < *cnt) {
int cnt_vec_min = MIN(*cnt - nread, n->_vt->vectorize);
while (*cnt - nread > 0) {
readd = n->_vt->read(n, &smps[nread], &cnt_vec_min);
if (readd < 0)
return readd;

View file

@ -301,7 +301,7 @@ int amqp_stop(struct node *n)
return 0;
}
int amqp_read(struct node *n, struct sample *smps[], unsigned cnt)
int amqp_read(struct node *n, struct sample *smps[], int *cnt)
{
int ret;
struct amqp *a = n->_vd;
@ -312,7 +312,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt)
if (rep.reply_type != AMQP_RESPONSE_NORMAL)
return -1;
ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt);
ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, *cnt);
amqp_destroy_envelope(&env);

View file

@ -94,7 +94,7 @@ int cbuilder_stop(struct node *n)
return 0;
}
int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt)
int cbuilder_read(struct node *n, struct sample *smps[], int *cnt)
{
struct cbuilder *cb = (struct cbuilder *) n->_vd;
struct sample *smp = smps[0];

View file

@ -278,15 +278,15 @@ int file_destroy(struct node *n)
return 0;
}
int file_read(struct node *n, struct sample *smps[], unsigned cnt)
int file_read(struct node *n, struct sample *smps[], int *cnt)
{
struct file *f = (struct file *) n->_vd;
int ret;
uint64_t steps;
assert(cnt == 1);
assert(*cnt == 1);
retry: ret = io_scan(&f->io, smps, cnt);
retry: ret = io_scan(&f->io, smps, *cnt);
if (ret <= 0) {
if (io_eof(&f->io)) {
switch (f->eof) {
@ -322,7 +322,7 @@ retry: ret = io_scan(&f->io, smps, cnt);
/* We dont wait in FILE_EPOCH_ORIGINAL mode */
if (f->epoch_mode == FILE_EPOCH_ORIGINAL)
return cnt;
return *cnt;
if (f->rate) {
steps = task_wait(&f->task);
@ -342,7 +342,7 @@ retry: ret = io_scan(&f->io, smps, cnt);
else if (steps != 1)
warn("Missed steps: %" PRIu64, steps - 1);
return cnt;
return *cnt;
}
int file_write(struct node *n, struct sample *smps[], unsigned cnt)

View file

@ -730,12 +730,12 @@ int ib_deinit()
return 0;
}
int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
int ib_read(struct node *n, struct sample *smps[], int *cnt)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[n->in.vectorize];
struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_recv_wr wr[*cnt], *bad_wr = NULL;
struct ibv_sge sge[*cnt];
struct ibv_mr *mr;
int ret = 0;
@ -743,11 +743,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
if (n->state == STATE_CONNECTED) {
if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) {
if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && *cnt) {
// Get Memory Region
mr = memory_ib_get_mr(smps[0]);
for (int i = 0; i < cnt; i++) {
for (int i = 0; i < *cnt; i++) {
// Increase refcnt of sample
sample_get(smps[i]);
@ -764,7 +764,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
ib->conn.available_recv_wrs++;
if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(*cnt-1)) {
debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
wr[i].next = NULL;

View file

@ -69,14 +69,14 @@ int loopback_close(struct node *n)
return queue_signalled_destroy(&l->queue);
}
int loopback_read(struct node *n, struct sample *smps[], unsigned cnt)
int loopback_read(struct node *n, struct sample *smps[], int *cnt)
{
int avail;
struct loopback *l = (struct loopback *) n->_vd;
struct sample *cpys[cnt];
struct sample *cpys[*cnt];
avail = queue_signalled_pull_many(&l->queue, (void **) cpys, cnt);
avail = queue_signalled_pull_many(&l->queue, (void **) cpys, *cnt);
for (int i = 0; i < avail; i++) {
sample_copy(smps[i], cpys[i]);

View file

@ -358,13 +358,13 @@ int mqtt_deinit()
return 0;
}
int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt)
int mqtt_read(struct node *n, struct sample *smps[], int *cnt)
{
int pulled;
struct mqtt *m = (struct mqtt *) n->_vd;
struct sample *smpt[cnt];
struct sample *smpt[*cnt];
pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt);
pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, *cnt);
sample_copy_many(smps, smpt, pulled);
sample_put_many(smpt, pulled);

View file

@ -228,7 +228,7 @@ int nanomsg_deinit()
return 0;
}
int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt)
int nanomsg_read(struct node *n, struct sample *smps[], int *cnt)
{
struct nanomsg *m = (struct nanomsg *) n->_vd;
int bytes;
@ -239,7 +239,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt)
if (bytes < 0)
return -1;
return io_sscan(&m->io, data, bytes, NULL, smps, cnt);
return io_sscan(&m->io, data, bytes, NULL, smps, *cnt);
}
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)

View file

@ -533,7 +533,7 @@ int ngsi_stop(struct node *n)
return ret;
}
int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt)
int ngsi_read(struct node *n, struct sample *smps[], int *cnt)
{
struct ngsi *i = (struct ngsi *) n->_vd;
int ret;
@ -548,7 +548,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt)
if (ret)
goto out;
ret = ngsi_parse_entity(rentity, i, smps, cnt);
ret = ngsi_parse_entity(rentity, i, smps, *cnt);
if (ret)
goto out2;

View file

@ -110,14 +110,14 @@ int shmem_stop(struct node *n)
return shmem_int_close(&shm->intf);
}
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
int shmem_read(struct node *n, struct sample *smps[], int *cnt)
{
struct shmem *shm = (struct shmem *) n->_vd;
int recv;
struct sample *shared_smps[cnt];
struct sample *shared_smps[*cnt];
do {
recv = shmem_int_read(&shm->intf, shared_smps, cnt);
recv = shmem_int_read(&shm->intf, shared_smps, *cnt);
} while (recv == 0);
if (recv < 0) {

View file

@ -212,7 +212,7 @@ int signal_generator_stop(struct node *n)
return 0;
}
int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt)
int signal_generator_read(struct node *n, struct sample *smps[], int *cnt)
{
struct signal_generator *s = (struct signal_generator *) n->_vd;
struct sample *t = smps[0];
@ -220,7 +220,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt)
struct timespec ts;
int steps;
assert(cnt == 1);
assert(*cnt == 1);
/* Throttle output if desired */
if (s->rt) {

View file

@ -336,7 +336,7 @@ int socket_destroy(struct node *n)
return 0;
}
int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
int socket_read(struct node *n, struct sample *smps[], int *cnt)
{
int ret;
struct socket *s = (struct socket *) n->_vd;
@ -391,7 +391,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
goto out;
}
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt);
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, *cnt);
if (ret < 0 || bytes != rbytes)
warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes);

View file

@ -119,12 +119,12 @@ int stats_node_destroy(struct node *n)
return 0;
}
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt)
int stats_node_read(struct node *n, struct sample *smps[], int *cnt)
{
struct stats_node *sn = (struct stats_node *) n->_vd;
struct stats *s = sn->node->stats;
if (!cnt)
if (!*cnt)
return 0;
if (!sn->node->stats)

View file

@ -277,7 +277,7 @@ int test_rtt_stop(struct node *n)
return 0;
}
int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt)
int test_rtt_read(struct node *n, struct sample *smps[], int *cnt)
{
int i, ret, values;
uint64_t steps;
@ -320,7 +320,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt)
struct timespec now = time_now();
/* Prepare samples */
for (i = 0; i < cnt; i++) {
for (i = 0; i < *cnt; i++) {
values = c->values;
if (smps[i]->capacity < values) {
values = smps[i]->capacity;

View file

@ -467,14 +467,14 @@ int websocket_destroy(struct node *n)
return 0;
}
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
int websocket_read(struct node *n, struct sample *smps[], int *cnt)
{
int avail;
struct websocket *w = (struct websocket *) n->_vd;
struct sample *cpys[cnt];
struct sample *cpys[*cnt];
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, *cnt);
if (avail < 0)
return avail;

View file

@ -419,7 +419,7 @@ int zeromq_destroy(struct node *n)
return 0;
}
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
int zeromq_read(struct node *n, struct sample *smps[], int *cnt)
{
int recv, ret;
struct zeromq *z = (struct zeromq *) n->_vd;
@ -446,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt);
recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, *cnt);
ret = zmq_msg_close(&m);
if (ret)

View file

@ -84,7 +84,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
warn("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
recv = node_read(ps->node, read_smps, ready);
recv = node_read(ps->node, read_smps, &ready);
if (recv == 0)
goto out2;
else if (recv < 0)

View file

@ -210,7 +210,7 @@ static void * recv_loop(void *ctx)
else if (ready < node->in.vectorize)
warn("Receive pool underrun");
recv = node_read(node, smps, ready);
recv = node_read(node, smps, &ready);
if (recv < 0)
warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv);
@ -239,7 +239,7 @@ int main(int argc, char *argv[])
sendd.enabled = true;
sendd.limit = -1;
recvv.enabled = true;
recvv.limit = -1;

View file

@ -170,8 +170,9 @@ int main(int argc, char *argv[])
for (;;) {
t = sample_alloc(&q);
node_read(&n, &t, 1);
io_print(&io, &t, 1);
int one = 1;
node_read(&n, &t, &one);
io_print(&io, &t, one);
sample_put(t);
}

View file

@ -166,8 +166,9 @@ void test_rtt() {
while (running && (count < 0 || count--)) {
clock_gettime(CLOCK_ID, &send);
node_write(node, &smp_send, 1); /* Ping */
node_read(node, &smp_recv, 1); /* Pong */
int one = 1;
node_write(node, &smp_send, one); /* Ping */
node_read(node, &smp_recv, &one); /* Pong */
clock_gettime(CLOCK_ID, &recv);