1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

adding new field "flags" to struct sample

This commit is contained in:
Steffen Vogel 2017-09-16 15:04:59 +02:00
parent 2c67e1a4f4
commit 5e620942ce
23 changed files with 82 additions and 79 deletions

View file

@ -69,6 +69,8 @@ struct node
int node_init(struct node *n, struct node_type *vt);
int node_init2(struct node *n);
/** Parse settings of a node.
*
* @param cfg A JSON object containing the configuration of the node.

View file

@ -146,7 +146,7 @@ struct node_type {
* @param n A pointer to the node object.
*/
int (*reverse)(struct node *n);
/** Return a file descriptor which can be used by poll / select to detect the availability of new data. */
int (*fd)(struct node *n);
};

View file

@ -53,28 +53,35 @@ enum sample_data_format {
};
/** Parts of a sample that can be serialized / de-serialized by the IO formats */
enum sample_has {
SAMPLE_ORIGIN = (1 << 0), /**< Include nanoseconds in output. */
SAMPLE_RECEIVED = (1 << 1), /**< Include nanoseconds in output. */
SAMPLE_OFFSET = (1 << 2),
SAMPLE_SOURCE = (1 << 3),
SAMPLE_ID = (1 << 4),
SAMPLE_SEQUENCE = (1 << 5), /**< Include sequence number in output. */
SAMPLE_VALUES = (1 << 6), /**< Include values in output. */
SAMPLE_FORMAT = (1 << 7),
SAMPLE_ALL = (1 << 7) - 1, /**< Enable all output options. */
enum sample_flags {
SAMPLE_HAS_ORIGIN = (1 << 0), /**< Include origin timestamp in output. */
SAMPLE_HAS_RECEIVED = (1 << 1), /**< Include receive timestamp in output. */
SAMPLE_HAS_OFFSET = (1 << 2), /**< Include offset (received - origin timestamp) in output. */
SAMPLE_HAS_SOURCE = (1 << 3), /**< This sample has a valid sample::source field. */
SAMPLE_HAS_ID = (1 << 4), /**< This sample has a valid sample::id field. */
SAMPLE_HAS_SEQUENCE = (1 << 5), /**< Include sequence number in output. */
SAMPLE_HAS_VALUES = (1 << 6), /**< Include values in output. */
SAMPLE_HAS_FORMAT = (1 << 7), /**< This sample has a valid sample::format field. */
SAMPLE_HAS_ALL = (1 << 7) - 1, /**< Enable all output options. */
};
struct sample {
int sequence; /**< The sequence number of this sample. */
int length; /**< The number of values in sample::values which are valid. */
int capacity; /**< The number of values in sample::values for which memory is reserved. */
int sequence; /**< The sequence number of this sample. */
int length; /**< The number of values in sample::values which are valid. */
int capacity; /**< The number of values in sample::values for which memory is reserved. */
int flags; /**< Flags are used to store binary properties of a sample. */
int id;
int id; /**< The id field is usually the same as sample::source::id */
struct node *source; /**< The node from which this sample originates. */
atomic_int refcnt; /**< Reference counter. */
off_t pool_off; /**< This sample belongs to this memory pool (relative pointer). */
struct node *source; /**< The node from which this sample originates. */
atomic_int refcnt; /**< Reference counter. */
off_t pool_off; /**< This sample belongs to this memory pool (relative pointer). See sample_pool(). */
/** A long bitfield indicating the number representation of the first 64 values in sample::data[].
*
* @see sample_data_format
*/
uint64_t format;
/** All timestamps are seconds / nano seconds after 1.1.1970 UTC */
struct {
@ -83,14 +90,6 @@ struct sample {
struct timespec sent; /**< The point in time when this data was send for the last time. */
} ts;
int has;
/** A long bitfield indicating the number representation of the first 64 values in sample::data[].
*
* @see sample_data_format
*/
uint64_t format;
/** The values. */
union {
double f; /**< Floating point values. */
@ -98,6 +97,7 @@ struct sample {
} data[]; /**< Data is in host endianess! */
};
/** Get the address of the pool to which the sample belongs. */
#define sample_pool(s) ((struct pool *) ((char *) (s) + (s)->pool_off))
/** Request \p cnt samples from memory pool \p p and initialize them.

View file

@ -51,7 +51,7 @@ static int print_start(struct hook *h)
struct print *p = h->_vd;
int ret;
ret = io_init(&p->io, p->format, SAMPLE_ALL);
ret = io_init(&p->io, p->format, SAMPLE_HAS_ALL);
if (ret)
return ret;

View file

@ -32,10 +32,10 @@ size_t csv_sprint_single(char *buf, size_t len, struct sample *s, int flags)
{
size_t off = 0;
if (flags & SAMPLE_ORIGIN)
if (flags & SAMPLE_HAS_ORIGIN)
off += snprintf(buf + off, len - off, "%ld%c%09ld", s->ts.origin.tv_sec, CSV_SEPARATOR, s->ts.origin.tv_nsec);
if (flags & SAMPLE_SEQUENCE)
if (flags & SAMPLE_HAS_SEQUENCE)
off += snprintf(buf + off, len - off, "%c%u", CSV_SEPARATOR, s->sequence);
for (int i = 0; i < s->length; i++) {
@ -59,7 +59,7 @@ size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int flags
const char *ptr = buf;
char *end;
s->has = 0;
s->flags = 0;
s->ts.origin.tv_sec = strtoul(ptr, &end, 10);
if (end == ptr || *end == '\n')
@ -73,13 +73,13 @@ size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int flags
ptr = end;
s->has |= SAMPLE_ORIGIN;
s->flags |= SAMPLE_HAS_ORIGIN;
s->sequence = strtoul(ptr, &end, 10);
if (end == ptr || *end == '\n')
goto out;
s->has |= SAMPLE_SEQUENCE;
s->flags |= SAMPLE_HAS_SEQUENCE;
for (ptr = end, s->length = 0;
s->length < s->capacity;
@ -105,7 +105,7 @@ out: if (*end == '\n')
end++;
if (s->length > 0)
s->has |= SAMPLE_VALUES;
s->flags |= SAMPLE_HAS_VALUES;
return end - buf;
}

View file

@ -34,13 +34,13 @@ int json_pack_sample(json_t **j, struct sample *smp, int flags)
"received", smp->ts.received.tv_sec, smp->ts.received.tv_nsec,
"sent", smp->ts.sent.tv_sec, smp->ts.sent.tv_nsec);
if (flags & SAMPLE_SEQUENCE) {
if (flags & SAMPLE_HAS_SEQUENCE) {
json_t *json_sequence = json_integer(smp->sequence);
json_object_set(json_smp, "sequence", json_sequence);
}
if (flags & SAMPLE_VALUES) {
if (flags & SAMPLE_HAS_VALUES) {
json_t *json_data = json_array();
for (int i = 0; i < smp->length; i++) {
@ -99,7 +99,7 @@ int json_unpack_sample(json_t *json_smp, struct sample *smp, int flags)
if (!json_is_array(json_data))
return -1;
smp->has = SAMPLE_ORIGIN | SAMPLE_RECEIVED | SAMPLE_SEQUENCE;
smp->flags = SAMPLE_HAS_ORIGIN | SAMPLE_HAS_RECEIVED | SAMPLE_HAS_SEQUENCE;
smp->length = 0;
json_array_foreach(json_data, i, json_value) {
@ -125,7 +125,7 @@ int json_unpack_sample(json_t *json_smp, struct sample *smp, int flags)
}
if (smp->length > 0)
smp->has |= SAMPLE_VALUES;
smp->flags |= SAMPLE_HAS_VALUES;
return 0;
}

View file

@ -79,7 +79,7 @@ int msg_to_sample(struct msg *msg, struct sample *smp)
if (ret)
return -1;
smp->has = SAMPLE_ORIGIN | SAMPLE_SEQUENCE | SAMPLE_VALUES | SAMPLE_ID;
smp->flags = SAMPLE_HAS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_VALUES | SAMPLE_HAS_ID;
smp->length = MIN(msg->length, smp->capacity);
smp->sequence = msg->sequence;
smp->id = msg->id;

View file

@ -169,12 +169,12 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
break;
}
smp->has = SAMPLE_SEQUENCE | SAMPLE_ORIGIN;
smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_ORIGIN;
}
else {
off = 0;
smp->has = 0;
smp->flags = 0;
smp->sequence = 0;
smp->ts.origin.tv_sec = 0;
smp->ts.origin.tv_nsec = 0;

View file

@ -40,18 +40,18 @@ size_t villas_human_sprint_single(char *buf, size_t len, struct sample *s, int f
{
size_t off = 0;
if (flags & SAMPLE_ORIGIN) {
if (flags & SAMPLE_HAS_ORIGIN) {
off += snprintf(buf + off, len - off, "%llu", (unsigned long long) s->ts.origin.tv_sec);
off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec);
}
if (flags & SAMPLE_RECEIVED)
if (flags & SAMPLE_HAS_RECEIVED)
off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received));
if (flags & SAMPLE_SEQUENCE)
if (flags & SAMPLE_HAS_SEQUENCE)
off += snprintf(buf + off, len - off, "(%u)", s->sequence);
if (flags & SAMPLE_VALUES) {
if (flags & SAMPLE_HAS_VALUES) {
for (int i = 0; i < s->length; i++) {
switch (sample_get_data_format(s, i)) {
case SAMPLE_DATA_FORMAT_FLOAT:
@ -76,7 +76,7 @@ size_t villas_human_sscan_single(const char *buf, size_t len, struct sample *s,
double offset = 0;
s->has = 0;
s->flags = 0;
/* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ...
* RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))?
@ -89,7 +89,7 @@ size_t villas_human_sscan_single(const char *buf, size_t len, struct sample *s,
if (ptr == end || *end == '\n')
return -1;
s->has |= SAMPLE_ORIGIN;
s->flags |= SAMPLE_HAS_ORIGIN;
/* Optional: nano seconds */
if (*end == '.') {
@ -108,7 +108,7 @@ size_t villas_human_sscan_single(const char *buf, size_t len, struct sample *s,
offset = strtof(ptr, &end); /* offset is ignored for now */
if (ptr != end)
s->has |= SAMPLE_OFFSET;
s->flags |= SAMPLE_HAS_OFFSET;
else
return -4;
}
@ -119,7 +119,7 @@ size_t villas_human_sscan_single(const char *buf, size_t len, struct sample *s,
s->sequence = strtoul(ptr, &end, 10);
if (ptr != end)
s->has |= SAMPLE_SEQUENCE;
s->flags |= SAMPLE_HAS_SEQUENCE;
else
return -5;
@ -151,13 +151,13 @@ size_t villas_human_sscan_single(const char *buf, size_t len, struct sample *s,
end++;
if (s->length > 0)
s->has |= SAMPLE_VALUES;
s->flags |= SAMPLE_HAS_VALUES;
if (s->has & SAMPLE_OFFSET) {
if (s->flags & SAMPLE_HAS_OFFSET) {
struct timespec off = time_from_double(offset);
s->ts.received = time_add(&s->ts.origin, &off);
s->has |= SAMPLE_RECEIVED;
s->flags |= SAMPLE_HAS_RECEIVED;
}
return end - buf;

View file

@ -283,18 +283,18 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt)
for (int i = 0; i < nread; i++) {
smps[i]->source = n;
if (!(smps[i]->has & SAMPLE_SEQUENCE))
if (!(smps[i]->flags & SAMPLE_HAS_SEQUENCE))
smps[i]->sequence = n->sequence++;
if (!(smps[i]->has & SAMPLE_ORIGIN) ||
!(smps[i]->has & SAMPLE_RECEIVED)) {
if (!(smps[i]->flags & SAMPLE_HAS_ORIGIN) ||
!(smps[i]->flags & SAMPLE_HAS_RECEIVED)) {
struct timespec now = time_now();
if (!(smps[i]->has & SAMPLE_RECEIVED))
if (!(smps[i]->flags & SAMPLE_HAS_RECEIVED))
smps[i]->ts.received = now;
if (!(smps[i]->has & SAMPLE_ORIGIN))
if (!(smps[i]->flags & SAMPLE_HAS_ORIGIN))
smps[i]->ts.origin = now;
}
}
@ -320,6 +320,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt)
if (!n->_vt->write)
return -1;
/* Run write hooks */
cnt = hook_write_list(&n->hooks, smps, cnt);
if (cnt <= 0)
return cnt;

View file

@ -205,7 +205,7 @@ int file_start(struct node *n)
f->uri = file_format_name(f->uri_tmpl, &now);
/* Open file */
flags = SAMPLE_ALL;
flags = SAMPLE_HAS_ALL;
if (f->flush)
flags |= IO_FLUSH;

View file

@ -235,7 +235,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[NANOMSG_MAX_PACKET_LEN];
ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL);
ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
if (ret <= 0)
return -1;

View file

@ -146,7 +146,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
/* Since the node isn't in shared memory, the source can't be accessed */
shared_smps[i]->source = NULL;
shared_smps[i]->has &= ~SAMPLE_SOURCE;
shared_smps[i]->flags &= ~SAMPLE_HAS_SOURCE;
}
pushed = shmem_int_write(&shm->intf, shared_smps, avail);

View file

@ -236,7 +236,7 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt)
double running = time_delta(&s->started, &ts);
t->has = SAMPLE_ORIGIN | SAMPLE_VALUES | SAMPLE_SEQUENCE;
t->flags = SAMPLE_HAS_ORIGIN | SAMPLE_HAS_VALUES | SAMPLE_HAS_SEQUENCE;
t->ts.origin = ts;
t->sequence = s->counter;
t->length = n->samplelen;

View file

@ -361,7 +361,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
ssize_t bytes;
size_t wbytes;
ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL);
ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
if (ret < 0)
return -1;

View file

@ -131,7 +131,7 @@ int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt)
task_wait_until_next_period(&sn->task);
smps[0]->length = MIN(STATS_COUNT * 6, smps[0]->capacity);
smps[0]->has = SAMPLE_VALUES;
smps[0]->flags = SAMPLE_HAS_VALUES;
for (int i = 0; i < 6 && (i+1)*STATS_METRICS <= smps[0]->length; i++) {
int tot = hist_total(&s->histograms[i]);

View file

@ -230,7 +230,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
if (pulled > 0) {
io_format_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, SAMPLE_ALL);
io_format_sprint(c->format, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled, SAMPLE_HAS_ALL);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
@ -285,7 +285,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
/* Set receive timestamp */
for (int i = 0; i < recvd; i++) {
smps[i]->ts.received = ts_recv;
smps[i]->has |= SAMPLE_RECEIVED;
smps[i]->flags |= SAMPLE_HAS_RECEIVED;
}
ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd);

View file

@ -448,7 +448,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
char data[4096];
ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_ALL);
ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
if (ret <= 0)
return -1;

View file

@ -96,7 +96,7 @@ int sample_copy(struct sample *dst, struct sample *src)
dst->sequence = src->sequence;
dst->format = src->format;
dst->source = src->source;
dst->has = src->has;
dst->flags = src->flags;
dst->ts = src->ts;
@ -115,13 +115,13 @@ int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt)
int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
{
if ((a->has & b->has & flags) != flags) {
printf("missing components: a=%#x, b=%#x, wanted=%#x\n", a->has, b->has, flags);
if ((a->flags & b->flags & flags) != flags) {
printf("flags: a=%#x, b=%#x, wanted=%#x\n", a->flags, b->flags, flags);
return -1;
}
/* Compare sequence no */
if (flags & SAMPLE_SEQUENCE) {
if (flags & SAMPLE_HAS_SEQUENCE) {
if (a->sequence != b->sequence) {
printf("sequence no: %d != %d\n", a->sequence, b->sequence);
return 2;
@ -129,7 +129,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
}
/* Compare timestamp */
if (flags & SAMPLE_ORIGIN) {
if (flags & SAMPLE_HAS_ORIGIN) {
if (time_delta(&a->ts.origin, &b->ts.origin) > epsilon) {
printf("ts.origin: %f != %f\n", time_to_double(&a->ts.origin), time_to_double(&b->ts.origin));
return 3;
@ -137,7 +137,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
}
/* Compare ID */
if (flags & SAMPLE_SOURCE) {
if (flags & SAMPLE_HAS_SOURCE) {
if (a->id != b->id) {
printf("id: %d != %d\n", a->id, b->id);
return 7;
@ -145,7 +145,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
}
/* Compare data */
if (flags & SAMPLE_VALUES) {
if (flags & SAMPLE_HAS_VALUES) {
if (a->length != b->length) {
printf("length: %d != %d\n", a->length, b->length);
return 4;

View file

@ -184,7 +184,7 @@ check: if (optarg == endptr)
if (!p)
error("Unknown IO format '%s'", format);
ret = io_init(&io, &p->io, SAMPLE_ALL);
ret = io_init(&io, &p->io, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO");

View file

@ -296,7 +296,7 @@ check: if (optarg == endptr)
if (!p)
error("Invalid format: %s", format);
ret = io_init(&io, &p->io, SAMPLE_ALL);
ret = io_init(&io, &p->io, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO");

View file

@ -136,7 +136,7 @@ int main(int argc, char *argv[])
if (!p)
error("Invalid output format '%s'", format);
ret = io_init(&io, &p->io, IO_FLUSH | (SAMPLE_ALL & ~SAMPLE_OFFSET));
ret = io_init(&io, &p->io, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET));
if (ret)
error("Failed to initialize output");

View file

@ -76,7 +76,7 @@ int main(int argc, char *argv[])
/* Default values */
double epsilon = 1e-9;
char *format = "villas-human";
int flags = SAMPLE_SEQUENCE | SAMPLE_VALUES | SAMPLE_ORIGIN;
int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_VALUES | SAMPLE_HAS_ORIGIN;
struct pool pool = { .state = STATE_DESTROYED };
@ -88,13 +88,13 @@ int main(int argc, char *argv[])
epsilon = strtod(optarg, &endptr);
goto check;
case 'v':
flags &= ~SAMPLE_VALUES;
flags &= ~SAMPLE_HAS_VALUES;
break;
case 't':
flags &= ~SAMPLE_ORIGIN;
flags &= ~SAMPLE_HAS_ORIGIN;
break;
case 's':
flags &= ~SAMPLE_SEQUENCE;
flags &= ~SAMPLE_HAS_SEQUENCE;
break;
case 'f':
format = optarg;