1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add new bitmask to samples to indicate which fields are valid

This commit is contained in:
Steffen Vogel 2017-09-04 14:28:55 +02:00
parent c1f83ffd20
commit 6d495c1a35
29 changed files with 260 additions and 308 deletions

View file

@ -32,4 +32,4 @@ struct sample;
int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags);
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags);
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags);

View file

@ -28,8 +28,8 @@
int json_pack_sample(json_t **j, struct sample *s, int flags);
int json_unpack_sample(json_t *j, struct sample *s, int *flags);
int json_unpack_sample(json_t *j, struct sample *s, int flags);
int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags);
int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags);
int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags);

View file

@ -68,4 +68,4 @@ int msg_from_sample(struct msg *msg, struct sample *smp);
int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);
int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags);

View file

@ -34,7 +34,7 @@ enum raw_flags {
RAW_BE_INT = (1 << 17), /**< Byte-order for integer data: big-endian if set. */
RAW_BE_FLT = (1 << 18), /**< Byte-order for floating point data: big-endian if set. */
RAW_BE_HDR = (1 << 19), /**< Byte-order for fake header fields: big-endian if set. */
/** Byte-order for all fields: big-endian if set. */
RAW_BE = RAW_BE_INT | RAW_BE_FLT | RAW_BE_HDR,
@ -57,4 +57,4 @@ enum raw_flags {
int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);
int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags);

View file

@ -33,4 +33,4 @@ int villas_scan(struct io *io, struct sample *smps[], unsigned cnt);
int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags);
int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags);
int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags);

View file

@ -30,12 +30,6 @@ struct sample;
struct io;
enum io_format_flags {
IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */
IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */
IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */
IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */
IO_FORMAT_ALL = 15, /**< Enable all output options. */
IO_FORMAT_BINARY = (1 << 8)
};
@ -70,7 +64,7 @@ struct io_format {
* @see rewind()
*/
void (*rewind)(struct io *io);
/** Get a file descriptor which can be used with select / poll */
int (*fd)(struct io *io);
@ -89,13 +83,13 @@ struct io_format {
*/
/** @see io_format_sscan */
int (*sscan)(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);
int (*sscan)(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags);
/** @see io_format_sprint */
int (*sprint)(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** @see io_format_fscan */
int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int *flags);
int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int flags);
/** @see io_format_fprint */
int (*fprint)(FILE *f, struct sample *smps[], unsigned cnt, int flags);
@ -119,7 +113,7 @@ struct io_format * io_format_lookup(const char *name);
* @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps.
* @retval <0 Something went wrong.
*/
int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);
int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*
@ -139,7 +133,7 @@ int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbyte
* @retval >=0 The number of samples which have been parsed from \p f and written into \p smps.
* @retval <0 Something went wrong.
*/
int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags);
int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags);
/** Print \p cnt samples from \p smps to stream \p f.
*

View file

@ -52,11 +52,24 @@ enum sample_data_format {
SAMPLE_DATA_FORMAT_INT = 1
};
/** Parts of a sample that can be serialized / de-serialized by the IO formats */
enum sample_has {
SAMPLE_ORIGIN = (1 << 0), /**< Include nanoseconds in output. */
SAMPLE_RECEIVED = (1 << 1), /**< Include nanoseconds in output. */
SAMPLE_OFFSET = (1 << 2),
SAMPLE_SOURCE = (1 << 3),
SAMPLE_ID = (1 << 4),
SAMPLE_SEQUENCE = (1 << 5), /**< Include sequence number in output. */
SAMPLE_VALUES = (1 << 6), /**< Include values in output. */
SAMPLE_FORMAT = (1 << 7),
SAMPLE_ALL = (1 << 7) - 1, /**< Enable all output options. */
};
struct sample {
int sequence; /**< The sequence number of this sample. */
int length; /**< The number of values in sample::values which are valid. */
int capacity; /**< The number of values in sample::values for which memory is reserved. */
int id;
atomic_int refcnt; /**< Reference counter. */
@ -70,6 +83,8 @@ struct sample {
struct timespec sent; /**< The point in time when this data was send for the last time. */
} ts;
int has;
/** A long bitfield indicating the number representation of the first 64 values in sample::data[].
*
* @see sample_data_format

View file

@ -1,70 +0,0 @@
/** Fix timestamp hook.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** @addtogroup hooks Hook functions
* @{
*/
#include "hook.h"
#include "plugin.h"
#include "timing.h"
#include "sample.h"
int fix_ts_read(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct timespec now;
now = time_now();
for (int i = 0; i < *cnt; i++) {
/* Check for missing receive timestamp
* Usually node_type::read() should update the receive timestamp.
* An example would be to use hardware timestamp capabilities of
* modern NICs.
*/
if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) ||
(smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1))
smps[i]->ts.received = now;
/* Check for missing origin timestamp */
if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) ||
(smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1))
smps[i]->ts.origin = now;
}
return 0;
}
static struct plugin p = {
.name = "fix_ts",
.description = "Update timestamps of sample if not set",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.flags = HOOK_NODE | HOOK_BUILTIN,
.priority = 0,
.read = fix_ts_read,
}
};
REGISTER_PLUGIN(&p)
/** @} */

View file

@ -51,7 +51,7 @@ static int print_start(struct hook *h)
struct print *p = h->_vd;
int ret;
ret = io_init(&p->io, p->format, IO_FORMAT_ALL);
ret = io_init(&p->io, p->format, SAMPLE_ALL);
if (ret)
return ret;

View file

@ -59,7 +59,7 @@ int io_stream_open(struct io *io, const char *uri)
if (uri) {
if (aislocal(uri)) {
io->mode = IO_MODE_STDIO;
io->stdio.output = fopen(uri, "a+");
if (io->stdio.output == NULL)
return -1;
@ -87,7 +87,7 @@ int io_stream_open(struct io *io, const char *uri)
io->stdio.input = stdin;
io->stdio.output = stdout;
}
if (io->mode == IO_MODE_STDIO) {
ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ);
if (ret)
@ -187,7 +187,7 @@ int io_stream_fd(struct io *io)
case IO_MODE_CUSTOM:
return -1;
}
return -1;
}
@ -244,7 +244,7 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt)
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.output->file
: io->stdio.output;
//flockfile(f);
if (io->_vt->fprint)
@ -259,7 +259,7 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt)
}
else
ret = -1;
//funlockfile(f);
}
@ -279,37 +279,35 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt)
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.input->file
: io->stdio.input;
//flockfile(f);
int flags = io->flags;
if (io->_vt->fscan)
return io->_vt->fscan(f, smps, cnt, &flags);
return io->_vt->fscan(f, smps, cnt, io->flags);
else if (io->_vt->sscan) {
size_t bytes, rbytes;
char buf[4096];
bytes = fread(buf, 1, sizeof(buf), f);
ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, &flags);
ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, io->flags);
}
else
ret = -1;
//funlockfile(f);
}
return ret;
}
struct io_format * io_format_lookup(const char *name)
{
struct plugin *p;
p = plugin_lookup(PLUGIN_TYPE_IO, name);
if (!p)
return NULL;
return &p->io;
}

View file

@ -30,12 +30,12 @@
size_t csv_sprint_single(char *buf, size_t len, struct sample *s, int flags)
{
size_t off = snprintf(buf, len, "%ld", s->ts.origin.tv_sec);
if (flags & IO_FORMAT_NANOSECONDS)
off += snprintf(buf + off, len - off, "%c%09llu", CSV_SEPARATOR, (unsigned long long) s->ts.origin.tv_nsec);
size_t off = 0;
if (flags & IO_FORMAT_SEQUENCE)
if (flags & SAMPLE_ORIGIN)
off += snprintf(buf + off, len - off, "%ld%c%09ld", s->ts.origin.tv_sec, CSV_SEPARATOR, s->ts.origin.tv_nsec);
if (flags & SAMPLE_SEQUENCE)
off += snprintf(buf + off, len - off, "%c%u", CSV_SEPARATOR, s->sequence);
for (int i = 0; i < s->length; i++) {
@ -50,31 +50,37 @@ size_t csv_sprint_single(char *buf, size_t len, struct sample *s, int flags)
}
off += snprintf(buf + off, len - off, "\n");
return off;
}
size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int flags)
{
const char *ptr = buf;
char *end;
s->has = 0;
s->ts.origin.tv_sec = strtoul(ptr, &end, 10);
if (end == ptr || *end == '\n')
goto out;
ptr = end;
s->ts.origin.tv_nsec = strtoul(ptr, &end, 10);
if (end == ptr || *end == '\n')
goto out;
ptr = end;
s->has |= SAMPLE_ORIGIN;
s->sequence = strtoul(ptr, &end, 10);
if (end == ptr || *end == '\n')
goto out;
s->has |= SAMPLE_SEQUENCE;
for (ptr = end, s->length = 0;
s->length < s->capacity;
ptr = end, s->length++) {
@ -94,11 +100,12 @@ size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flag
if (end == ptr)
goto out;
}
out: if (*end == '\n')
end++;
s->ts.received = time_now();
if (s->length > 0)
s->has |= SAMPLE_VALUES;
return end - buf;
}
@ -110,21 +117,21 @@ int csv_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns
for (i = 0; i < cnt && off < len; i++)
off += csv_sprint_single(buf + off, len - off, smps[i], flags);
if (wbytes)
*wbytes = off;
return i;
}
int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
int i;
size_t off = 0;
for (i = 0; i < cnt && off < len; i++)
off += csv_sscan_single(buf + off, len - off, smps[i], flags);
if (rbytes)
*rbytes = off;
@ -139,13 +146,13 @@ int csv_fprint_single(FILE *f, struct sample *s, int flags)
ret = csv_sprint_single(line, sizeof(line), s, flags);
if (ret < 0)
return ret;
fputs(line, f);
return 0;
}
int csv_fscan_single(FILE *f, struct sample *s, int *flags)
int csv_fscan_single(FILE *f, struct sample *s, int flags)
{
char *ptr, line[4096];
@ -172,7 +179,7 @@ int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
return i;
}
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags)
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {

View file

@ -33,14 +33,14 @@ int json_pack_sample(json_t **j, struct sample *smp, int flags)
"origin", smp->ts.origin.tv_sec, smp->ts.origin.tv_nsec,
"received", smp->ts.received.tv_sec, smp->ts.received.tv_nsec,
"sent", smp->ts.sent.tv_sec, smp->ts.sent.tv_nsec);
if (flags & IO_FORMAT_SEQUENCE) {
if (flags & SAMPLE_SEQUENCE) {
json_t *json_sequence = json_integer(smp->sequence);
json_object_set(json_smp, "sequence", json_sequence);
}
if (flags & IO_FORMAT_VALUES) {
if (flags & SAMPLE_VALUES) {
json_t *json_data = json_array();
for (int i = 0; i < smp->length; i++) {
@ -50,10 +50,10 @@ int json_pack_sample(json_t **j, struct sample *smp, int flags)
json_array_append(json_data, json_value);
}
json_object_set(json_smp, "data", json_data);
}
*j = json_smp;
return 0;
@ -63,23 +63,23 @@ int json_pack_samples(json_t **j, struct sample *smps[], unsigned cnt, int flags
{
int ret;
json_t *json_smps = json_array();
for (int i = 0; i < cnt; i++) {
json_t *json_smp;
ret = json_pack_sample(&json_smp, smps[i], flags);
if (ret)
break;
json_array_append(json_smps, json_smp);
}
*j = json_smps;
return cnt;
}
int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags)
int json_unpack_sample(json_t *json_smp, struct sample *smp, int flags)
{
int ret;
json_t *json_data, *json_value;
@ -95,16 +95,17 @@ int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags)
if (ret)
return ret;
if (!json_is_array(json_data))
return -1;
smp->has = SAMPLE_ORIGIN | SAMPLE_RECEIVED | SAMPLE_SEQUENCE;
smp->length = 0;
json_array_foreach(json_data, i, json_value) {
if (i >= smp->capacity)
break;
switch (json_typeof(json_value)) {
case JSON_REAL:
smp->data[i].f = json_real_value(json_value);
@ -122,19 +123,22 @@ int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags)
smp->length++;
}
if (smp->length > 0)
smp->has |= SAMPLE_VALUES;
return 0;
}
int json_unpack_samples(json_t *json_smps, struct sample *smps[], unsigned cnt, int *flags)
int json_unpack_samples(json_t *json_smps, struct sample *smps[], unsigned cnt, int flags)
{
int ret;
json_t *json_smp;
size_t i;
if (!json_is_array(json_smps))
return -1;
json_array_foreach(json_smps, i, json_smp) {
if (i >= cnt)
break;
@ -167,7 +171,7 @@ int json_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], un
return ret;
}
int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
int ret;
json_t *json;
@ -182,7 +186,7 @@ int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], uns
return ret;
json_decref(json);
if (rbytes)
*rbytes = err.position;
@ -208,7 +212,7 @@ int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
return i;
}
int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags)
int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int i, ret;
json_t *json;

View file

@ -80,12 +80,11 @@ int msg_to_sample(struct msg *msg, struct sample *smp)
if (ret)
return -1;
smp->has = SAMPLE_ORIGIN | SAMPLE_SEQUENCE | SAMPLE_VALUES | SAMPLE_ID;
smp->length = MIN(msg->length, smp->capacity);
smp->sequence = msg->sequence;
smp->id = msg->id;
smp->ts.origin = MSG_TS(msg);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
smp->format = 0;
for (int i = 0; i < smp->length; i++) {
@ -124,7 +123,7 @@ int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns
for (i = 0; i < cnt; i++) {
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[i];
if (ptr + MSG_LEN(smp->length) > buf + len)
break;
@ -140,18 +139,18 @@ int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns
ptr += MSG_LEN(smp->length);
}
if (wbytes)
*wbytes = ptr - buf;
return i;
}
int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i = 0, values;
char *ptr = buf;
if (len % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", len);
return -1;
@ -170,16 +169,16 @@ int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
warn("Invalid msg received: reason=1");
break;
}
values = (*flags & MSG_WEB) ? msg->length : ntohs(msg->length);
values = (flags & MSG_WEB) ? msg->length : ntohs(msg->length);
/* Check if remainder of message is in buffer boundaries */
if (ptr + MSG_LEN(values) > buf + len) {
warn("Invalid msg received: reason=2, msglen=%zu, len=%zu, ptr=%p, buf=%p, i=%u", MSG_LEN(values), len, ptr, buf, i);
break;
}
if (*flags & MSG_WEB)
if (flags & MSG_WEB)
;
else
msg_ntoh(msg);
@ -192,7 +191,7 @@ int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
ptr += MSG_LEN(smp->length);
}
if (rbytes)
*rbytes = ptr - buf;

View file

@ -23,7 +23,6 @@
#include "sample.h"
#include "plugin.h"
#include "utils.h"
#include "compat.h"
#include "io/raw.h"
/** Convert float to host byte order */
@ -61,14 +60,14 @@ int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns
int i, o = 0;
size_t nlen;
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
int bits = 1 << (flags >> 24);
for (i = 0; i < cnt; i++) {
@ -128,12 +127,12 @@ int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns
return i;
}
int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
/* The raw format can not encode multiple samples in one buffer
* as there is no support for framing. */
struct sample *smp = smps[0];
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
@ -141,15 +140,15 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
float *f32 = (void *) buf;
double *f64 = (void *) buf;
int off, bits = 1 << (*flags >> 24);
int off, bits = 1 << (flags >> 24);
smp->length = len / (bits / 8);
if (*flags & RAW_FAKE) {
if (flags & RAW_FAKE) {
off = 3;
if (smp->length < off) {
// warn("Node %s received a packet with no fake header. Skipping...", node_name(n));
warn("Received a packet with no fake header. Skipping...");
return 0;
}
@ -157,21 +156,24 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
switch (bits) {
case 32:
smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[2]);
smp->sequence = SWAP_INT_TOH(flags & RAW_BE_HDR, 32, i32[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(flags & RAW_BE_HDR, 32, i32[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(flags & RAW_BE_HDR, 32, i32[2]);
break;
case 64:
smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[2]);
smp->sequence = SWAP_INT_TOH(flags & RAW_BE_HDR, 64, i64[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(flags & RAW_BE_HDR, 64, i64[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(flags & RAW_BE_HDR, 64, i64[2]);
break;
}
smp->has = SAMPLE_SEQUENCE | SAMPLE_ORIGIN;
}
else {
off = 0;
smp->has = 0;
smp->sequence = 0;
smp->ts.origin.tv_sec = 0;
smp->ts.origin.tv_nsec = 0;
@ -181,35 +183,32 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
warn("Received more values than supported: length=%u, capacity=%u", smp->length, smp->capacity);
smp->length = smp->capacity;
}
for (int i = 0; i < smp->length; i++) {
int fmt = *flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT
int fmt = flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT
: SAMPLE_DATA_FORMAT_INT;
sample_set_data_format(smp, i, fmt);
switch (fmt) {
case SAMPLE_DATA_FORMAT_FLOAT:
switch (bits) {
case 32: smp->data[i].f = SWAP_FLT_TOH(*flags & RAW_BE_FLT, f32[i+off]); break;
case 64: smp->data[i].f = SWAP_DBL_TOH(*flags & RAW_BE_FLT, f64[i+off]); break;
case 32: smp->data[i].f = SWAP_FLT_TOH(flags & RAW_BE_FLT, f32[i+off]); break;
case 64: smp->data[i].f = SWAP_DBL_TOH(flags & RAW_BE_FLT, f64[i+off]); break;
}
break;
case SAMPLE_DATA_FORMAT_INT:
switch (bits) {
case 8: smp->data[i].i = i8[i]; break;
case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 16, i16[i+off]); break;
case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 32, i32[i+off]); break;
case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 64, i64[i+off]); break;
case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(flags & RAW_BE_INT, 16, i16[i+off]); break;
case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(flags & RAW_BE_INT, 32, i32[i+off]); break;
case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(flags & RAW_BE_INT, 64, i64[i+off]); break;
}
break;
}
}
smp->ts.received.tv_sec = 0;
smp->ts.received.tv_nsec = 0;
if (rbytes)
*rbytes = len;

View file

@ -33,18 +33,20 @@
size_t villas_sprint_single(char *buf, size_t len, struct sample *s, int flags)
{
size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec);
size_t off = 0;
if (flags & IO_FORMAT_NANOSECONDS)
if (flags & SAMPLE_ORIGIN) {
off += snprintf(buf + off, len - off, "%llu", (unsigned long long) s->ts.origin.tv_sec);
off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec);
}
if (flags & IO_FORMAT_OFFSET)
if (flags & SAMPLE_OFFSET)
off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received));
if (flags & IO_FORMAT_SEQUENCE)
if (flags & SAMPLE_SEQUENCE)
off += snprintf(buf + off, len - off, "(%u)", s->sequence);
if (flags & IO_FORMAT_VALUES) {
if (flags & SAMPLE_VALUES) {
for (int i = 0; i < s->length; i++) {
switch ((s->format >> i) & 0x1) {
case SAMPLE_DATA_FORMAT_FLOAT:
@ -62,14 +64,15 @@ size_t villas_sprint_single(char *buf, size_t len, struct sample *s, int flags)
return off;
}
size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int flags)
{
char *end;
const char *ptr = buf;
int fl = 0;
double offset = 0;
s->has = 0;
/* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ...
* RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))?
*
@ -81,14 +84,14 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f
if (ptr == end || *end == '\n')
return -1;
s->has |= SAMPLE_ORIGIN;
/* Optional: nano seconds */
if (*end == '.') {
ptr = end + 1;
s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10);
if (ptr != end)
fl |= IO_FORMAT_NANOSECONDS;
else
if (ptr == end)
return -3;
}
else
@ -100,7 +103,7 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f
offset = strtof(ptr, &end); /* offset is ignored for now */
if (ptr != end)
fl |= IO_FORMAT_OFFSET;
s->has |= SAMPLE_OFFSET;
else
return -4;
}
@ -111,7 +114,7 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f
s->sequence = strtoul(ptr, &end, 10);
if (ptr != end)
fl |= IO_FORMAT_SEQUENCE;
s->has |= SAMPLE_SEQUENCE;
else
return -5;
@ -124,7 +127,7 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f
ptr = end, s->length++) {
if (*end == '\n')
break;
switch (s->format & (1 << s->length)) {
case SAMPLE_DATA_FORMAT_FLOAT:
s->data[s->length].f = strtod(ptr, &end);
@ -138,22 +141,19 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f
if (end == ptr)
break;
}
if (*end == '\n')
end++;
if (s->length > 0)
fl |= IO_FORMAT_VALUES;
s->has |= SAMPLE_VALUES;
if (flags)
*flags = fl;
if (fl & IO_FORMAT_OFFSET) {
if (s->has & SAMPLE_OFFSET) {
struct timespec off = time_from_double(offset);
s->ts.received = time_add(&s->ts.origin, &off);
s->has |= SAMPLE_RECEIVED;
}
else
s->ts.received = time_now();
return end - buf;
}
@ -165,28 +165,28 @@ int villas_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[],
for (i = 0; i < cnt && off < len; i++)
off += villas_sprint_single(buf + off, len - off, smps[i], flags);
if (wbytes)
*wbytes = off;
return i;
}
int villas_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
int villas_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
int i;
size_t off = 0;
for (i = 0; i < cnt && off < len; i++)
off += villas_sscan_single(buf + off, len - off, smps[i], flags);
if (rbytes)
*rbytes = off;
return i;
}
int villas_fscan_single(FILE *f, struct sample *s, int *flags)
int villas_fscan_single(FILE *f, struct sample *s, int flags)
{
char *ptr, line[4096];
@ -209,7 +209,7 @@ int villas_fprint_single(FILE *f, struct sample *s, int flags)
ret = villas_sprint_single(line, sizeof(line), s, flags);
if (ret < 0)
return ret;
fputs(line, f);
return 0;
@ -228,7 +228,7 @@ int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
return i;
}
int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags)
int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;

View file

@ -25,11 +25,10 @@
#include "io_format.h"
int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int flags)
{
if (!flags)
flags = &fmt->flags;
flags |= fmt->flags;
return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, flags) : -1;
}
@ -40,7 +39,7 @@ int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbyte
return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, flags) : -1;
}
int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags)
int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
return fmt->sprint ? fmt->fscan(f, smps, cnt, flags) : -1;
}
@ -48,4 +47,4 @@ int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsig
int io_format_fprint(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
return fmt->fprint ? fmt->fprint(f, smps, cnt, flags) : -1;
}
}

View file

@ -29,6 +29,7 @@
#include "plugin.h"
#include "config_helper.h"
#include "mapping.h"
#include "timing.h"
int node_init(struct node *n, struct node_type *vt)
{
@ -278,9 +279,27 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt)
debug(LOG_NODES | 5, "Received %u samples from node %s", nread, node_name(n));
}
for (int i = 0; i < nread; i++)
/* Add missing fields */
for (int i = 0; i < nread; i++) {
smps[i]->source = n;
if (!(smps[i]->has & SAMPLE_SEQUENCE))
smps[i]->sequence = n->sequence++;
if (!(smps[i]->has & SAMPLE_ORIGIN) ||
!(smps[i]->has & SAMPLE_RECEIVED)) {
struct timespec now = time_now();
if (!(smps[i]->has & SAMPLE_RECEIVED))
smps[i]->ts.received = now;
if (!(smps[i]->has & SAMPLE_ORIGIN))
smps[i]->ts.origin = now;
}
}
/* Run read hooks */
rread = hook_read_list(&n->hooks, smps, nread);
if (nread != rread) {
int skipped = nread - rread;

View file

@ -104,7 +104,7 @@ int file_parse(struct node *n, json_t *cfg)
f->epoch = time_from_double(epoch_flt);
f->uri_tmpl = uri_tmpl ? strdup(uri_tmpl) : NULL;
f->format = io_format_lookup(format);
if (!f->format)
error("Invalid format '%s' for node %s", format, node_name(n));
@ -205,7 +205,7 @@ int file_start(struct node *n)
f->uri = file_format_name(f->uri_tmpl, &now);
/* Open file */
flags = IO_FORMAT_ALL;
flags = SAMPLE_ALL;
if (f->flush)
flags |= IO_FLUSH;
@ -342,7 +342,7 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt)
int file_fd(struct node *n)
{
struct file *f = n->_vd;
if (f->rate)
return task_fd(&f->task);
else {

View file

@ -111,7 +111,7 @@ int nanomsg_parse(struct node *n, json_t *cfg)
if (ret < 0)
error("Invalid type for 'subscribe' setting of node %s", node_name(n));
}
m->format = io_format_lookup(format);
if (!m->format)
error("Invalid format '%s' for node %s", format, node_name(n));
@ -223,7 +223,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt)
if (bytes < 0)
return -1;
return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, NULL);
return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, 0);
}
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
@ -235,7 +235,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[NANOMSG_MAX_PACKET_LEN];
ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL);
ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL);
if (ret <= 0)
return -1;
@ -250,14 +250,14 @@ int nanomsg_fd(struct node *n)
{
int ret;
struct nanomsg *m = n->_vd;
int fd;
size_t len = sizeof(fd);
ret = nn_getsockopt(m->subscriber.socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &len);
if (ret)
return ret;
return fd;
}

View file

@ -128,11 +128,6 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
sample_copy_many(smps, shared_smps, recv);
sample_put_many(shared_smps, recv);
struct timespec ts_recv = time_now();
for (int i = 0; i < recv; i++)
smps[i]->ts.received = ts_recv;
return recv;
}
@ -140,25 +135,18 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, len;
int avail, pushed;
avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->out_name);
for (int i = 0; i < avail; i++) {
sample_copy(shared_smps[i], smps[i]);
/* Since the node isn't in shared memory, the source can't be accessed */
shared_smps[i]->source = NULL;
shared_smps[i]->sequence = smps[i]->sequence;
shared_smps[i]->ts = smps[i]->ts;
len = MIN(smps[i]->length, shared_smps[i]->capacity);
if (len != smps[i]->length)
warn("Losing data because of sample capacity mismatch in node %s", node_name(n));
memcpy(shared_smps[i]->data, smps[i]->data, SAMPLE_DATA_LEN(len));
shared_smps[i]->length = len;
shared_smps[i]->has &= ~SAMPLE_SOURCE;
}
pushed = shmem_int_write(&shm->intf, shared_smps, avail);

View file

@ -210,7 +210,7 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt)
struct signal *s = n->_vd;
struct sample *t = smps[0];
struct timespec ts_recv;
struct timespec ts;
int steps;
assert(cnt == 1);
@ -222,20 +222,20 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt)
if (steps > 1)
warn("Missed steps: %u", steps);
ts_recv = time_now();
ts = time_now();
}
else {
struct timespec offset = time_from_double(s->counter * 1.0 / s->rate);
ts_recv = time_add(&s->started, &offset);
ts = time_add(&s->started, &offset);
steps = 1;
}
double running = time_delta(&s->started, &ts_recv);
double running = time_delta(&s->started, &ts);
t->ts.origin =
t->ts.received = ts_recv;
t->has = SAMPLE_ORIGIN | SAMPLE_VALUES | SAMPLE_SEQUENCE;
t->ts.origin = ts;
t->sequence = s->counter;
t->length = n->samplelen;

View file

@ -344,7 +344,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
return 0;
}
ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, NULL);
ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, 0);
if (bytes != rbytes)
warn("Received invalid packet from node: %s bytes=%zu, rbytes=%zu", node_name(n), bytes, rbytes);
@ -361,7 +361,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
ssize_t bytes;
size_t wbytes;
ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL);
ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL);
if (ret < 0)
return -1;

View file

@ -47,9 +47,9 @@ static char * websocket_connection_name(struct websocket_connection *c)
if (c->wsi) {
char name[128];
char ip[128];
lws_get_peer_addresses(c->wsi, lws_get_socket_fd(c->wsi), name, sizeof(name), ip, sizeof(ip));
strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name);
}
else if (c->destination)
@ -75,15 +75,15 @@ static void websocket_destination_destroy(struct websocket_destination *d)
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
{
int pushed;
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
if (pushed < cnt)
warn("Queue overrun in WebSocket connection: %s", websocket_connection_name(c));
sample_get_many(smps, cnt);
debug(LOG_WEBSOCKET | 10, "Enqueued %u samples to %s", pushed, websocket_connection_name(c));
/* Client connections which are currently conecting don't have an associate c->wsi yet */
if (c->wsi)
lws_callback_on_writable(c->wsi);
@ -102,17 +102,17 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
{
int ret, recvd, pulled, cnt = 128;
struct websocket_connection *c = user;
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
c->wsi = wsi;
c->state = STATE_ESTABLISHED;
buffer_init(&c->buffers.recv, 1 << 12);
buffer_init(&c->buffers.send, 1 << 12);
debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c));
/* Schedule writable callback in case we have something to send */
if (queue_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
@ -146,7 +146,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
return -1;
}
node = strtok(uri, "/.");
if (strlen(node) == 0)
c->node = NULL;
@ -160,7 +160,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return -1;
}
}
if (!format)
format = "webmsg";
@ -169,16 +169,16 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format");
return -1;
}
buffer_init(&c->buffers.recv, 1 << 12);
buffer_init(&c->buffers.send, 1 << 12);
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
if (ret)
return -1;
list_push(&connections, c);
debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c));
break;
@ -189,10 +189,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
warn("Failed to establish WebSocket connection: %s, reason=%s", websocket_connection_name(c), in ? (char *) in : "unkown");
return -1;
case LWS_CALLBACK_CLOSED:
debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c));
if (c->state != STATE_SHUTDOWN) {
/** @todo Attempt reconnect here */
}
@ -219,7 +219,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_SERVER_WRITEABLE: {
size_t wbytes;
if (c->state == STATE_SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
@ -229,51 +229,51 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
if (pulled > 0) {
io_format_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, IO_FORMAT_ALL);
io_format_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, SAMPLE_ALL);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_put_many(smps, pulled);
debug(LOG_WEBSOCKET | 10, "Send %d samples to connection: %s, bytes=%d", pulled, websocket_connection_name(c), ret);
}
if (queue_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
break;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
case LWS_CALLBACK_RECEIVE:
if (!c->node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive.");
return -1;
}
if (lws_is_first_fragment(wsi))
buffer_clear(&c->buffers.recv);
ret = buffer_append(&c->buffers.recv, in, len);
if (ret) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data");
return -1;
}
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
if (lws_is_final_fragment(wsi)) {
struct timespec ts_recv = time_now();
struct websocket *w = c->node->_vd;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
ret = sample_alloc(&w->pool, smps, cnt);
if (ret != cnt) {
warn("Pool underrun for connection: %s", websocket_connection_name(c));
break;
}
recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, NULL);
recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, 0);
if (recvd < 0) {
warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
break;
@ -282,13 +282,15 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
debug(LOG_WEBSOCKET | 10, "Received %d samples to connection: %s", recvd, websocket_connection_name(c));
/* Set receive timestamp */
for (int i = 0; i < recvd; i++)
for (int i = 0; i < recvd; i++) {
smps[i]->ts.received = ts_recv;
smps[i]->has |= SAMPLE_RECEIVED;
}
ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
if (ret != recvd)
warn("Queue overrun for connection: %s", websocket_connection_name(c));
if (c->state == STATE_SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
@ -300,7 +302,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
default:
break;
}
return 0;
}
@ -365,7 +367,7 @@ int websocket_start(struct node *n)
d->info.context = web->context;
d->info.vhost = web->vhost;
d->info.userdata = c;
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
if (ret)
return -1;
@ -382,29 +384,29 @@ int websocket_stop(struct node *n)
{
int ret;
struct websocket *w = n->_vd;
/* Wait for all connections to be closed */
for (;;) {
int connecting = 0;
for (int i = 0; i < list_length(&w->destinations); i++) {
struct websocket_destination *d = list_at(&w->destinations, i);
struct websocket_connection *c = d->info.userdata;
if (c->state == STATE_CONNECTING)
connecting++;
}
if (connecting == 0)
break;
debug(LOG_WEBSOCKET | 10, "Waiting for %d client connections to be established", connecting);
sleep(1);
}
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = list_at(&connections, i);
if (c->node != n)
continue;
@ -474,7 +476,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = list_at(&connections, i);
if (c->node == n || c->node == NULL)
websocket_connection_write(c, cpys, cnt);
}
@ -561,7 +563,7 @@ char * websocket_print(struct node *n)
int websocket_fd(struct node *n)
{
struct websocket *w = n->_vd;
return queue_signalled_fd(&w->queue);
}

View file

@ -125,7 +125,7 @@ int zeromq_parse(struct node *n, json_t *cfg)
z->subscriber.endpoint = ep ? strdup(ep) : NULL;
z->filter = filter ? strdup(filter) : NULL;
z->format = io_format_lookup(format);
if (!z->format)
error("Invalid format '%s' for node %s", format, node_name(n));
@ -429,7 +429,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
recv = io_format_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, NULL);
recv = io_format_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, 0);
ret = zmq_msg_close(&m);
if (ret)
@ -448,7 +448,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[4096];
ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL);
ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL);
if (ret <= 0)
return -1;
@ -492,10 +492,10 @@ int zeromq_fd(struct node *n)
{
int ret;
struct zeromq *z = n->_vd;
int fd;
size_t len = sizeof(fd);
ret = zmq_getsockopt(z->subscriber.socket, ZMQ_FD, &fd, &len);
if (ret)
return ret;

View file

@ -93,10 +93,9 @@ int sample_copy(struct sample *dst, struct sample *src)
dst->sequence = src->sequence;
dst->format = src->format;
dst->source = src->source;
dst->has = src->has;
dst->ts.origin = src->ts.origin;
dst->ts.received = src->ts.received;
dst->ts.sent = src->ts.sent;
dst->ts = src->ts;
memcpy(&dst->data, &src->data, SAMPLE_DATA_LEN(dst->length));

View file

@ -180,7 +180,7 @@ check: if (optarg == endptr)
if (!p)
error("Unknown IO format '%s'", format);
ret = io_init(&io, &p->io, IO_FORMAT_ALL);
ret = io_init(&io, &p->io, SAMPLE_ALL);
if (ret)
error("Failed to initialize IO");

View file

@ -292,7 +292,7 @@ check: if (optarg == endptr)
if (!p)
error("Invalid format: %s", format);
ret = io_init(&io, &p->io, IO_FORMAT_ALL);
ret = io_init(&io, &p->io, SAMPLE_ALL);
if (ret)
error("Failed to initialize IO");

View file

@ -136,7 +136,7 @@ int main(int argc, char *argv[])
if (!p)
error("Invalid output format '%s'", format);
ret = io_init(&io, &p->io, IO_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET));
ret = io_init(&io, &p->io, IO_FLUSH | (SAMPLE_ALL & ~SAMPLE_OFFSET));
if (ret)
error("Failed to initialize output");

View file

@ -65,7 +65,7 @@ void usage()
int main(int argc, char *argv[])
{
int ret;
/* Default values */
double epsilon = 1e-9;
int timestamp = 1;
@ -77,7 +77,6 @@ int main(int argc, char *argv[])
struct sample *samples[2];
struct {
int flags;
char *path;
FILE *handle;
struct sample *sample;
@ -150,11 +149,11 @@ check: if (optarg == endptr)
serror("Failed to open file: %s", f2.path);
while (!feof(f1.handle) && !feof(f2.handle)) {
ret = villas_fscan(f1.handle, &f1.sample, 1, &f1.flags);
ret = villas_fscan(f1.handle, &f1.sample, 1, 0);
if (ret < 0 && !feof(f1.handle))
goto out;
ret = villas_fscan(f2.handle, &f2.sample, 1, &f2.flags);
ret = villas_fscan(f2.handle, &f2.sample, 1, 0);
if (ret < 0 && !feof(f2.handle))
goto out;
@ -168,7 +167,7 @@ check: if (optarg == endptr)
}
/* Compare sequence no */
if (sequence && (f1.flags & IO_FORMAT_SEQUENCE) && (f2.flags & IO_FORMAT_SEQUENCE)) {
if (sequence && (f1.sample->has & SAMPLE_SEQUENCE) && (f2.sample->has & SAMPLE_SEQUENCE)) {
if (f1.sample->sequence != f2.sample->sequence) {
printf("sequence no: %d != %d\n", f1.sample->sequence, f2.sample->sequence);
ret = 2;