1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add new msg_buffer() functions to read / write multiple struct sample from / to a buffer using the struct msg wire protocol.

This commit is contained in:
Steffen Vogel 2017-05-23 09:33:42 +02:00
parent 465999d608
commit 98cd2938af
2 changed files with 39 additions and 38 deletions

View file

@ -40,8 +40,6 @@
#include "queue.h"
#include "plugin.h"
#define MAX_PACKETLEN 1500
/* Forward declartions */
static struct plugin p;

View file

@ -26,17 +26,10 @@
#include "utils.h"
#include "queue.h"
#include "plugin.h"
#include "msg.h"
static void *context;
/* Release our samples. */
static void free_msg(void *data, void *hint)
{
struct sample *s = data;
sample_put(s);
}
int zeromq_reverse(struct node *n)
{
// struct zeromq *z = n->_vd;
@ -187,44 +180,54 @@ int zeromq_stop(struct node *n)
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int i, ret;
int recv, ret;
struct zeromq *z = n->_vd;
for (i = 0; i < cnt; i++) {
zmq_msg_t m;
ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->capacity), free_msg, NULL);
if (ret < 0)
break;
ret = zmq_msg_recv(&m, z->subscriber.socket, 0);
if (ret < 0)
break;
}
zmq_msg_t m;
ret = zmq_msg_init(&m);
if (ret < 0)
return ret;
ret = zmq_msg_recv(&m, z->subscriber.socket, 0);
if (ret < 0)
return ret;
return i;
recv = msg_buffer_to_samples(smps, cnt, zmq_msg_data(&m), zmq_msg_size(&m));
ret = zmq_msg_close(&m);
if (ret)
return ret;
return recv;
}
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
{
int i, ret;
int ret;
struct zeromq *z = n->_vd;
ssize_t sent;
zmq_msg_t m;
char data[1500];
for (i = 0; i < cnt; i++) {
zmq_msg_t m;
sample_get(smps[i]);
ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->length), free_msg, NULL);
if (ret < 0)
break;
ret = zmq_msg_send(&m, z->publisher.socket, 0);
if (ret < 0)
break;
}
sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data));
if (sent < 0)
return -1;
return i;
ret = zmq_msg_init_size(&m, sent);
memcpy(zmq_msg_data(&m), data, sent);
if (ret < 0)
return ret;
ret = zmq_msg_send(&m, z->publisher.socket, 0);
if (ret < 0)
return ret;
return cnt;
}
static struct plugin p = {