diff --git a/include/villas/io/csv.h b/include/villas/io/csv.h index 993305664..c7f2a1027 100644 --- a/include/villas/io/csv.h +++ b/include/villas/io/csv.h @@ -32,4 +32,4 @@ struct sample; int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags); -int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags); +int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/io/json.h b/include/villas/io/json.h index 045a1201e..8274732bc 100644 --- a/include/villas/io/json.h +++ b/include/villas/io/json.h @@ -28,8 +28,8 @@ int json_pack_sample(json_t **j, struct sample *s, int flags); -int json_unpack_sample(json_t *j, struct sample *s, int *flags); +int json_unpack_sample(json_t *j, struct sample *s, int flags); int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags); -int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags); +int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/io/msg.h b/include/villas/io/msg.h index 9c520d06d..508b5be68 100644 --- a/include/villas/io/msg.h +++ b/include/villas/io/msg.h @@ -68,4 +68,4 @@ int msg_from_sample(struct msg *msg, struct sample *smp); int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); /** Read struct sample's from buffer \p buf into samples \p smps. */ -int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); +int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/io/raw.h b/include/villas/io/raw.h index 797503fed..9fa5d04cb 100644 --- a/include/villas/io/raw.h +++ b/include/villas/io/raw.h @@ -34,7 +34,7 @@ enum raw_flags { RAW_BE_INT = (1 << 17), /**< Byte-order for integer data: big-endian if set. */ RAW_BE_FLT = (1 << 18), /**< Byte-order for floating point data: big-endian if set. */ RAW_BE_HDR = (1 << 19), /**< Byte-order for fake header fields: big-endian if set. */ - + /** Byte-order for all fields: big-endian if set. */ RAW_BE = RAW_BE_INT | RAW_BE_FLT | RAW_BE_HDR, @@ -57,4 +57,4 @@ enum raw_flags { int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); /** Read struct sample's from buffer \p buf into samples \p smps. */ -int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); +int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/io/villas.h b/include/villas/io/villas.h index 94aadc40d..34d248013 100644 --- a/include/villas/io/villas.h +++ b/include/villas/io/villas.h @@ -33,4 +33,4 @@ int villas_scan(struct io *io, struct sample *smps[], unsigned cnt); int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags); -int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags); +int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/io_format.h b/include/villas/io_format.h index 7bca9954f..599acd34c 100644 --- a/include/villas/io_format.h +++ b/include/villas/io_format.h @@ -30,12 +30,6 @@ struct sample; struct io; enum io_format_flags { - IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */ - IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */ - IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */ - IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */ - IO_FORMAT_ALL = 15, /**< Enable all output options. */ - IO_FORMAT_BINARY = (1 << 8) }; @@ -70,7 +64,7 @@ struct io_format { * @see rewind() */ void (*rewind)(struct io *io); - + /** Get a file descriptor which can be used with select / poll */ int (*fd)(struct io *io); @@ -89,13 +83,13 @@ struct io_format { */ /** @see io_format_sscan */ - int (*sscan)(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); + int (*sscan)(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags); /** @see io_format_sprint */ int (*sprint)(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); /** @see io_format_fscan */ - int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int *flags); + int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int flags); /** @see io_format_fprint */ int (*fprint)(FILE *f, struct sample *smps[], unsigned cnt, int flags); @@ -119,7 +113,7 @@ struct io_format * io_format_lookup(const char *name); * @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps. * @retval <0 Something went wrong. */ -int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); +int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags); /** Print \p cnt samples from \p smps into buffer \p buf of length \p len. * @@ -139,7 +133,7 @@ int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbyte * @retval >=0 The number of samples which have been parsed from \p f and written into \p smps. * @retval <0 Something went wrong. */ -int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags); +int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags); /** Print \p cnt samples from \p smps to stream \p f. * diff --git a/include/villas/sample.h b/include/villas/sample.h index 7c6d88925..662d07704 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -52,11 +52,24 @@ enum sample_data_format { SAMPLE_DATA_FORMAT_INT = 1 }; +/** Parts of a sample that can be serialized / de-serialized by the IO formats */ +enum sample_has { + SAMPLE_ORIGIN = (1 << 0), /**< Include nanoseconds in output. */ + SAMPLE_RECEIVED = (1 << 1), /**< Include nanoseconds in output. */ + SAMPLE_OFFSET = (1 << 2), + SAMPLE_SOURCE = (1 << 3), + SAMPLE_ID = (1 << 4), + SAMPLE_SEQUENCE = (1 << 5), /**< Include sequence number in output. */ + SAMPLE_VALUES = (1 << 6), /**< Include values in output. */ + SAMPLE_FORMAT = (1 << 7), + SAMPLE_ALL = (1 << 7) - 1, /**< Enable all output options. */ +}; + struct sample { int sequence; /**< The sequence number of this sample. */ int length; /**< The number of values in sample::values which are valid. */ int capacity; /**< The number of values in sample::values for which memory is reserved. */ - + int id; atomic_int refcnt; /**< Reference counter. */ @@ -70,6 +83,8 @@ struct sample { struct timespec sent; /**< The point in time when this data was send for the last time. */ } ts; + int has; + /** A long bitfield indicating the number representation of the first 64 values in sample::data[]. * * @see sample_data_format diff --git a/lib/hooks/fix_ts.c b/lib/hooks/fix_ts.c deleted file mode 100644 index 4e6d8f6b1..000000000 --- a/lib/hooks/fix_ts.c +++ /dev/null @@ -1,70 +0,0 @@ -/** Fix timestamp hook. - * - * @author Steffen Vogel - * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -/** @addtogroup hooks Hook functions - * @{ - */ - -#include "hook.h" -#include "plugin.h" -#include "timing.h" -#include "sample.h" - -int fix_ts_read(struct hook *h, struct sample *smps[], unsigned *cnt) -{ - struct timespec now; - - now = time_now(); - - for (int i = 0; i < *cnt; i++) { - /* Check for missing receive timestamp - * Usually node_type::read() should update the receive timestamp. - * An example would be to use hardware timestamp capabilities of - * modern NICs. - */ - if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) || - (smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1)) - smps[i]->ts.received = now; - - /* Check for missing origin timestamp */ - if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) || - (smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1)) - smps[i]->ts.origin = now; - } - - return 0; -} - -static struct plugin p = { - .name = "fix_ts", - .description = "Update timestamps of sample if not set", - .type = PLUGIN_TYPE_HOOK, - .hook = { - .flags = HOOK_NODE | HOOK_BUILTIN, - .priority = 0, - .read = fix_ts_read, - } -}; - -REGISTER_PLUGIN(&p) - -/** @} */ diff --git a/lib/hooks/print.c b/lib/hooks/print.c index 70645a4d3..1107d6358 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -51,7 +51,7 @@ static int print_start(struct hook *h) struct print *p = h->_vd; int ret; - ret = io_init(&p->io, p->format, IO_FORMAT_ALL); + ret = io_init(&p->io, p->format, SAMPLE_ALL); if (ret) return ret; diff --git a/lib/io.c b/lib/io.c index 261edd48c..f53d1b0ec 100644 --- a/lib/io.c +++ b/lib/io.c @@ -59,7 +59,7 @@ int io_stream_open(struct io *io, const char *uri) if (uri) { if (aislocal(uri)) { io->mode = IO_MODE_STDIO; - + io->stdio.output = fopen(uri, "a+"); if (io->stdio.output == NULL) return -1; @@ -87,7 +87,7 @@ int io_stream_open(struct io *io, const char *uri) io->stdio.input = stdin; io->stdio.output = stdout; } - + if (io->mode == IO_MODE_STDIO) { ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ); if (ret) @@ -187,7 +187,7 @@ int io_stream_fd(struct io *io) case IO_MODE_CUSTOM: return -1; } - + return -1; } @@ -244,7 +244,7 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt) FILE *f = io->mode == IO_MODE_ADVIO ? io->advio.output->file : io->stdio.output; - + //flockfile(f); if (io->_vt->fprint) @@ -259,7 +259,7 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt) } else ret = -1; - + //funlockfile(f); } @@ -279,37 +279,35 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt) FILE *f = io->mode == IO_MODE_ADVIO ? io->advio.input->file : io->stdio.input; - + //flockfile(f); - - int flags = io->flags; if (io->_vt->fscan) - return io->_vt->fscan(f, smps, cnt, &flags); + return io->_vt->fscan(f, smps, cnt, io->flags); else if (io->_vt->sscan) { size_t bytes, rbytes; char buf[4096]; bytes = fread(buf, 1, sizeof(buf), f); - ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, &flags); + ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, io->flags); } else ret = -1; - + //funlockfile(f); } - + return ret; } struct io_format * io_format_lookup(const char *name) { struct plugin *p; - + p = plugin_lookup(PLUGIN_TYPE_IO, name); if (!p) return NULL; - + return &p->io; } diff --git a/lib/io/csv.c b/lib/io/csv.c index aae43a75b..a55332b3c 100644 --- a/lib/io/csv.c +++ b/lib/io/csv.c @@ -30,12 +30,12 @@ size_t csv_sprint_single(char *buf, size_t len, struct sample *s, int flags) { - size_t off = snprintf(buf, len, "%ld", s->ts.origin.tv_sec); - - if (flags & IO_FORMAT_NANOSECONDS) - off += snprintf(buf + off, len - off, "%c%09llu", CSV_SEPARATOR, (unsigned long long) s->ts.origin.tv_nsec); + size_t off = 0; - if (flags & IO_FORMAT_SEQUENCE) + if (flags & SAMPLE_ORIGIN) + off += snprintf(buf + off, len - off, "%ld%c%09ld", s->ts.origin.tv_sec, CSV_SEPARATOR, s->ts.origin.tv_nsec); + + if (flags & SAMPLE_SEQUENCE) off += snprintf(buf + off, len - off, "%c%u", CSV_SEPARATOR, s->sequence); for (int i = 0; i < s->length; i++) { @@ -50,31 +50,37 @@ size_t csv_sprint_single(char *buf, size_t len, struct sample *s, int flags) } off += snprintf(buf + off, len - off, "\n"); - + return off; } -size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) +size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int flags) { const char *ptr = buf; char *end; + s->has = 0; + s->ts.origin.tv_sec = strtoul(ptr, &end, 10); if (end == ptr || *end == '\n') goto out; - + ptr = end; - + s->ts.origin.tv_nsec = strtoul(ptr, &end, 10); if (end == ptr || *end == '\n') goto out; - + ptr = end; - + + s->has |= SAMPLE_ORIGIN; + s->sequence = strtoul(ptr, &end, 10); if (end == ptr || *end == '\n') goto out; + s->has |= SAMPLE_SEQUENCE; + for (ptr = end, s->length = 0; s->length < s->capacity; ptr = end, s->length++) { @@ -94,11 +100,12 @@ size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flag if (end == ptr) goto out; } - + out: if (*end == '\n') end++; - s->ts.received = time_now(); + if (s->length > 0) + s->has |= SAMPLE_VALUES; return end - buf; } @@ -110,21 +117,21 @@ int csv_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns for (i = 0; i < cnt && off < len; i++) off += csv_sprint_single(buf + off, len - off, smps[i], flags); - + if (wbytes) *wbytes = off; return i; } -int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) { int i; size_t off = 0; for (i = 0; i < cnt && off < len; i++) off += csv_sscan_single(buf + off, len - off, smps[i], flags); - + if (rbytes) *rbytes = off; @@ -139,13 +146,13 @@ int csv_fprint_single(FILE *f, struct sample *s, int flags) ret = csv_sprint_single(line, sizeof(line), s, flags); if (ret < 0) return ret; - + fputs(line, f); return 0; } -int csv_fscan_single(FILE *f, struct sample *s, int *flags) +int csv_fscan_single(FILE *f, struct sample *s, int flags) { char *ptr, line[4096]; @@ -172,7 +179,7 @@ int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) return i; } -int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags) +int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags) { int ret, i; for (i = 0; i < cnt; i++) { diff --git a/lib/io/json.c b/lib/io/json.c index f9b9374c7..bf21abebb 100644 --- a/lib/io/json.c +++ b/lib/io/json.c @@ -33,14 +33,14 @@ int json_pack_sample(json_t **j, struct sample *smp, int flags) "origin", smp->ts.origin.tv_sec, smp->ts.origin.tv_nsec, "received", smp->ts.received.tv_sec, smp->ts.received.tv_nsec, "sent", smp->ts.sent.tv_sec, smp->ts.sent.tv_nsec); - - if (flags & IO_FORMAT_SEQUENCE) { + + if (flags & SAMPLE_SEQUENCE) { json_t *json_sequence = json_integer(smp->sequence); - + json_object_set(json_smp, "sequence", json_sequence); } - - if (flags & IO_FORMAT_VALUES) { + + if (flags & SAMPLE_VALUES) { json_t *json_data = json_array(); for (int i = 0; i < smp->length; i++) { @@ -50,10 +50,10 @@ int json_pack_sample(json_t **j, struct sample *smp, int flags) json_array_append(json_data, json_value); } - + json_object_set(json_smp, "data", json_data); } - + *j = json_smp; return 0; @@ -63,23 +63,23 @@ int json_pack_samples(json_t **j, struct sample *smps[], unsigned cnt, int flags { int ret; json_t *json_smps = json_array(); - + for (int i = 0; i < cnt; i++) { json_t *json_smp; - + ret = json_pack_sample(&json_smp, smps[i], flags); if (ret) break; - + json_array_append(json_smps, json_smp); } - + *j = json_smps; - + return cnt; } -int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags) +int json_unpack_sample(json_t *json_smp, struct sample *smp, int flags) { int ret; json_t *json_data, *json_value; @@ -95,16 +95,17 @@ int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags) if (ret) return ret; - + if (!json_is_array(json_data)) return -1; + smp->has = SAMPLE_ORIGIN | SAMPLE_RECEIVED | SAMPLE_SEQUENCE; smp->length = 0; json_array_foreach(json_data, i, json_value) { if (i >= smp->capacity) break; - + switch (json_typeof(json_value)) { case JSON_REAL: smp->data[i].f = json_real_value(json_value); @@ -122,19 +123,22 @@ int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags) smp->length++; } - + + if (smp->length > 0) + smp->has |= SAMPLE_VALUES; + return 0; } -int json_unpack_samples(json_t *json_smps, struct sample *smps[], unsigned cnt, int *flags) +int json_unpack_samples(json_t *json_smps, struct sample *smps[], unsigned cnt, int flags) { int ret; json_t *json_smp; size_t i; - + if (!json_is_array(json_smps)) return -1; - + json_array_foreach(json_smps, i, json_smp) { if (i >= cnt) break; @@ -167,7 +171,7 @@ int json_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], un return ret; } -int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) { int ret; json_t *json; @@ -182,7 +186,7 @@ int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], uns return ret; json_decref(json); - + if (rbytes) *rbytes = err.position; @@ -208,7 +212,7 @@ int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) return i; } -int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags) +int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags) { int i, ret; json_t *json; diff --git a/lib/io/msg.c b/lib/io/msg.c index a213d37eb..cfc2028f8 100644 --- a/lib/io/msg.c +++ b/lib/io/msg.c @@ -80,12 +80,11 @@ int msg_to_sample(struct msg *msg, struct sample *smp) if (ret) return -1; + smp->has = SAMPLE_ORIGIN | SAMPLE_SEQUENCE | SAMPLE_VALUES | SAMPLE_ID; smp->length = MIN(msg->length, smp->capacity); smp->sequence = msg->sequence; smp->id = msg->id; smp->ts.origin = MSG_TS(msg); - smp->ts.received.tv_sec = -1; - smp->ts.received.tv_nsec = -1; smp->format = 0; for (int i = 0; i < smp->length; i++) { @@ -124,7 +123,7 @@ int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns for (i = 0; i < cnt; i++) { struct msg *msg = (struct msg *) ptr; struct sample *smp = smps[i]; - + if (ptr + MSG_LEN(smp->length) > buf + len) break; @@ -140,18 +139,18 @@ int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns ptr += MSG_LEN(smp->length); } - + if (wbytes) *wbytes = ptr - buf; return i; } -int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) { int ret, i = 0, values; char *ptr = buf; - + if (len % 4 != 0) { warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", len); return -1; @@ -170,16 +169,16 @@ int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi warn("Invalid msg received: reason=1"); break; } - - values = (*flags & MSG_WEB) ? msg->length : ntohs(msg->length); - + + values = (flags & MSG_WEB) ? msg->length : ntohs(msg->length); + /* Check if remainder of message is in buffer boundaries */ if (ptr + MSG_LEN(values) > buf + len) { warn("Invalid msg received: reason=2, msglen=%zu, len=%zu, ptr=%p, buf=%p, i=%u", MSG_LEN(values), len, ptr, buf, i); break; } - if (*flags & MSG_WEB) + if (flags & MSG_WEB) ; else msg_ntoh(msg); @@ -192,7 +191,7 @@ int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi ptr += MSG_LEN(smp->length); } - + if (rbytes) *rbytes = ptr - buf; diff --git a/lib/io/raw.c b/lib/io/raw.c index 43144b2fc..c164d8f61 100644 --- a/lib/io/raw.c +++ b/lib/io/raw.c @@ -23,7 +23,6 @@ #include "sample.h" #include "plugin.h" #include "utils.h" -#include "compat.h" #include "io/raw.h" /** Convert float to host byte order */ @@ -61,14 +60,14 @@ int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns int i, o = 0; size_t nlen; - + int8_t *i8 = (void *) buf; int16_t *i16 = (void *) buf; int32_t *i32 = (void *) buf; int64_t *i64 = (void *) buf; float *f32 = (void *) buf; double *f64 = (void *) buf; - + int bits = 1 << (flags >> 24); for (i = 0; i < cnt; i++) { @@ -128,12 +127,12 @@ int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns return i; } -int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) { /* The raw format can not encode multiple samples in one buffer * as there is no support for framing. */ struct sample *smp = smps[0]; - + int8_t *i8 = (void *) buf; int16_t *i16 = (void *) buf; int32_t *i32 = (void *) buf; @@ -141,15 +140,15 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi float *f32 = (void *) buf; double *f64 = (void *) buf; - int off, bits = 1 << (*flags >> 24); - + int off, bits = 1 << (flags >> 24); + smp->length = len / (bits / 8); - - if (*flags & RAW_FAKE) { + + if (flags & RAW_FAKE) { off = 3; - + if (smp->length < off) { -// warn("Node %s received a packet with no fake header. Skipping...", node_name(n)); + warn("Received a packet with no fake header. Skipping..."); return 0; } @@ -157,21 +156,24 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi switch (bits) { case 32: - smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[0]); - smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[1]); - smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[2]); + smp->sequence = SWAP_INT_TOH(flags & RAW_BE_HDR, 32, i32[0]); + smp->ts.origin.tv_sec = SWAP_INT_TOH(flags & RAW_BE_HDR, 32, i32[1]); + smp->ts.origin.tv_nsec = SWAP_INT_TOH(flags & RAW_BE_HDR, 32, i32[2]); break; case 64: - smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[0]); - smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[1]); - smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[2]); + smp->sequence = SWAP_INT_TOH(flags & RAW_BE_HDR, 64, i64[0]); + smp->ts.origin.tv_sec = SWAP_INT_TOH(flags & RAW_BE_HDR, 64, i64[1]); + smp->ts.origin.tv_nsec = SWAP_INT_TOH(flags & RAW_BE_HDR, 64, i64[2]); break; } + + smp->has = SAMPLE_SEQUENCE | SAMPLE_ORIGIN; } else { off = 0; + smp->has = 0; smp->sequence = 0; smp->ts.origin.tv_sec = 0; smp->ts.origin.tv_nsec = 0; @@ -181,35 +183,32 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi warn("Received more values than supported: length=%u, capacity=%u", smp->length, smp->capacity); smp->length = smp->capacity; } - + for (int i = 0; i < smp->length; i++) { - int fmt = *flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT + int fmt = flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT : SAMPLE_DATA_FORMAT_INT; - + sample_set_data_format(smp, i, fmt); - + switch (fmt) { case SAMPLE_DATA_FORMAT_FLOAT: switch (bits) { - case 32: smp->data[i].f = SWAP_FLT_TOH(*flags & RAW_BE_FLT, f32[i+off]); break; - case 64: smp->data[i].f = SWAP_DBL_TOH(*flags & RAW_BE_FLT, f64[i+off]); break; + case 32: smp->data[i].f = SWAP_FLT_TOH(flags & RAW_BE_FLT, f32[i+off]); break; + case 64: smp->data[i].f = SWAP_DBL_TOH(flags & RAW_BE_FLT, f64[i+off]); break; } break; - + case SAMPLE_DATA_FORMAT_INT: switch (bits) { case 8: smp->data[i].i = i8[i]; break; - case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 16, i16[i+off]); break; - case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 32, i32[i+off]); break; - case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 64, i64[i+off]); break; + case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(flags & RAW_BE_INT, 16, i16[i+off]); break; + case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(flags & RAW_BE_INT, 32, i32[i+off]); break; + case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(flags & RAW_BE_INT, 64, i64[i+off]); break; } break; } } - smp->ts.received.tv_sec = 0; - smp->ts.received.tv_nsec = 0; - if (rbytes) *rbytes = len; diff --git a/lib/io/villas.c b/lib/io/villas.c index 568dd339d..b2c614eec 100644 --- a/lib/io/villas.c +++ b/lib/io/villas.c @@ -33,18 +33,20 @@ size_t villas_sprint_single(char *buf, size_t len, struct sample *s, int flags) { - size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec); + size_t off = 0; - if (flags & IO_FORMAT_NANOSECONDS) + if (flags & SAMPLE_ORIGIN) { + off += snprintf(buf + off, len - off, "%llu", (unsigned long long) s->ts.origin.tv_sec); off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec); + } - if (flags & IO_FORMAT_OFFSET) + if (flags & SAMPLE_OFFSET) off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received)); - if (flags & IO_FORMAT_SEQUENCE) + if (flags & SAMPLE_SEQUENCE) off += snprintf(buf + off, len - off, "(%u)", s->sequence); - if (flags & IO_FORMAT_VALUES) { + if (flags & SAMPLE_VALUES) { for (int i = 0; i < s->length; i++) { switch ((s->format >> i) & 0x1) { case SAMPLE_DATA_FORMAT_FLOAT: @@ -62,14 +64,15 @@ size_t villas_sprint_single(char *buf, size_t len, struct sample *s, int flags) return off; } -size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) +size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int flags) { char *end; const char *ptr = buf; - int fl = 0; double offset = 0; + s->has = 0; + /* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ... * RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))? * @@ -81,14 +84,14 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f if (ptr == end || *end == '\n') return -1; + s->has |= SAMPLE_ORIGIN; + /* Optional: nano seconds */ if (*end == '.') { ptr = end + 1; s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10); - if (ptr != end) - fl |= IO_FORMAT_NANOSECONDS; - else + if (ptr == end) return -3; } else @@ -100,7 +103,7 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f offset = strtof(ptr, &end); /* offset is ignored for now */ if (ptr != end) - fl |= IO_FORMAT_OFFSET; + s->has |= SAMPLE_OFFSET; else return -4; } @@ -111,7 +114,7 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f s->sequence = strtoul(ptr, &end, 10); if (ptr != end) - fl |= IO_FORMAT_SEQUENCE; + s->has |= SAMPLE_SEQUENCE; else return -5; @@ -124,7 +127,7 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f ptr = end, s->length++) { if (*end == '\n') break; - + switch (s->format & (1 << s->length)) { case SAMPLE_DATA_FORMAT_FLOAT: s->data[s->length].f = strtod(ptr, &end); @@ -138,22 +141,19 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f if (end == ptr) break; } - + if (*end == '\n') end++; if (s->length > 0) - fl |= IO_FORMAT_VALUES; + s->has |= SAMPLE_VALUES; - if (flags) - *flags = fl; - - if (fl & IO_FORMAT_OFFSET) { + if (s->has & SAMPLE_OFFSET) { struct timespec off = time_from_double(offset); s->ts.received = time_add(&s->ts.origin, &off); + + s->has |= SAMPLE_RECEIVED; } - else - s->ts.received = time_now(); return end - buf; } @@ -165,28 +165,28 @@ int villas_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], for (i = 0; i < cnt && off < len; i++) off += villas_sprint_single(buf + off, len - off, smps[i], flags); - + if (wbytes) *wbytes = off; return i; } -int villas_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +int villas_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) { int i; size_t off = 0; for (i = 0; i < cnt && off < len; i++) off += villas_sscan_single(buf + off, len - off, smps[i], flags); - + if (rbytes) *rbytes = off; return i; } -int villas_fscan_single(FILE *f, struct sample *s, int *flags) +int villas_fscan_single(FILE *f, struct sample *s, int flags) { char *ptr, line[4096]; @@ -209,7 +209,7 @@ int villas_fprint_single(FILE *f, struct sample *s, int flags) ret = villas_sprint_single(line, sizeof(line), s, flags); if (ret < 0) return ret; - + fputs(line, f); return 0; @@ -228,7 +228,7 @@ int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) return i; } -int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags) +int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags) { int ret, i; diff --git a/lib/io_format.c b/lib/io_format.c index bbf3d1bd8..e857cfb9f 100644 --- a/lib/io_format.c +++ b/lib/io_format.c @@ -25,11 +25,10 @@ #include "io_format.h" -int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags) { - if (!flags) - flags = &fmt->flags; - + flags |= fmt->flags; + return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, flags) : -1; } @@ -40,7 +39,7 @@ int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbyte return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, flags) : -1; } -int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags) +int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags) { return fmt->sprint ? fmt->fscan(f, smps, cnt, flags) : -1; } @@ -48,4 +47,4 @@ int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsig int io_format_fprint(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags) { return fmt->fprint ? fmt->fprint(f, smps, cnt, flags) : -1; -} \ No newline at end of file +} diff --git a/lib/node.c b/lib/node.c index 2d6e92c7b..d5aa9945d 100644 --- a/lib/node.c +++ b/lib/node.c @@ -29,6 +29,7 @@ #include "plugin.h" #include "config_helper.h" #include "mapping.h" +#include "timing.h" int node_init(struct node *n, struct node_type *vt) { @@ -278,9 +279,27 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) debug(LOG_NODES | 5, "Received %u samples from node %s", nread, node_name(n)); } - for (int i = 0; i < nread; i++) + /* Add missing fields */ + for (int i = 0; i < nread; i++) { smps[i]->source = n; + if (!(smps[i]->has & SAMPLE_SEQUENCE)) + smps[i]->sequence = n->sequence++; + + if (!(smps[i]->has & SAMPLE_ORIGIN) || + !(smps[i]->has & SAMPLE_RECEIVED)) { + + struct timespec now = time_now(); + + if (!(smps[i]->has & SAMPLE_RECEIVED)) + smps[i]->ts.received = now; + + if (!(smps[i]->has & SAMPLE_ORIGIN)) + smps[i]->ts.origin = now; + } + } + + /* Run read hooks */ rread = hook_read_list(&n->hooks, smps, nread); if (nread != rread) { int skipped = nread - rread; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 6ecef27f7..8fef1736e 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -104,7 +104,7 @@ int file_parse(struct node *n, json_t *cfg) f->epoch = time_from_double(epoch_flt); f->uri_tmpl = uri_tmpl ? strdup(uri_tmpl) : NULL; - + f->format = io_format_lookup(format); if (!f->format) error("Invalid format '%s' for node %s", format, node_name(n)); @@ -205,7 +205,7 @@ int file_start(struct node *n) f->uri = file_format_name(f->uri_tmpl, &now); /* Open file */ - flags = IO_FORMAT_ALL; + flags = SAMPLE_ALL; if (f->flush) flags |= IO_FLUSH; @@ -342,7 +342,7 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt) int file_fd(struct node *n) { struct file *f = n->_vd; - + if (f->rate) return task_fd(&f->task); else { diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 3bf0e4b5c..af5372fb9 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -111,7 +111,7 @@ int nanomsg_parse(struct node *n, json_t *cfg) if (ret < 0) error("Invalid type for 'subscribe' setting of node %s", node_name(n)); } - + m->format = io_format_lookup(format); if (!m->format) error("Invalid format '%s' for node %s", format, node_name(n)); @@ -223,7 +223,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) if (bytes < 0) return -1; - return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, NULL); + return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, 0); } int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) @@ -235,7 +235,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) char data[NANOMSG_MAX_PACKET_LEN]; - ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL); + ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL); if (ret <= 0) return -1; @@ -250,14 +250,14 @@ int nanomsg_fd(struct node *n) { int ret; struct nanomsg *m = n->_vd; - + int fd; size_t len = sizeof(fd); ret = nn_getsockopt(m->subscriber.socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &len); if (ret) return ret; - + return fd; } diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 85ed995e2..7e532bd55 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -128,11 +128,6 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) sample_copy_many(smps, shared_smps, recv); sample_put_many(shared_smps, recv); - struct timespec ts_recv = time_now(); - - for (int i = 0; i < recv; i++) - smps[i]->ts.received = ts_recv; - return recv; } @@ -140,25 +135,18 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ - int avail, pushed, len; + int avail, pushed; avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt); if (avail != cnt) warn("Pool underrun for shmem node %s", shm->out_name); for (int i = 0; i < avail; i++) { + sample_copy(shared_smps[i], smps[i]); + /* Since the node isn't in shared memory, the source can't be accessed */ shared_smps[i]->source = NULL; - shared_smps[i]->sequence = smps[i]->sequence; - shared_smps[i]->ts = smps[i]->ts; - - len = MIN(smps[i]->length, shared_smps[i]->capacity); - if (len != smps[i]->length) - warn("Losing data because of sample capacity mismatch in node %s", node_name(n)); - - memcpy(shared_smps[i]->data, smps[i]->data, SAMPLE_DATA_LEN(len)); - - shared_smps[i]->length = len; + shared_smps[i]->has &= ~SAMPLE_SOURCE; } pushed = shmem_int_write(&shm->intf, shared_smps, avail); diff --git a/lib/nodes/signal.c b/lib/nodes/signal.c index bae4cbe09..397dd0f33 100644 --- a/lib/nodes/signal.c +++ b/lib/nodes/signal.c @@ -210,7 +210,7 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt) struct signal *s = n->_vd; struct sample *t = smps[0]; - struct timespec ts_recv; + struct timespec ts; int steps; assert(cnt == 1); @@ -222,20 +222,20 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt) if (steps > 1) warn("Missed steps: %u", steps); - ts_recv = time_now(); + ts = time_now(); } else { struct timespec offset = time_from_double(s->counter * 1.0 / s->rate); - ts_recv = time_add(&s->started, &offset); + ts = time_add(&s->started, &offset); steps = 1; } - double running = time_delta(&s->started, &ts_recv); + double running = time_delta(&s->started, &ts); - t->ts.origin = - t->ts.received = ts_recv; + t->has = SAMPLE_ORIGIN | SAMPLE_VALUES | SAMPLE_SEQUENCE; + t->ts.origin = ts; t->sequence = s->counter; t->length = n->samplelen; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index c5b2aa8cb..7415328ac 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -344,7 +344,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) return 0; } - ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, NULL); + ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, 0); if (bytes != rbytes) warn("Received invalid packet from node: %s bytes=%zu, rbytes=%zu", node_name(n), bytes, rbytes); @@ -361,7 +361,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) ssize_t bytes; size_t wbytes; - ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL); + ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL); if (ret < 0) return -1; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 52ebb1e72..a89e7b8e0 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -47,9 +47,9 @@ static char * websocket_connection_name(struct websocket_connection *c) if (c->wsi) { char name[128]; char ip[128]; - + lws_get_peer_addresses(c->wsi, lws_get_socket_fd(c->wsi), name, sizeof(name), ip, sizeof(ip)); - + strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name); } else if (c->destination) @@ -75,15 +75,15 @@ static void websocket_destination_destroy(struct websocket_destination *d) static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) { int pushed; - + pushed = queue_push_many(&c->queue, (void **) smps, cnt); if (pushed < cnt) warn("Queue overrun in WebSocket connection: %s", websocket_connection_name(c)); - + sample_get_many(smps, cnt); - + debug(LOG_WEBSOCKET | 10, "Enqueued %u samples to %s", pushed, websocket_connection_name(c)); - + /* Client connections which are currently conecting don't have an associate c->wsi yet */ if (c->wsi) lws_callback_on_writable(c->wsi); @@ -102,17 +102,17 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi { int ret, recvd, pulled, cnt = 128; struct websocket_connection *c = user; - + switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: c->wsi = wsi; c->state = STATE_ESTABLISHED; - + buffer_init(&c->buffers.recv, 1 << 12); buffer_init(&c->buffers.send, 1 << 12); debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); - + /* Schedule writable callback in case we have something to send */ if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); @@ -146,7 +146,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); return -1; } - + node = strtok(uri, "/."); if (strlen(node) == 0) c->node = NULL; @@ -160,7 +160,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi return -1; } } - + if (!format) format = "webmsg"; @@ -169,16 +169,16 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format"); return -1; } - + buffer_init(&c->buffers.recv, 1 << 12); buffer_init(&c->buffers.send, 1 << 12); - + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return -1; list_push(&connections, c); - + debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); break; @@ -189,10 +189,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi warn("Failed to establish WebSocket connection: %s, reason=%s", websocket_connection_name(c), in ? (char *) in : "unkown"); return -1; - + case LWS_CALLBACK_CLOSED: debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c)); - + if (c->state != STATE_SHUTDOWN) { /** @todo Attempt reconnect here */ } @@ -219,7 +219,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE: { size_t wbytes; - + if (c->state == STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; @@ -229,51 +229,51 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi pulled = queue_pull_many(&c->queue, (void **) smps, cnt); if (pulled > 0) { - io_format_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, IO_FORMAT_ALL); - + io_format_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, SAMPLE_ALL); + ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); sample_put_many(smps, pulled); debug(LOG_WEBSOCKET | 10, "Send %d samples to connection: %s, bytes=%d", pulled, websocket_connection_name(c), ret); } - + if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); - + break; } case LWS_CALLBACK_CLIENT_RECEIVE: - case LWS_CALLBACK_RECEIVE: + case LWS_CALLBACK_RECEIVE: if (!c->node) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive."); return -1; } - + if (lws_is_first_fragment(wsi)) buffer_clear(&c->buffers.recv); - + ret = buffer_append(&c->buffers.recv, in, len); if (ret) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data"); return -1; } - + /* We dont try to parse the frame yet, as we have to wait for the remaining fragments */ if (lws_is_final_fragment(wsi)) { struct timespec ts_recv = time_now(); struct websocket *w = c->node->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); - + ret = sample_alloc(&w->pool, smps, cnt); if (ret != cnt) { warn("Pool underrun for connection: %s", websocket_connection_name(c)); break; } - recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, NULL); + recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, 0); if (recvd < 0) { warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); break; @@ -282,13 +282,15 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi debug(LOG_WEBSOCKET | 10, "Received %d samples to connection: %s", recvd, websocket_connection_name(c)); /* Set receive timestamp */ - for (int i = 0; i < recvd; i++) + for (int i = 0; i < recvd; i++) { smps[i]->ts.received = ts_recv; + smps[i]->has |= SAMPLE_RECEIVED; + } ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd); if (ret != recvd) warn("Queue overrun for connection: %s", websocket_connection_name(c)); - + if (c->state == STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; @@ -300,7 +302,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi default: break; } - + return 0; } @@ -365,7 +367,7 @@ int websocket_start(struct node *n) d->info.context = web->context; d->info.vhost = web->vhost; d->info.userdata = c; - + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return -1; @@ -382,29 +384,29 @@ int websocket_stop(struct node *n) { int ret; struct websocket *w = n->_vd; - + /* Wait for all connections to be closed */ for (;;) { int connecting = 0; - + for (int i = 0; i < list_length(&w->destinations); i++) { struct websocket_destination *d = list_at(&w->destinations, i); struct websocket_connection *c = d->info.userdata; - + if (c->state == STATE_CONNECTING) connecting++; } - + if (connecting == 0) break; - + debug(LOG_WEBSOCKET | 10, "Waiting for %d client connections to be established", connecting); sleep(1); } - + for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = list_at(&connections, i); - + if (c->node != n) continue; @@ -474,7 +476,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = list_at(&connections, i); - + if (c->node == n || c->node == NULL) websocket_connection_write(c, cpys, cnt); } @@ -561,7 +563,7 @@ char * websocket_print(struct node *n) int websocket_fd(struct node *n) { struct websocket *w = n->_vd; - + return queue_signalled_fd(&w->queue); } diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 66819249e..720d8c3cd 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -125,7 +125,7 @@ int zeromq_parse(struct node *n, json_t *cfg) z->subscriber.endpoint = ep ? strdup(ep) : NULL; z->filter = filter ? strdup(filter) : NULL; - + z->format = io_format_lookup(format); if (!z->format) error("Invalid format '%s' for node %s", format, node_name(n)); @@ -429,7 +429,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - recv = io_format_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, NULL); + recv = io_format_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, 0); ret = zmq_msg_close(&m); if (ret) @@ -448,7 +448,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) char data[4096]; - ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL); + ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL); if (ret <= 0) return -1; @@ -492,10 +492,10 @@ int zeromq_fd(struct node *n) { int ret; struct zeromq *z = n->_vd; - + int fd; size_t len = sizeof(fd); - + ret = zmq_getsockopt(z->subscriber.socket, ZMQ_FD, &fd, &len); if (ret) return ret; diff --git a/lib/sample.c b/lib/sample.c index ad5b86101..eda960c2b 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -93,10 +93,9 @@ int sample_copy(struct sample *dst, struct sample *src) dst->sequence = src->sequence; dst->format = src->format; dst->source = src->source; + dst->has = src->has; - dst->ts.origin = src->ts.origin; - dst->ts.received = src->ts.received; - dst->ts.sent = src->ts.sent; + dst->ts = src->ts; memcpy(&dst->data, &src->data, SAMPLE_DATA_LEN(dst->length)); diff --git a/src/hook.c b/src/hook.c index f82aa4439..a0dcd652e 100644 --- a/src/hook.c +++ b/src/hook.c @@ -180,7 +180,7 @@ check: if (optarg == endptr) if (!p) error("Unknown IO format '%s'", format); - ret = io_init(&io, &p->io, IO_FORMAT_ALL); + ret = io_init(&io, &p->io, SAMPLE_ALL); if (ret) error("Failed to initialize IO"); diff --git a/src/pipe.c b/src/pipe.c index 33bca57f5..2b0cb258b 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -292,7 +292,7 @@ check: if (optarg == endptr) if (!p) error("Invalid format: %s", format); - ret = io_init(&io, &p->io, IO_FORMAT_ALL); + ret = io_init(&io, &p->io, SAMPLE_ALL); if (ret) error("Failed to initialize IO"); diff --git a/src/signal.c b/src/signal.c index 77c6e9864..2543b8759 100644 --- a/src/signal.c +++ b/src/signal.c @@ -136,7 +136,7 @@ int main(int argc, char *argv[]) if (!p) error("Invalid output format '%s'", format); - ret = io_init(&io, &p->io, IO_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET)); + ret = io_init(&io, &p->io, IO_FLUSH | (SAMPLE_ALL & ~SAMPLE_OFFSET)); if (ret) error("Failed to initialize output"); diff --git a/src/test-cmp.c b/src/test-cmp.c index 7b21f3045..245fd9e32 100644 --- a/src/test-cmp.c +++ b/src/test-cmp.c @@ -65,7 +65,7 @@ void usage() int main(int argc, char *argv[]) { int ret; - + /* Default values */ double epsilon = 1e-9; int timestamp = 1; @@ -77,7 +77,6 @@ int main(int argc, char *argv[]) struct sample *samples[2]; struct { - int flags; char *path; FILE *handle; struct sample *sample; @@ -150,11 +149,11 @@ check: if (optarg == endptr) serror("Failed to open file: %s", f2.path); while (!feof(f1.handle) && !feof(f2.handle)) { - ret = villas_fscan(f1.handle, &f1.sample, 1, &f1.flags); + ret = villas_fscan(f1.handle, &f1.sample, 1, 0); if (ret < 0 && !feof(f1.handle)) goto out; - ret = villas_fscan(f2.handle, &f2.sample, 1, &f2.flags); + ret = villas_fscan(f2.handle, &f2.sample, 1, 0); if (ret < 0 && !feof(f2.handle)) goto out; @@ -168,7 +167,7 @@ check: if (optarg == endptr) } /* Compare sequence no */ - if (sequence && (f1.flags & IO_FORMAT_SEQUENCE) && (f2.flags & IO_FORMAT_SEQUENCE)) { + if (sequence && (f1.sample->has & SAMPLE_SEQUENCE) && (f2.sample->has & SAMPLE_SEQUENCE)) { if (f1.sample->sequence != f2.sample->sequence) { printf("sequence no: %d != %d\n", f1.sample->sequence, f2.sample->sequence); ret = 2;