diff --git a/include/villas/format_type.h b/include/villas/format_type.h index 0bb773a5e..af3282bf6 100644 --- a/include/villas/format_type.h +++ b/include/villas/format_type.h @@ -30,7 +30,7 @@ struct sample; struct io; enum format_type_flags { - format_type_BINARY = (1 << 8) + FORMAT_TYPE_BINARY = (1 << 8) }; struct format_type { @@ -95,12 +95,6 @@ struct format_type { /** @see format_type_sprint */ int (*sprint)(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); - /** @see format_type_fscan */ - int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int flags); - - /** @see format_type_fprint */ - int (*fprint)(FILE *f, struct sample *smps[], unsigned cnt, int flags); - /** @} */ size_t size; /**< Number of bytes to allocate for io::_vd */ @@ -108,43 +102,3 @@ struct format_type { }; struct format_type * format_type_lookup(const char *name); - -/** Parse samples from the buffer \p buf with a length of \p len bytes. - * - * @param buf[in] The buffer of data which should be parsed / de-serialized. - * @param len[in] The length of the buffer \p buf. - * @param rbytes[out] The number of bytes which have been read from \p buf. - * @param smps[out] The array of pointers to samples. - * @param cnt[in] The number of pointers in the array \p smps. - * - * @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps. - * @retval <0 Something went wrong. - */ -int format_type_sscan(struct format_type *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags); - -/** Print \p cnt samples from \p smps into buffer \p buf of length \p len. - * - * @param buf[out] The buffer which should be filled with serialized data. - * @param len[in] The length of the buffer \p buf. - * @param rbytes[out] The number of bytes which have been written to \p buf. Ignored if NULL. - * @param smps[in] The array of pointers to samples. - * @param cnt[in] The number of pointers in the array \p smps. - * - * @retval >=0 The number of samples from \p smps which have been written into \p buf. - * @retval <0 Something went wrong. - */ -int format_type_sprint(struct format_type *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); - -/** Parse up to \p cnt samples from stream \p f into array \p smps. - * - * @retval >=0 The number of samples which have been parsed from \p f and written into \p smps. - * @retval <0 Something went wrong. - */ -int format_type_fscan(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags); - -/** Print \p cnt samples from \p smps to stream \p f. - * - * @retval >=0 The number of samples from \p smps which have been written to \p f. - * @retval <0 Something went wrong. - */ -int format_type_fprint(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/io.h b/include/villas/io.h index 7405eec91..da8fcac3b 100644 --- a/include/villas/io.h +++ b/include/villas/io.h @@ -25,6 +25,7 @@ #include "advio.h" #include "common.h" +#include "node.h" /* Forward declarations */ struct sample; @@ -68,7 +69,7 @@ struct io { struct format_type *_vt; }; -int io_init(struct io *io, struct format_type *fmt, int flags); +int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags); int io_destroy(struct io *io); @@ -106,3 +107,29 @@ int io_stream_flush(struct io *io); FILE * io_stream_input(struct io *io); FILE * io_stream_output(struct io *io); + +/** Parse samples from the buffer \p buf with a length of \p len bytes. + * + * @param buf[in] The buffer of data which should be parsed / de-serialized. + * @param len[in] The length of the buffer \p buf. + * @param rbytes[out] The number of bytes which have been read from \p buf. + * @param smps[out] The array of pointers to samples. + * @param cnt[in] The number of pointers in the array \p smps. + * + * @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps. + * @retval <0 Something went wrong. + */ +int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); + +/** Print \p cnt samples from \p smps into buffer \p buf of length \p len. + * + * @param buf[out] The buffer which should be filled with serialized data. + * @param len[in] The length of the buffer \p buf. + * @param rbytes[out] The number of bytes which have been written to \p buf. Ignored if NULL. + * @param smps[in] The array of pointers to samples. + * @param cnt[in] The number of pointers in the array \p smps. + * + * @retval >=0 The number of samples from \p smps which have been written into \p buf. + * @retval <0 Something went wrong. + */ +int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index 720dfbc7d..9eb98abe2 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -35,6 +35,7 @@ #include #include +#include /* Forward declarations */ struct format_type; @@ -61,6 +62,7 @@ struct amqp { amqp_connection_state_t consumer; struct format_type *format; + struct io io; }; /** @see node_type::print */ diff --git a/include/villas/nodes/mqtt.h b/include/villas/nodes/mqtt.h index 088e1f54a..aadad8c29 100644 --- a/include/villas/nodes/mqtt.h +++ b/include/villas/nodes/mqtt.h @@ -33,6 +33,7 @@ #include #include +#include #include /* Forward declarations */ @@ -64,6 +65,7 @@ struct mqtt { } ssl; struct format_type *format; + struct io io; }; /** @see node_type::reverse */ diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index e5f52f018..9b38160dc 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -33,6 +33,7 @@ #include #include +#include /** The maximum length of a packet which contains stuct msg. */ #define NANOMSG_MAX_PACKET_LEN 1500 @@ -52,6 +53,7 @@ struct nanomsg { } subscriber; struct format_type *format; + struct io io; }; /** @see node_type::print */ diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index b77a0500f..cee715db7 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -40,6 +40,7 @@ #endif #include +#include #ifdef WITH_LIBNL_ROUTE_30 #include @@ -86,6 +87,7 @@ struct socket { union sockaddr_union remote; /**< Remote address of the socket */ struct format_type *format; + struct io io; /* Multicast options */ struct multicast { diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 1684c695c..f4461ddab 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -38,6 +38,7 @@ #include #include #include +#include #include #define DEFAULT_WEBSOCKET_QUEUELEN (DEFAULT_QUEUELEN * 64) @@ -72,7 +73,7 @@ struct websocket_connection { struct lws *wsi; struct node *node; - struct format_type *format; /**< The IO format used for this connection. */ + struct io io; struct queue queue; /**< For samples which are sent to the WebSocket */ struct websocket_destination *destination; diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 042cf5e14..afdf4a9b7 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -33,6 +33,7 @@ #include #include +#include #if ZMQ_BUILD_DRAFT_API && (ZMQ_VERSION_MAJOR > 4 || (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR >= 2)) #define ZMQ_BUILD_DISH 1 @@ -49,6 +50,7 @@ struct zeromq { char *filter; struct format_type *format; + struct io io; struct { int enabled; diff --git a/lib/format_type.c b/lib/format_type.c index bb01ae836..28e2bf603 100644 --- a/lib/format_type.c +++ b/lib/format_type.c @@ -26,30 +26,6 @@ #include #include -int format_type_sscan(struct format_type *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) -{ - flags |= fmt->flags; - - return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, flags) : -1; -} - -int format_type_sprint(struct format_type *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) -{ - flags |= fmt->flags; - - return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, flags) : -1; -} - -int format_type_fscan(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags) -{ - return fmt->sprint ? fmt->fscan(f, smps, cnt, flags) : -1; -} - -int format_type_fprint(struct format_type *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags) -{ - return fmt->fprint ? fmt->fprint(f, smps, cnt, flags) : -1; -} - struct format_type * format_type_lookup(const char *name) { struct plugin *p; diff --git a/lib/hooks/print.c b/lib/hooks/print.c index a4a736215..f436402d6 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -51,7 +51,7 @@ static int print_start(struct hook *h) struct print *p = (struct print *) h->_vd; int ret; - ret = io_init(&p->io, p->format, SAMPLE_HAS_ALL); + ret = io_init(&p->io, p->format, h->node, SAMPLE_HAS_ALL); if (ret) return ret; @@ -71,10 +71,6 @@ static int print_stop(struct hook *h) if (ret) return ret; - ret = io_destroy(&p->io); - if (ret) - return ret; - return 0; } @@ -110,6 +106,18 @@ static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt) return 0; } +static int print_destroy(struct hook *h) +{ + int ret; + struct print *p = (struct print *) h->_vd; + + ret = io_destroy(&p->io); + if (ret) + return ret; + + return 0; +} + static struct plugin p = { .name = "print", .description = "Print the message to stdout", @@ -119,6 +127,7 @@ static struct plugin p = { .priority = 99, .init = print_init, .parse = print_parse, + .destroy= print_destroy, .start = print_start, .stop = print_stop, .process= print_process, diff --git a/lib/io.c b/lib/io.c index 68bea7477..8d80c1351 100644 --- a/lib/io.c +++ b/lib/io.c @@ -40,7 +40,7 @@ static int io_print_lines(struct io *io, struct sample *smps[], unsigned cnt) for (i = 0; i < cnt; i++) { size_t wbytes; - ret = format_type_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, &smps[i], 1, io->flags); + ret = io_sprint(io, io->output.buffer, io->output.buflen, &wbytes, &smps[i], 1); if (ret < 0) return ret; @@ -71,7 +71,7 @@ skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f); if (ptr[0] == '\0' || ptr[0] == '#') goto skip; - ret = format_type_sscan(io->_vt, io->input.buffer, bytes, &rbytes, &smps[i], 1, io->flags); + ret = io_sscan(io, io->input.buffer, bytes, &rbytes, &smps[i], 1); if (ret < 0) return ret; } @@ -79,7 +79,7 @@ skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f); return i; } -int io_init(struct io *io, struct format_type *fmt, int flags) +int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) { io->_vt = fmt; io->_vd = alloc(fmt->size); @@ -93,6 +93,14 @@ int io_init(struct io *io, struct format_type *fmt, int flags) io->input.buffer = alloc(io->input.buflen); io->output.buffer = alloc(io->output.buflen); + if (n) { + io->input.node = n; + io->output.node = n; + + io->input.signals = &n->signals; + io->output.signals = &n->signals; + } + return io->_vt->init ? io->_vt->init(io) : 0; } @@ -372,25 +380,20 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt) assert(io->state == STATE_OPENED); - if (io->_vt->print) - ret = io->_vt->print(io, smps, cnt); - else if (io->flags & IO_NEWLINES) + if (io->flags & IO_NEWLINES) ret = io_print_lines(io, smps, cnt); - else { + else if (io->_vt->print) + ret = io->_vt->print(io, smps, cnt); + else if (io->_vt->sprint) { FILE *f = io_stream_output(io); + size_t wbytes; - if (io->_vt->fprint) - ret = format_type_fprint(io->_vt, f, smps, cnt, io->flags); - else if (io->_vt->sprint) { - size_t wbytes; + ret = io_sprint(io, io->output.buffer, io->output.buflen, &wbytes, smps, cnt); - ret = format_type_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, smps, cnt, io->flags); - - fwrite(io->output.buffer, wbytes, 1, f); - } - else - ret = -1; + fwrite(io->output.buffer, wbytes, 1, f); } + else + ret = -1; if (io->flags & IO_FLUSH) io_flush(io); @@ -404,25 +407,20 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt) assert(io->state == STATE_OPENED); - if (io->_vt->scan) - ret = io->_vt->scan(io, smps, cnt); - else if (io->flags & IO_NEWLINES) + if (io->flags & IO_NEWLINES) ret = io_scan_lines(io, smps, cnt); - else { + else if (io->_vt->scan) + ret = io->_vt->scan(io, smps, cnt); + else if (io->_vt->sscan) { FILE *f = io_stream_input(io); + size_t bytes, rbytes; - if (io->_vt->fscan) - ret = format_type_fscan(io->_vt, f, smps, cnt, io->flags); - else if (io->_vt->sscan) { - size_t bytes, rbytes; + bytes = fread(io->input.buffer, 1, io->input.buflen, f); - bytes = fread(io->input.buffer, 1, io->input.buflen, f); - - ret = format_type_sscan(io->_vt, io->input.buffer, bytes, &rbytes, smps, cnt, io->flags); - } - else - ret = -1; + ret = io_sscan(io, io->input.buffer, bytes, &rbytes, smps, cnt); } + else + ret = -1; return ret; } @@ -444,3 +442,17 @@ FILE * io_stream_input(struct io *io) { ? io->input.stream.adv->file : io->input.stream.std; } + +int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +{ + struct format_type *fmt = io->_vt; + + return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, io->flags) : -1; +} + +int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt) +{ + struct format_type *fmt = io->_vt; + + return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, io->flags) : -1; +} diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index f6ae610e2..21da90a4d 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -219,12 +219,17 @@ char * amqp_print(struct node *n) int amqp_start(struct node *n) { + int ret; struct amqp *a = n->_vd; amqp_bytes_t queue; amqp_rpc_reply_t rep; amqp_queue_declare_ok_t *r; + ret = io_init(&a->io, a->format, n, SAMPLE_HAS_ALL); + if (ret) + return ret; + /* Connect producer */ a->producer = amqp_connect(&a->connection_info, &a->ssl_info); if (!a->producer) @@ -295,7 +300,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt) if (rep.reply_type != AMQP_RESPONSE_NORMAL) return -1; - ret = format_type_sscan(a->format, env.message.body.bytes, env.message.body.len, NULL, smps, cnt, 0); + ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt); amqp_destroy_envelope(&env); @@ -309,7 +314,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) char data[1500]; size_t wbytes; - ret = format_type_sprint(a->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); + ret = io_sprint(&a->io, data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) return -1; @@ -341,8 +346,13 @@ int amqp_fd(struct node *n) int amqp_destroy(struct node *n) { + int ret; struct amqp *a = n->_vd; + ret = io_destroy(&a->io); + if (ret) + return ret; + if (a->uri) free(a->uri); diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 3e0088286..cc642caf4 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -209,7 +209,7 @@ int file_start(struct node *n) if (f->flush) flags |= IO_FLUSH; - ret = io_init(&f->io, f->format, flags); + ret = io_init(&f->io, f->format, n, flags); if (ret) return ret; diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index c4f10c465..fa580ff99 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -183,7 +183,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct return; } - ret = format_type_sscan(m->format, msg->payload, msg->payloadlen, NULL, &smp, 1, 0); + ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1); if (ret != 1) return; @@ -315,6 +315,10 @@ int mqtt_destroy(struct node *n) mosquitto_destroy(m->client); + ret = io_destroy(&m->io); + if (ret) + return ret; + ret = pool_destroy(&m->pool); if (ret) return ret; @@ -374,6 +378,10 @@ int mqtt_start(struct node *n) mosquitto_message_callback_set(m->client, mqtt_message_cb); mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb); + ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL); + if (ret) + return ret; + ret = pool_init(&m->pool, 1024, SAMPLE_LEN(n->samplelen), &memtype_hugepage); if (ret) return ret; @@ -490,8 +498,8 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt) char data[1500]; - ret = format_type_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); - if (ret <= 0) + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) return -1; ret = mosquitto_publish(m->client, NULL /* mid */, m->publish, wbytes, data, m->qos, m->retain); diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 2e3bf8603..432734803 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -151,6 +151,10 @@ int nanomsg_start(struct node *n) int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; + ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL); + if (ret) + return ret; + ret = m->subscriber.socket = nn_socket(AF_SP, NN_SUB); if (ret < 0) return ret; @@ -205,6 +209,18 @@ int nanomsg_stop(struct node *n) return 0; } +int nanomsg_destroy(struct node *n) +{ + int ret; + struct nanomsg *m = (struct nanomsg *) n->_vd; + + ret = io_destroy(&m->io); + if (ret) + return ret; + + return 0; +} + int nanomsg_deinit() { nn_term(); @@ -223,7 +239,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) if (bytes < 0) return -1; - return format_type_sscan(m->format, data, bytes, NULL, smps, cnt, 0); + return io_sscan(&m->io, data, bytes, NULL, smps, cnt); } int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) @@ -235,7 +251,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) char data[NANOMSG_MAX_PACKET_LEN]; - ret = format_type_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) return -1; @@ -273,6 +289,7 @@ static struct plugin p = { .print = nanomsg_print, .start = nanomsg_start, .stop = nanomsg_stop, + .destroy = nanomsg_destroy, .deinit = nanomsg_deinit, .read = nanomsg_read, .write = nanomsg_write, diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index d5b66a2fc..6ac97ba05 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -167,6 +167,7 @@ int socket_start(struct node *n) struct socket *s = (struct socket *) n->_vd; int ret; + // TODO: Move to socket_check() ? /* Some checks on the addresses */ if (s->layer != SOCKET_LAYER_UNIX) { if (s->local.sa.sa_family != s->remote.sa.sa_family) @@ -196,6 +197,11 @@ int socket_start(struct node *n) error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n)); } + /* Initialize IO */ + ret = io_init(&s->io, s->format, n, SAMPLE_HAS_ALL); + if (ret) + return ret; + /* Create socket */ switch (s->layer) { case SOCKET_LAYER_UDP: s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP); break; @@ -314,9 +320,14 @@ int socket_stop(struct node *n) int socket_destroy(struct node *n) { -#ifdef WITH_NETEM + int ret; struct socket *s = (struct socket *) n->_vd; + ret = io_destroy(&s->io); + if (ret) + return ret; + +#ifdef WITH_NETEM rtnl_qdisc_put(s->tc_qdisc); rtnl_cls_put(s->tc_classifier); #endif /* WITH_NETEM */ @@ -379,7 +390,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) goto out; } - ret = format_type_sscan(s->format, ptr, bytes, &rbytes, smps, cnt, 0); + ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt); if (ret < 0 || bytes != rbytes) warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); @@ -404,7 +415,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) if (!buf) return -1; -retry: ret = format_type_sprint(s->format, buf, buflen, &wbytes, smps, cnt, SAMPLE_HAS_ALL); +retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); if (ret < 0) goto out; diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 060923325..6ccbae612 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -253,7 +253,7 @@ int test_rtt_start(struct node *n) return ret; } - ret = io_init(&t->io, t->format, SAMPLE_HAS_ALL & ~SAMPLE_HAS_VALUES); + ret = io_init(&t->io, t->format, n, SAMPLE_HAS_ALL & ~SAMPLE_HAS_VALUES); if (ret) return ret; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 604ef8fca..0c598daea 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -161,12 +161,16 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi return -1; } - c->format = format_type_lookup(format); - if (!c->format) { + struct format_type *fmt = format_type_lookup(format); + if (fmt) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format"); return -1; } + ret = io_init(&c->io, fmt, c->node, SAMPLE_HAS_ALL); + if (ret) + return -1; + buffer_init(&c->buffers.recv, 1 << 12); buffer_init(&c->buffers.send, 1 << 12); @@ -210,6 +214,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (ret) return ret; + ret = io_destroy(&c->io); + if (ret) + return ret; + buffer_destroy(&c->buffers.recv); buffer_destroy(&c->buffers.send); @@ -233,9 +241,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi pulled = queue_pull_many(&c->queue, (void **) smps, cnt); if (pulled > 0) { - format_type_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, SAMPLE_HAS_ALL); + io_sprint(&c->io, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled); - ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->format->flags & format_type_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); + ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & FORMAT_TYPE_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); sample_put_many(smps, pulled); @@ -274,7 +282,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (avail < cnt) warn("Pool underrun for connection: %s", websocket_connection_name(c)); - recvd = format_type_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, avail, 0); + recvd = io_sscan(&c->io, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, avail); if (recvd < 0) { warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); break; @@ -367,7 +375,15 @@ int websocket_start(struct node *n) c->destination = d; c->_name = NULL; - c->format = format_type_lookup("villas-web"); /** @todo We could parse the format from the URI */ + struct format_type *fmt; + + fmt = format_type_lookup("villas-web"); /** @todo We could parse the format from the URI */ + if (!fmt) + return -1; + + ret = io_init(&c->io, fmt, n, SAMPLE_HAS_ALL); + if (ret) + return ret; d->info.context = web->context; d->info.vhost = web->vhost; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 49c8738d6..930e45bf9 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -249,6 +249,10 @@ int zeromq_start(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; + ret = io_init(&z->io, z->format, n, SAMPLE_HAS_ALL); + if (ret) + return ret; + switch (z->pattern) { #ifdef ZMQ_BUILD_DISH case ZEROMQ_PATTERN_RADIODISH: @@ -403,6 +407,18 @@ int zeromq_stop(struct node *n) return zmq_close(z->publisher.socket); } +int zeromq_destroy(struct node *n) +{ + int ret; + struct zeromq *z = (struct zeromq *) n->_vd; + + ret = io_destroy(&z->io); + if (ret) + return ret; + + return 0; +} + int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) { int recv, ret; @@ -430,7 +446,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - recv = format_type_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, 0); + recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt); ret = zmq_msg_close(&m); if (ret) @@ -449,7 +465,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) char data[4096]; - ret = format_type_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); + ret = io_sprint(&z->io, data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) return -1; @@ -516,6 +532,7 @@ static struct plugin p = { .print = zeromq_print, .start = zeromq_start, .stop = zeromq_stop, + .destroy = zeromq_destroy, .init = zeromq_init, .deinit = zeromq_deinit, .read = zeromq_read, diff --git a/src/convert.c b/src/convert.c index 0cd214a27..287c466d8 100644 --- a/src/convert.c +++ b/src/convert.c @@ -106,7 +106,7 @@ check: if (optarg == endptr) if (!p) error("Invalid format: %s", dirs[i].name); - ret = io_init(dirs[i].io, &p->io, SAMPLE_HAS_ALL); + ret = io_init(dirs[i].io, &p->io, NULL, SAMPLE_HAS_ALL); if (ret) error("Failed to initialize IO: %s", dirs[i].name); @@ -131,6 +131,10 @@ check: if (optarg == endptr) ret = io_close(dirs[i].io); if (ret) error("Failed to close IO"); + + ret = io_destroy(dirs[i].io); + if (ret) + error("Failed to destroy IO"); } return 0; diff --git a/src/hook.c b/src/hook.c index 55eac6993..098de89e0 100644 --- a/src/hook.c +++ b/src/hook.c @@ -63,6 +63,10 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) if (ret) error("Failed to destroy hook"); + ret = io_destroy(&io); + if (ret) + error("Failed to destroy IO"); + sample_free_many(smps, cnt); ret = pool_destroy(&q); @@ -181,7 +185,7 @@ check: if (optarg == endptr) if (!p) error("Unknown IO format '%s'", format); - ret = io_init(&io, &p->io, SAMPLE_HAS_ALL); + ret = io_init(&io, &p->io, NULL, SAMPLE_HAS_ALL); if (ret) error("Failed to initialize IO"); diff --git a/src/pipe.c b/src/pipe.c index 95346878e..e063adf85 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -57,6 +57,8 @@ struct node *node; static void quit(int signal, siginfo_t *sinfo, void *ctx) { + int ret; + if (signal == SIGALRM) info("Reached timeout. Terminating..."); @@ -70,15 +72,29 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) pthread_join(sendd.thread, NULL); } - super_node_stop(&sn); + ret = super_node_stop(&sn); + if (ret) + error("Failed to stop super node"); - if (recvv.enabled) - pool_destroy(&recvv.pool); + if (recvv.enabled) { + ret = pool_destroy(&recvv.pool); + if (ret) + error("Failed to destroy pool"); + } - if (sendd.enabled) - pool_destroy(&sendd.pool); + if (sendd.enabled) { + ret = pool_destroy(&sendd.pool); + if (ret) + error("Failed to destroy pool"); + } - super_node_destroy(&sn); + ret = super_node_destroy(&sn); + if (ret) + error("Failed to destroy super node"); + + ret = io_destroy(&io); + if (ret) + error("Failed to destroy IO"); info(CLR_GRN("Goodbye!")); exit(EXIT_SUCCESS); @@ -309,7 +325,7 @@ check: if (optarg == endptr) if (!p) error("Invalid format: %s", format); - ret = io_init(&io, &p->io, SAMPLE_HAS_ALL); + ret = io_init(&io, &p->io, NULL, SAMPLE_HAS_ALL); if (ret) error("Failed to initialize IO"); diff --git a/src/signal.c b/src/signal.c index 588bba5c6..f608303c8 100644 --- a/src/signal.c +++ b/src/signal.c @@ -87,11 +87,11 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) ret = io_close(&io); if (ret) - error("Failed to close output"); + error("Failed to close IO"); ret = io_destroy(&io); if (ret) - error("Failed to destroy output"); + error("Failed to destroy IO"); ret = pool_destroy(&q); if (ret) @@ -136,7 +136,7 @@ int main(int argc, char *argv[]) if (!p) error("Invalid output format '%s'", format); - ret = io_init(&io, &p->io, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET)); + ret = io_init(&io, &p->io, NULL, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET)); if (ret) error("Failed to initialize output"); diff --git a/src/test-cmp.c b/src/test-cmp.c index 6bc07d896..bd1968e7e 100644 --- a/src/test-cmp.c +++ b/src/test-cmp.c @@ -135,7 +135,7 @@ check: if (optarg == endptr) if (!s[i].fmt) error("Invalid IO format: %s", s[i].format); - ret = io_init(&s[i].io, s[i].fmt, 0); + ret = io_init(&s[i].io, s[i].fmt, NULL, 0); if (ret) error("Failed to initialize IO"); @@ -192,12 +192,20 @@ retry: eofs = 0; } out: for (int i = 0; i < n; i++) { - io_close(&s[i].io); - io_destroy(&s[i].io); + ret = io_close(&s[i].io); + if (ret) + error("Failed to close IO"); + + ret = io_destroy(&s[i].io); + if (ret) + error("Failed to destroy IO"); + sample_put(s[i].sample); } - pool_destroy(&pool); + ret = pool_destroy(&pool); + if (ret) + error("Failed to destroy pool"); return ret; }