1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

io: remove state-less IO calls

This commit is contained in:
Steffen Vogel 2018-05-12 15:25:29 +02:00
parent a9a3d1d9eb
commit 4ba077d14e
24 changed files with 245 additions and 145 deletions

View file

@ -30,7 +30,7 @@ struct sample;
struct io;
enum format_type_flags {
format_type_BINARY = (1 << 8)
FORMAT_TYPE_BINARY = (1 << 8)
};
struct format_type {
@ -95,12 +95,6 @@ struct format_type {
/** @see format_type_sprint */
int (*sprint)(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** @see format_type_fscan */
int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int flags);
/** @see format_type_fprint */
int (*fprint)(FILE *f, struct sample *smps[], unsigned cnt, int flags);
/** @} */
size_t size; /**< Number of bytes to allocate for io::_vd */
@ -108,43 +102,3 @@ struct format_type {
};
struct format_type * format_type_lookup(const char *name);
/** Parse samples from the buffer \p buf with a length of \p len bytes.
*
* @param buf[in] The buffer of data which should be parsed / de-serialized.
* @param len[in] The length of the buffer \p buf.
* @param rbytes[out] The number of bytes which have been read from \p buf.
* @param smps[out] The array of pointers to samples.
* @param cnt[in] The number of pointers in the array \p smps.
*
* @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps.
* @retval <0 Something went wrong.
*/
int format_type_sscan(struct format_type *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*
* @param buf[out] The buffer which should be filled with serialized data.
* @param len[in] The length of the buffer \p buf.
* @param rbytes[out] The number of bytes which have been written to \p buf. Ignored if NULL.
* @param smps[in] The array of pointers to samples.
* @param cnt[in] The number of pointers in the array \p smps.
*
* @retval >=0 The number of samples from \p smps which have been written into \p buf.
* @retval <0 Something went wrong.
*/
int format_type_sprint(struct format_type *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** Parse up to \p cnt samples from stream \p f into array \p smps.
*
* @retval >=0 The number of samples which have been parsed from \p f and written into \p smps.
* @retval <0 Something went wrong.
*/
int format_type_fscan(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags);
/** Print \p cnt samples from \p smps to stream \p f.
*
* @retval >=0 The number of samples from \p smps which have been written to \p f.
* @retval <0 Something went wrong.
*/
int format_type_fprint(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags);

View file

@ -25,6 +25,7 @@
#include "advio.h"
#include "common.h"
#include "node.h"
/* Forward declarations */
struct sample;
@ -68,7 +69,7 @@ struct io {
struct format_type *_vt;
};
int io_init(struct io *io, struct format_type *fmt, int flags);
int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags);
int io_destroy(struct io *io);
@ -106,3 +107,29 @@ int io_stream_flush(struct io *io);
FILE * io_stream_input(struct io *io);
FILE * io_stream_output(struct io *io);
/** Parse samples from the buffer \p buf with a length of \p len bytes.
*
* @param buf[in] The buffer of data which should be parsed / de-serialized.
* @param len[in] The length of the buffer \p buf.
* @param rbytes[out] The number of bytes which have been read from \p buf.
* @param smps[out] The array of pointers to samples.
* @param cnt[in] The number of pointers in the array \p smps.
*
* @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps.
* @retval <0 Something went wrong.
*/
int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*
* @param buf[out] The buffer which should be filled with serialized data.
* @param len[in] The length of the buffer \p buf.
* @param rbytes[out] The number of bytes which have been written to \p buf. Ignored if NULL.
* @param smps[in] The array of pointers to samples.
* @param cnt[in] The number of pointers in the array \p smps.
*
* @retval >=0 The number of samples from \p smps which have been written into \p buf.
* @retval <0 Something went wrong.
*/
int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);

View file

@ -35,6 +35,7 @@
#include <villas/node.h>
#include <villas/list.h>
#include <villas/io.h>
/* Forward declarations */
struct format_type;
@ -61,6 +62,7 @@ struct amqp {
amqp_connection_state_t consumer;
struct format_type *format;
struct io io;
};
/** @see node_type::print */

View file

@ -33,6 +33,7 @@
#include <villas/node.h>
#include <villas/pool.h>
#include <villas/io.h>
#include <villas/queue_signalled.h>
/* Forward declarations */
@ -64,6 +65,7 @@ struct mqtt {
} ssl;
struct format_type *format;
struct io io;
};
/** @see node_type::reverse */

View file

@ -33,6 +33,7 @@
#include <villas/node.h>
#include <villas/list.h>
#include <villas/io.h>
/** The maximum length of a packet which contains stuct msg. */
#define NANOMSG_MAX_PACKET_LEN 1500
@ -52,6 +53,7 @@ struct nanomsg {
} subscriber;
struct format_type *format;
struct io io;
};
/** @see node_type::print */

View file

@ -40,6 +40,7 @@
#endif
#include <villas/config.h>
#include <villas/io.h>
#ifdef WITH_LIBNL_ROUTE_30
#include <villas/kernel/if.h>
@ -86,6 +87,7 @@ struct socket {
union sockaddr_union remote; /**< Remote address of the socket */
struct format_type *format;
struct io io;
/* Multicast options */
struct multicast {

View file

@ -38,6 +38,7 @@
#include <villas/pool.h>
#include <villas/queue_signalled.h>
#include <villas/common.h>
#include <villas/io.h>
#include <villas/config.h>
#define DEFAULT_WEBSOCKET_QUEUELEN (DEFAULT_QUEUELEN * 64)
@ -72,7 +73,7 @@ struct websocket_connection {
struct lws *wsi;
struct node *node;
struct format_type *format; /**< The IO format used for this connection. */
struct io io;
struct queue queue; /**< For samples which are sent to the WebSocket */
struct websocket_destination *destination;

View file

@ -33,6 +33,7 @@
#include <jansson.h>
#include <villas/list.h>
#include <villas/io.h>
#if ZMQ_BUILD_DRAFT_API && (ZMQ_VERSION_MAJOR > 4 || (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR >= 2))
#define ZMQ_BUILD_DISH 1
@ -49,6 +50,7 @@ struct zeromq {
char *filter;
struct format_type *format;
struct io io;
struct {
int enabled;

View file

@ -26,30 +26,6 @@
#include <villas/plugin.h>
#include <villas/format_type.h>
int format_type_sscan(struct format_type *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
flags |= fmt->flags;
return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, flags) : -1;
}
int format_type_sprint(struct format_type *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags)
{
flags |= fmt->flags;
return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, flags) : -1;
}
int format_type_fscan(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
return fmt->sprint ? fmt->fscan(f, smps, cnt, flags) : -1;
}
int format_type_fprint(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
return fmt->fprint ? fmt->fprint(f, smps, cnt, flags) : -1;
}
struct format_type * format_type_lookup(const char *name)
{
struct plugin *p;

View file

@ -51,7 +51,7 @@ static int print_start(struct hook *h)
struct print *p = (struct print *) h->_vd;
int ret;
ret = io_init(&p->io, p->format, SAMPLE_HAS_ALL);
ret = io_init(&p->io, p->format, h->node, SAMPLE_HAS_ALL);
if (ret)
return ret;
@ -71,10 +71,6 @@ static int print_stop(struct hook *h)
if (ret)
return ret;
ret = io_destroy(&p->io);
if (ret)
return ret;
return 0;
}
@ -110,6 +106,18 @@ static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt)
return 0;
}
static int print_destroy(struct hook *h)
{
int ret;
struct print *p = (struct print *) h->_vd;
ret = io_destroy(&p->io);
if (ret)
return ret;
return 0;
}
static struct plugin p = {
.name = "print",
.description = "Print the message to stdout",
@ -119,6 +127,7 @@ static struct plugin p = {
.priority = 99,
.init = print_init,
.parse = print_parse,
.destroy= print_destroy,
.start = print_start,
.stop = print_stop,
.process= print_process,

View file

@ -40,7 +40,7 @@ static int io_print_lines(struct io *io, struct sample *smps[], unsigned cnt)
for (i = 0; i < cnt; i++) {
size_t wbytes;
ret = format_type_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, &smps[i], 1, io->flags);
ret = io_sprint(io, io->output.buffer, io->output.buflen, &wbytes, &smps[i], 1);
if (ret < 0)
return ret;
@ -71,7 +71,7 @@ skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f);
if (ptr[0] == '\0' || ptr[0] == '#')
goto skip;
ret = format_type_sscan(io->_vt, io->input.buffer, bytes, &rbytes, &smps[i], 1, io->flags);
ret = io_sscan(io, io->input.buffer, bytes, &rbytes, &smps[i], 1);
if (ret < 0)
return ret;
}
@ -79,7 +79,7 @@ skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f);
return i;
}
int io_init(struct io *io, struct format_type *fmt, int flags)
int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags)
{
io->_vt = fmt;
io->_vd = alloc(fmt->size);
@ -93,6 +93,14 @@ int io_init(struct io *io, struct format_type *fmt, int flags)
io->input.buffer = alloc(io->input.buflen);
io->output.buffer = alloc(io->output.buflen);
if (n) {
io->input.node = n;
io->output.node = n;
io->input.signals = &n->signals;
io->output.signals = &n->signals;
}
return io->_vt->init ? io->_vt->init(io) : 0;
}
@ -372,25 +380,20 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt)
assert(io->state == STATE_OPENED);
if (io->_vt->print)
ret = io->_vt->print(io, smps, cnt);
else if (io->flags & IO_NEWLINES)
if (io->flags & IO_NEWLINES)
ret = io_print_lines(io, smps, cnt);
else {
else if (io->_vt->print)
ret = io->_vt->print(io, smps, cnt);
else if (io->_vt->sprint) {
FILE *f = io_stream_output(io);
size_t wbytes;
if (io->_vt->fprint)
ret = format_type_fprint(io->_vt, f, smps, cnt, io->flags);
else if (io->_vt->sprint) {
size_t wbytes;
ret = io_sprint(io, io->output.buffer, io->output.buflen, &wbytes, smps, cnt);
ret = format_type_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, smps, cnt, io->flags);
fwrite(io->output.buffer, wbytes, 1, f);
}
else
ret = -1;
fwrite(io->output.buffer, wbytes, 1, f);
}
else
ret = -1;
if (io->flags & IO_FLUSH)
io_flush(io);
@ -404,25 +407,20 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt)
assert(io->state == STATE_OPENED);
if (io->_vt->scan)
ret = io->_vt->scan(io, smps, cnt);
else if (io->flags & IO_NEWLINES)
if (io->flags & IO_NEWLINES)
ret = io_scan_lines(io, smps, cnt);
else {
else if (io->_vt->scan)
ret = io->_vt->scan(io, smps, cnt);
else if (io->_vt->sscan) {
FILE *f = io_stream_input(io);
size_t bytes, rbytes;
if (io->_vt->fscan)
ret = format_type_fscan(io->_vt, f, smps, cnt, io->flags);
else if (io->_vt->sscan) {
size_t bytes, rbytes;
bytes = fread(io->input.buffer, 1, io->input.buflen, f);
bytes = fread(io->input.buffer, 1, io->input.buflen, f);
ret = format_type_sscan(io->_vt, io->input.buffer, bytes, &rbytes, smps, cnt, io->flags);
}
else
ret = -1;
ret = io_sscan(io, io->input.buffer, bytes, &rbytes, smps, cnt);
}
else
ret = -1;
return ret;
}
@ -444,3 +442,17 @@ FILE * io_stream_input(struct io *io) {
? io->input.stream.adv->file
: io->input.stream.std;
}
int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
struct format_type *fmt = io->_vt;
return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, io->flags) : -1;
}
int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt)
{
struct format_type *fmt = io->_vt;
return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, io->flags) : -1;
}

View file

@ -219,12 +219,17 @@ char * amqp_print(struct node *n)
int amqp_start(struct node *n)
{
int ret;
struct amqp *a = n->_vd;
amqp_bytes_t queue;
amqp_rpc_reply_t rep;
amqp_queue_declare_ok_t *r;
ret = io_init(&a->io, a->format, n, SAMPLE_HAS_ALL);
if (ret)
return ret;
/* Connect producer */
a->producer = amqp_connect(&a->connection_info, &a->ssl_info);
if (!a->producer)
@ -295,7 +300,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt)
if (rep.reply_type != AMQP_RESPONSE_NORMAL)
return -1;
ret = format_type_sscan(a->format, env.message.body.bytes, env.message.body.len, NULL, smps, cnt, 0);
ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt);
amqp_destroy_envelope(&env);
@ -309,7 +314,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[1500];
size_t wbytes;
ret = format_type_sprint(a->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret <= 0)
return -1;
@ -341,8 +346,13 @@ int amqp_fd(struct node *n)
int amqp_destroy(struct node *n)
{
int ret;
struct amqp *a = n->_vd;
ret = io_destroy(&a->io);
if (ret)
return ret;
if (a->uri)
free(a->uri);

View file

@ -209,7 +209,7 @@ int file_start(struct node *n)
if (f->flush)
flags |= IO_FLUSH;
ret = io_init(&f->io, f->format, flags);
ret = io_init(&f->io, f->format, n, flags);
if (ret)
return ret;

View file

@ -183,7 +183,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
return;
}
ret = format_type_sscan(m->format, msg->payload, msg->payloadlen, NULL, &smp, 1, 0);
ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1);
if (ret != 1)
return;
@ -315,6 +315,10 @@ int mqtt_destroy(struct node *n)
mosquitto_destroy(m->client);
ret = io_destroy(&m->io);
if (ret)
return ret;
ret = pool_destroy(&m->pool);
if (ret)
return ret;
@ -374,6 +378,10 @@ int mqtt_start(struct node *n)
mosquitto_message_callback_set(m->client, mqtt_message_cb);
mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb);
ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL);
if (ret)
return ret;
ret = pool_init(&m->pool, 1024, SAMPLE_LEN(n->samplelen), &memtype_hugepage);
if (ret)
return ret;
@ -490,8 +498,8 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[1500];
ret = format_type_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
if (ret <= 0)
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret < 0)
return -1;
ret = mosquitto_publish(m->client, NULL /* mid */, m->publish, wbytes, data, m->qos, m->retain);

View file

@ -151,6 +151,10 @@ int nanomsg_start(struct node *n)
int ret;
struct nanomsg *m = (struct nanomsg *) n->_vd;
ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL);
if (ret)
return ret;
ret = m->subscriber.socket = nn_socket(AF_SP, NN_SUB);
if (ret < 0)
return ret;
@ -205,6 +209,18 @@ int nanomsg_stop(struct node *n)
return 0;
}
int nanomsg_destroy(struct node *n)
{
int ret;
struct nanomsg *m = (struct nanomsg *) n->_vd;
ret = io_destroy(&m->io);
if (ret)
return ret;
return 0;
}
int nanomsg_deinit()
{
nn_term();
@ -223,7 +239,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt)
if (bytes < 0)
return -1;
return format_type_sscan(m->format, data, bytes, NULL, smps, cnt, 0);
return io_sscan(&m->io, data, bytes, NULL, smps, cnt);
}
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
@ -235,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[NANOMSG_MAX_PACKET_LEN];
ret = format_type_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret <= 0)
return -1;
@ -273,6 +289,7 @@ static struct plugin p = {
.print = nanomsg_print,
.start = nanomsg_start,
.stop = nanomsg_stop,
.destroy = nanomsg_destroy,
.deinit = nanomsg_deinit,
.read = nanomsg_read,
.write = nanomsg_write,

View file

@ -167,6 +167,7 @@ int socket_start(struct node *n)
struct socket *s = (struct socket *) n->_vd;
int ret;
// TODO: Move to socket_check() ?
/* Some checks on the addresses */
if (s->layer != SOCKET_LAYER_UNIX) {
if (s->local.sa.sa_family != s->remote.sa.sa_family)
@ -196,6 +197,11 @@ int socket_start(struct node *n)
error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n));
}
/* Initialize IO */
ret = io_init(&s->io, s->format, n, SAMPLE_HAS_ALL);
if (ret)
return ret;
/* Create socket */
switch (s->layer) {
case SOCKET_LAYER_UDP: s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP); break;
@ -314,9 +320,14 @@ int socket_stop(struct node *n)
int socket_destroy(struct node *n)
{
#ifdef WITH_NETEM
int ret;
struct socket *s = (struct socket *) n->_vd;
ret = io_destroy(&s->io);
if (ret)
return ret;
#ifdef WITH_NETEM
rtnl_qdisc_put(s->tc_qdisc);
rtnl_cls_put(s->tc_classifier);
#endif /* WITH_NETEM */
@ -379,7 +390,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
goto out;
}
ret = format_type_sscan(s->format, ptr, bytes, &rbytes, smps, cnt, 0);
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt);
if (ret < 0 || bytes != rbytes)
warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes);
@ -404,7 +415,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
if (!buf)
return -1;
retry: ret = format_type_sprint(s->format, buf, buflen, &wbytes, smps, cnt, SAMPLE_HAS_ALL);
retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt);
if (ret < 0)
goto out;

View file

@ -253,7 +253,7 @@ int test_rtt_start(struct node *n)
return ret;
}
ret = io_init(&t->io, t->format, SAMPLE_HAS_ALL & ~SAMPLE_HAS_VALUES);
ret = io_init(&t->io, t->format, n, SAMPLE_HAS_ALL & ~SAMPLE_HAS_VALUES);
if (ret)
return ret;

View file

@ -161,12 +161,16 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return -1;
}
c->format = format_type_lookup(format);
if (!c->format) {
struct format_type *fmt = format_type_lookup(format);
if (fmt) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format");
return -1;
}
ret = io_init(&c->io, fmt, c->node, SAMPLE_HAS_ALL);
if (ret)
return -1;
buffer_init(&c->buffers.recv, 1 << 12);
buffer_init(&c->buffers.send, 1 << 12);
@ -210,6 +214,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (ret)
return ret;
ret = io_destroy(&c->io);
if (ret)
return ret;
buffer_destroy(&c->buffers.recv);
buffer_destroy(&c->buffers.send);
@ -233,9 +241,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
if (pulled > 0) {
format_type_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, SAMPLE_HAS_ALL);
io_sprint(&c->io, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->format->flags & format_type_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & FORMAT_TYPE_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_put_many(smps, pulled);
@ -274,7 +282,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (avail < cnt)
warn("Pool underrun for connection: %s", websocket_connection_name(c));
recvd = format_type_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, avail, 0);
recvd = io_sscan(&c->io, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, avail);
if (recvd < 0) {
warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
break;
@ -367,7 +375,15 @@ int websocket_start(struct node *n)
c->destination = d;
c->_name = NULL;
c->format = format_type_lookup("villas-web"); /** @todo We could parse the format from the URI */
struct format_type *fmt;
fmt = format_type_lookup("villas-web"); /** @todo We could parse the format from the URI */
if (!fmt)
return -1;
ret = io_init(&c->io, fmt, n, SAMPLE_HAS_ALL);
if (ret)
return ret;
d->info.context = web->context;
d->info.vhost = web->vhost;

View file

@ -249,6 +249,10 @@ int zeromq_start(struct node *n)
int ret;
struct zeromq *z = (struct zeromq *) n->_vd;
ret = io_init(&z->io, z->format, n, SAMPLE_HAS_ALL);
if (ret)
return ret;
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
@ -403,6 +407,18 @@ int zeromq_stop(struct node *n)
return zmq_close(z->publisher.socket);
}
int zeromq_destroy(struct node *n)
{
int ret;
struct zeromq *z = (struct zeromq *) n->_vd;
ret = io_destroy(&z->io);
if (ret)
return ret;
return 0;
}
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int recv, ret;
@ -430,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
recv = format_type_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, 0);
recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt);
ret = zmq_msg_close(&m);
if (ret)
@ -449,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[4096];
ret = format_type_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt);
if (ret <= 0)
return -1;
@ -516,6 +532,7 @@ static struct plugin p = {
.print = zeromq_print,
.start = zeromq_start,
.stop = zeromq_stop,
.destroy = zeromq_destroy,
.init = zeromq_init,
.deinit = zeromq_deinit,
.read = zeromq_read,

View file

@ -106,7 +106,7 @@ check: if (optarg == endptr)
if (!p)
error("Invalid format: %s", dirs[i].name);
ret = io_init(dirs[i].io, &p->io, SAMPLE_HAS_ALL);
ret = io_init(dirs[i].io, &p->io, NULL, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO: %s", dirs[i].name);
@ -131,6 +131,10 @@ check: if (optarg == endptr)
ret = io_close(dirs[i].io);
if (ret)
error("Failed to close IO");
ret = io_destroy(dirs[i].io);
if (ret)
error("Failed to destroy IO");
}
return 0;

View file

@ -63,6 +63,10 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
if (ret)
error("Failed to destroy hook");
ret = io_destroy(&io);
if (ret)
error("Failed to destroy IO");
sample_free_many(smps, cnt);
ret = pool_destroy(&q);
@ -181,7 +185,7 @@ check: if (optarg == endptr)
if (!p)
error("Unknown IO format '%s'", format);
ret = io_init(&io, &p->io, SAMPLE_HAS_ALL);
ret = io_init(&io, &p->io, NULL, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO");

View file

@ -57,6 +57,8 @@ struct node *node;
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
int ret;
if (signal == SIGALRM)
info("Reached timeout. Terminating...");
@ -70,15 +72,29 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
pthread_join(sendd.thread, NULL);
}
super_node_stop(&sn);
ret = super_node_stop(&sn);
if (ret)
error("Failed to stop super node");
if (recvv.enabled)
pool_destroy(&recvv.pool);
if (recvv.enabled) {
ret = pool_destroy(&recvv.pool);
if (ret)
error("Failed to destroy pool");
}
if (sendd.enabled)
pool_destroy(&sendd.pool);
if (sendd.enabled) {
ret = pool_destroy(&sendd.pool);
if (ret)
error("Failed to destroy pool");
}
super_node_destroy(&sn);
ret = super_node_destroy(&sn);
if (ret)
error("Failed to destroy super node");
ret = io_destroy(&io);
if (ret)
error("Failed to destroy IO");
info(CLR_GRN("Goodbye!"));
exit(EXIT_SUCCESS);
@ -309,7 +325,7 @@ check: if (optarg == endptr)
if (!p)
error("Invalid format: %s", format);
ret = io_init(&io, &p->io, SAMPLE_HAS_ALL);
ret = io_init(&io, &p->io, NULL, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO");

View file

@ -87,11 +87,11 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
ret = io_close(&io);
if (ret)
error("Failed to close output");
error("Failed to close IO");
ret = io_destroy(&io);
if (ret)
error("Failed to destroy output");
error("Failed to destroy IO");
ret = pool_destroy(&q);
if (ret)
@ -136,7 +136,7 @@ int main(int argc, char *argv[])
if (!p)
error("Invalid output format '%s'", format);
ret = io_init(&io, &p->io, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET));
ret = io_init(&io, &p->io, NULL, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET));
if (ret)
error("Failed to initialize output");

View file

@ -135,7 +135,7 @@ check: if (optarg == endptr)
if (!s[i].fmt)
error("Invalid IO format: %s", s[i].format);
ret = io_init(&s[i].io, s[i].fmt, 0);
ret = io_init(&s[i].io, s[i].fmt, NULL, 0);
if (ret)
error("Failed to initialize IO");
@ -192,12 +192,20 @@ retry: eofs = 0;
}
out: for (int i = 0; i < n; i++) {
io_close(&s[i].io);
io_destroy(&s[i].io);
ret = io_close(&s[i].io);
if (ret)
error("Failed to close IO");
ret = io_destroy(&s[i].io);
if (ret)
error("Failed to destroy IO");
sample_put(s[i].sample);
}
pool_destroy(&pool);
ret = pool_destroy(&pool);
if (ret)
error("Failed to destroy pool");
return ret;
}