diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index af7977b31..0e9310516 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -40,8 +40,6 @@ #include "queue.h" #include "plugin.h" -#define MAX_PACKETLEN 1500 - /* Forward declartions */ static struct plugin p; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index af68a49be..a7bb5b877 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -26,17 +26,10 @@ #include "utils.h" #include "queue.h" #include "plugin.h" +#include "msg.h" static void *context; -/* Release our samples. */ -static void free_msg(void *data, void *hint) -{ - struct sample *s = data; - - sample_put(s); -} - int zeromq_reverse(struct node *n) { // struct zeromq *z = n->_vd; @@ -187,44 +180,54 @@ int zeromq_stop(struct node *n) int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) { - int i, ret; + int recv, ret; struct zeromq *z = n->_vd; - for (i = 0; i < cnt; i++) { - zmq_msg_t m; - - ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->capacity), free_msg, NULL); - if (ret < 0) - break; - - ret = zmq_msg_recv(&m, z->subscriber.socket, 0); - if (ret < 0) - break; - } + zmq_msg_t m; + + ret = zmq_msg_init(&m); + if (ret < 0) + return ret; + + ret = zmq_msg_recv(&m, z->subscriber.socket, 0); + if (ret < 0) + return ret; - return i; + recv = msg_buffer_to_samples(smps, cnt, zmq_msg_data(&m), zmq_msg_size(&m)); + + ret = zmq_msg_close(&m); + if (ret) + return ret; + + return recv; } int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) { - int i, ret; + int ret; struct zeromq *z = n->_vd; + + ssize_t sent; + zmq_msg_t m; + + char data[1500]; - for (i = 0; i < cnt; i++) { - zmq_msg_t m; - - sample_get(smps[i]); - - ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->length), free_msg, NULL); - if (ret < 0) - break; - - ret = zmq_msg_send(&m, z->publisher.socket, 0); - if (ret < 0) - break; - } + sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data)); + if (sent < 0) + return -1; - return i; + ret = zmq_msg_init_size(&m, sent); + + memcpy(zmq_msg_data(&m), data, sent); + + if (ret < 0) + return ret; + + ret = zmq_msg_send(&m, z->publisher.socket, 0); + if (ret < 0) + return ret; + + return cnt; } static struct plugin p = {