1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

formats: adapt to new signal code and separate node-type configuration into in/out sections

This commit is contained in:
Steffen Vogel 2018-08-20 18:30:24 +02:00
parent 3144d465f1
commit 86003e0b78
9 changed files with 696 additions and 393 deletions

View file

@ -31,33 +31,34 @@
#include <villas/signal.h>
#include <villas/timing.h>
static size_t csv_sprint_single(struct io *io, char *buf, size_t len, struct sample *s)
static size_t csv_sprint_single(struct io *io, char *buf, size_t len, const struct sample *smp)
{
size_t off = 0;
struct signal *sig;
if (io->flags & SAMPLE_HAS_ORIGIN)
off += snprintf(buf + off, len - off, "%ld%c%09ld", s->ts.origin.tv_sec, io->separator, s->ts.origin.tv_nsec);
if (io->flags & SAMPLE_HAS_TS_ORIGIN)
off += snprintf(buf + off, len - off, "%ld%c%09ld", smp->ts.origin.tv_sec, io->separator, smp->ts.origin.tv_nsec);
else
off += snprintf(buf + off, len - off, "nan%cnan", io->separator);
if (io->flags & SAMPLE_HAS_RECEIVED)
off += snprintf(buf + off, len - off, "%c%.09f", io->separator, time_delta(&s->ts.origin, &s->ts.received));
if (io->flags & SAMPLE_HAS_TS_RECEIVED)
off += snprintf(buf + off, len - off, "%c%.09f", io->separator, time_delta(&smp->ts.origin, &smp->ts.received));
else
off += snprintf(buf + off, len - off, "%cnan", io->separator);
if (io->flags & SAMPLE_HAS_SEQUENCE)
off += snprintf(buf + off, len - off, "%c%" PRIu64, io->separator, s->sequence);
off += snprintf(buf + off, len - off, "%c%" PRIu64, io->separator, smp->sequence);
else
off += snprintf(buf + off, len - off, "%cnan", io->separator);
for (int i = 0; i < s->length; i++) {
switch ((s->format >> i) & 0x1) {
case SAMPLE_DATA_FORMAT_FLOAT:
off += snprintf(buf + off, len - off, "%c%.6f", io->separator, s->data[i].f);
break;
case SAMPLE_DATA_FORMAT_INT:
off += snprintf(buf + off, len - off, "%c%" PRId64, io->separator, s->data[i].i);
if (io->flags & SAMPLE_HAS_DATA) {
for (int i = 0; i < smp->length; i++) {
sig = list_at_safe(smp->signals, i);
if (!sig)
break;
off += snprintf(buf + off, len - off, "%c", io->separator);
off += signal_data_snprint(&smp->data[i], sig, buf + off, len - off);
}
}
@ -66,26 +67,28 @@ static size_t csv_sprint_single(struct io *io, char *buf, size_t len, struct sam
return off;
}
static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struct sample *s)
static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struct sample *smp)
{
int ret, i = 0;
const char *ptr = buf;
char *end;
char *end, *next;
s->flags = 0;
smp->flags = 0;
smp->signals = io->signals;
s->ts.origin.tv_sec = strtoul(ptr, &end, 10);
smp->ts.origin.tv_sec = strtoul(ptr, &end, 10);
if (end == ptr || *end == io->delimiter)
goto out;
ptr = end + 1;
s->ts.origin.tv_nsec = strtoul(ptr, &end, 10);
smp->ts.origin.tv_nsec = strtoul(ptr, &end, 10);
if (end == ptr || *end == io->delimiter)
goto out;
ptr = end + 1;
s->flags |= SAMPLE_HAS_ORIGIN;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
double offset __attribute__((unused)) = strtof(ptr, &end);
if (end == ptr || *end == io->delimiter)
@ -93,52 +96,51 @@ static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struc
ptr = end + 1;
s->sequence = strtoul(ptr, &end, 10);
smp->sequence = strtoul(ptr, &end, 10);
if (end == ptr || *end == io->delimiter)
goto out;
s->flags |= SAMPLE_HAS_SEQUENCE;
smp->flags |= SAMPLE_HAS_SEQUENCE;
for (ptr = end + 1, s->length = 0;
s->length < s->capacity;
ptr = end + 1, s->length++) {
for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) {
if (*end == io->delimiter)
goto out;
//determine format (int or double) of current number starting at ptr
char * next_seperator = strchr(ptr, io->separator);
if(next_seperator == NULL){
//the last element of a row
next_seperator = strchr(ptr, io->delimiter);
struct signal *sig = (struct signal *) list_at_safe(smp->signals, i);
if (!sig)
goto out;
/* Perform signal detection only once */
if (sig->type == SIGNAL_TYPE_AUTO) {
/* Find end of the current column */
next = strpbrk(ptr, (char[]) { io->separator, io->delimiter, 0 });
if (next == NULL)
goto out;
/* Copy value to temporary '\0' terminated buffer */
size_t len = next - ptr;
char val[len+1];
strncpy(val, ptr, len);
val[len] = '\0';
sig->type = signal_type_detect(val);
debug(LOG_IO | 5, "Learned data type for index %u: %s", i, signal_type_to_str(sig->type));
}
char number[100];
strncpy(number, ptr, next_seperator-ptr);
char * contains_dot = strstr(number, ".");
if(contains_dot == NULL){
//no dot in string number --> number is an integer
s->data[s->length].i = strtol(ptr, &end, 10);
sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_INT);
}
else{
//dot in string number --> number is a floating point value
s->data[s->length].f = strtod(ptr, &end);
sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_FLOAT);
}
/* There are no valid values anymore. */
if (end == ptr)
ret = signal_data_parse_str(&smp->data[i], sig, ptr, &end);
if (ret || end == ptr) /* There are no valid values anymore. */
goto out;
}
out: if (*end == io->delimiter)
end++;
if (s->length > 0)
s->flags |= SAMPLE_HAS_VALUES;
smp->length = i;
if (smp->length > 0)
smp->flags |= SAMPLE_HAS_DATA;
return end - buf;
}
@ -157,7 +159,7 @@ int csv_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct samp
return i;
}
int csv_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int csv_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
int i;
size_t off = 0;
@ -171,24 +173,23 @@ int csv_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sampl
return i;
}
void csv_header(struct io *io)
void csv_header(struct io *io, const struct sample *smp)
{
FILE *f = io_stream_output(io);
fprintf(f, "# secs%cnsecs%coffset%csequence", io->separator, io->separator, io->separator);
if (io->output.signals) {
for (int i = 0; i < list_length(io->output.signals); i++) {
struct signal *s = (struct signal *) list_at(io->output.signals, i);
for (int i = 0; i < smp->length; i++) {
struct signal *sig = (struct signal *) list_at(smp->signals, i);
fprintf(f, "%c%s", io->separator, s->name);
if (sig->name)
fprintf(f, "%c%s", io->separator, sig->name);
else
fprintf(f, "%csignal%d", io->separator, i);
if (s->unit)
fprintf(f, "[%s]", s->unit);
}
if (sig->unit)
fprintf(f, "[%s]", sig->unit);
}
else
fprintf(f, "%cdata[]", io->separator);
fprintf(f, "%c", io->delimiter);
}
@ -216,7 +217,8 @@ static struct plugin p2 = {
.sscan = csv_sscan,
.header = csv_header,
.size = 0,
.flags = IO_NEWLINES,
.flags = IO_NEWLINES | IO_AUTO_DETECT_FORMAT |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA,
.separator = ','
}
};

View file

@ -23,17 +23,41 @@
#include <villas/plugin.h>
#include <villas/sample.h>
#include <villas/compat.h>
#include <villas/signal.h>
#include <villas/io.h>
#include <villas/formats/json.h>
static enum signal_type json_detect_format(json_t *val)
{
int type = json_typeof(val);
switch (type) {
case JSON_REAL:
return SIGNAL_TYPE_FLOAT;
case JSON_INTEGER:
return SIGNAL_TYPE_INTEGER;
case JSON_TRUE:
case JSON_FALSE:
return SIGNAL_TYPE_BOOLEAN;
case JSON_OBJECT:
return SIGNAL_TYPE_COMPLEX; /* must be a complex number */
default:
return SIGNAL_TYPE_AUTO;
}
}
static json_t * json_pack_timestamps(struct sample *smp)
{
json_t *json_ts = json_object();
if (smp->flags & SAMPLE_HAS_ORIGIN)
if (smp->flags & SAMPLE_HAS_TS_ORIGIN)
json_object_set(json_ts, "origin", json_pack("[ I, I ]", smp->ts.origin.tv_sec, smp->ts.origin.tv_nsec));
if (smp->flags & SAMPLE_HAS_RECEIVED)
if (smp->flags & SAMPLE_HAS_TS_RECEIVED)
json_object_set(json_ts, "received", json_pack("[ I, I ]", smp->ts.received.tv_sec, smp->ts.received.tv_nsec));
return json_ts;
@ -55,7 +79,7 @@ static int json_unpack_timestamps(json_t *json_ts, struct sample *smp)
if (ret)
return ret;
smp->flags |= SAMPLE_HAS_ORIGIN;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
}
if (json_ts_received) {
@ -63,7 +87,7 @@ static int json_unpack_timestamps(json_t *json_ts, struct sample *smp)
if (ret)
return ret;
smp->flags |= SAMPLE_HAS_RECEIVED;
smp->flags |= SAMPLE_HAS_TS_RECEIVED;
}
return 0;
@ -82,13 +106,38 @@ static int json_pack_sample(struct io *io, json_t **j, struct sample *smp)
json_object_set(json_smp, "sequence", json_sequence);
}
if (smp->flags & SAMPLE_HAS_VALUES) {
if (smp->flags & SAMPLE_HAS_DATA) {
json_t *json_data = json_array();
for (int i = 0; i < smp->length; i++) {
json_t *json_value = sample_get_data_format(smp, i)
? json_integer(smp->data[i].i)
: json_real(smp->data[i].f);
enum signal_type fmt = sample_format(smp, i);
json_t *json_value;
switch (fmt) {
case SIGNAL_TYPE_INTEGER:
json_value = json_integer(smp->data[i].i);
break;
case SIGNAL_TYPE_FLOAT:
json_value = json_real(smp->data[i].f);
break;
case SIGNAL_TYPE_BOOLEAN:
json_value = json_boolean(smp->data[i].b);
break;
case SIGNAL_TYPE_COMPLEX:
json_value = json_pack("{ s: f, s: f }",
"real", creal(smp->data[i].z),
"imag", cimag(smp->data[i].z)
);
break;
case SIGNAL_TYPE_INVALID:
case SIGNAL_TYPE_AUTO:
json_value = json_null(); /* Unknown type */
break;
}
json_array_append(json_data, json_value);
}
@ -129,6 +178,8 @@ static int json_unpack_sample(struct io *io, json_t *json_smp, struct sample *sm
size_t i;
int64_t sequence = -1;
smp->signals = io->signals;
ret = json_unpack_ex(json_smp, &err, 0, "{ s?: o, s?: I, s: o }",
"ts", &json_ts,
"sequence", &sequence,
@ -157,26 +208,30 @@ static int json_unpack_sample(struct io *io, json_t *json_smp, struct sample *sm
if (i >= smp->capacity)
break;
switch (json_typeof(json_value)) {
case JSON_REAL:
smp->data[i].f = json_real_value(json_value);
sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_FLOAT);
break;
struct signal *sig = list_at_safe(smp->signals, i);
if (!sig)
return -1;
case JSON_INTEGER:
smp->data[i].i = json_integer_value(json_value);
sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_INT);
break;
default:
return -2;
enum signal_type fmt = json_detect_format(json_value);
if (sig->type == SIGNAL_TYPE_AUTO) {
debug(LOG_IO | 5, "Learned data type for index %zu: %s", i, signal_type_to_str(fmt));
sig->type = fmt;
}
else if (sig->type != fmt) {
error("Received invalid data type in JSON payload: Received %s, expected %s for signal %s (index %zu).",
signal_type_to_str(fmt), signal_type_to_str(sig->type), sig->name, i);
return -2;
}
ret = signal_data_parse_json(&smp->data[i], sig, json_value);
if (ret)
return -3;
smp->length++;
}
if (smp->length > 0)
smp->flags |= SAMPLE_HAS_VALUES;
smp->flags |= SAMPLE_HAS_DATA;
return 0;
}
@ -222,7 +277,7 @@ int json_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sam
return ret;
}
int json_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int json_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
int ret;
json_t *json;
@ -302,7 +357,9 @@ static struct plugin p = {
.sscan = json_sscan,
.sprint = json_sprint,
.size = 0,
.delimiter = '\n'
.delimiter = '\n',
.flags = IO_AUTO_DETECT_FORMAT |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
},
};

View file

@ -38,9 +38,8 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
json_error_t err;
json_t *json_data, *json_name, *json_unit, *json_value;
json_t *json_created = NULL, *json_sequence = NULL;
struct signal *sig;
if (smp->flags & SAMPLE_HAS_ORIGIN)
if (smp->flags & SAMPLE_HAS_TS_ORIGIN)
json_created = json_integer(time_to_double(&smp->ts.origin) * 1e3);
if (smp->flags & SAMPLE_HAS_SEQUENCE)
@ -49,26 +48,26 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
json_data = json_array();
for (int i = 0; i < smp->length; i++) {
if (io->out.signals)
sig = (struct signal *) list_at_safe(io->out.signals, i);
else
sig = NULL;
struct signal *sig;
if (sig) {
if (!sig->enabled)
continue;
sig = list_at_safe(smp->signals, i);
if (!sig)
return -1;
if (sig->name)
json_name = json_string(sig->name);
json_unit = json_string(sig->unit);
}
else {
char name[32];
snprintf(name, 32, "signal_%d", i);
json_name = json_string(name);
json_unit = NULL;
}
if (sig->unit)
json_unit = json_string(sig->unit);
else
json_unit = NULL;
json_value = json_pack_ex(&err, 0, "{ s: o, s: f }",
"name", json_name,
"value", smp->data[i].f
@ -96,7 +95,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
);
if (*j == NULL)
return -1;
#if 0
#ifdef JSON_RESERVE_INTEGER_TARGET
if (io->out.node) {
char *endptr;
@ -113,6 +112,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
#else
if (io->out.node)
json_object_set_new(*j, "target", json_string(io->out.node->name));
#endif
#endif
return 0;
@ -136,6 +136,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
if (ret)
return -1;
#if 0
#ifdef JSON_RESERVE_INTEGER_TARGET
if (json_target && io->in.node) {
if (!json_is_integer(json_target))
@ -163,7 +164,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
return 0;
}
#endif
#endif
if (!json_data || !json_is_array(json_data))
return -1;
@ -185,12 +186,12 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
struct signal *sig;
sig = (struct signal *) list_lookup(io->in.signals, name);
sig = (struct signal *) list_lookup(io->signals, name);
if (sig) {
if (!sig->enabled)
continue;
idx = list_index(io->in.signals, sig);
idx = list_index(io->signals, sig);
}
else {
ret = sscanf(name, "signal_%d", &idx);
@ -207,11 +208,11 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
}
if (smp->length > 0)
smp->flags |= SAMPLE_HAS_VALUES;
smp->flags |= SAMPLE_HAS_DATA;
if (created > 0) {
smp->ts.origin = time_from_double(created * 1e-3);
smp->flags |= SAMPLE_HAS_ORIGIN;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
}
return smp->length > 0 ? 1 : 0;
@ -243,7 +244,7 @@ int json_reserve_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, st
return ret;
}
int json_reserve_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int json_reserve_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
int ret;
json_t *json;

View file

@ -25,7 +25,9 @@
#include <villas/formats/msg.h>
#include <villas/formats/msg_format.h>
#include <villas/sample.h>
#include <villas/signal.h>
#include <villas/utils.h>
#include <villas/list.h>
void msg_ntoh(struct msg *m)
{
@ -63,7 +65,7 @@ int msg_verify(struct msg *m)
{
if (m->version != MSG_VERSION)
return -1;
else if (m->type != MSG_TYPE_DATA)
else if (m->type != MSG_TYPE_DATA)
return -2;
else if (m->rsvd1 != 0)
return -3;
@ -71,7 +73,7 @@ int msg_verify(struct msg *m)
return 0;
}
int msg_to_sample(struct msg *msg, struct sample *smp)
int msg_to_sample(struct msg *msg, struct sample *smp, struct list *signals)
{
int ret;
@ -79,23 +81,32 @@ int msg_to_sample(struct msg *msg, struct sample *smp)
if (ret)
return -1;
smp->flags = SAMPLE_HAS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_VALUES | SAMPLE_HAS_ID;
smp->flags = SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
smp->length = MIN(msg->length, smp->capacity);
smp->sequence = msg->sequence;
smp->ts.origin = MSG_TS(msg);
smp->format = 0;
for (int i = 0; i < smp->length; i++) {
switch (sample_get_data_format(smp, i)) {
case SAMPLE_DATA_FORMAT_FLOAT: smp->data[i].f = msg->data[i].f; break;
case SAMPLE_DATA_FORMAT_INT: smp->data[i].i = msg->data[i].i; break;
struct signal *sig = list_at(signals, i);
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
smp->data[i].f = msg->data[i].f;
break;
case SIGNAL_TYPE_INTEGER:
smp->data[i].i = msg->data[i].i;
break;
default:
return -1;
}
}
return 0;
}
int msg_from_sample(struct msg *msg, struct sample *smp)
int msg_from_sample(struct msg *msg, struct sample *smp, struct list *signals)
{
*msg = MSG_INIT(smp->length, smp->sequence);
@ -103,9 +114,19 @@ int msg_from_sample(struct msg *msg, struct sample *smp)
msg->ts.nsec = smp->ts.origin.tv_nsec;
for (int i = 0; i < smp->length; i++) {
switch (sample_get_data_format(smp, i)) {
case SAMPLE_DATA_FORMAT_FLOAT: msg->data[i].f = smp->data[i].f; break;
case SAMPLE_DATA_FORMAT_INT: msg->data[i].i = smp->data[i].i; break;
struct signal *sig = list_at(signals, i);
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
msg->data[i].f = smp->data[i].f;
break;
case SIGNAL_TYPE_INTEGER:
msg->data[i].i = smp->data[i].i;
break;
default:
return -1;
}
}

View file

@ -24,9 +24,32 @@
#include <villas.pb-c.h>
#include <villas/sample.h>
#include <villas/signal.h>
#include <villas/io.h>
#include <villas/plugin.h>
#include <villas/formats/protobuf.h>
static enum signal_type protobuf_detect_format(Villas__Node__Value *val)
{
switch (val->value_case) {
case VILLAS__NODE__VALUE__VALUE_F:
return SIGNAL_TYPE_FLOAT;
case VILLAS__NODE__VALUE__VALUE_I:
return SIGNAL_TYPE_INTEGER;
case VILLAS__NODE__VALUE__VALUE_B:
return SIGNAL_TYPE_BOOLEAN;
case VILLAS__NODE__VALUE__VALUE_Z:
return SIGNAL_TYPE_COMPLEX;
case VILLAS__NODE__VALUE__VALUE__NOT_SET:
default:
return SIGNAL_TYPE_INVALID;
}
}
int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt)
{
unsigned psz;
@ -50,7 +73,7 @@ int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct
pb_smp->sequence = smp->sequence;
}
if (smp->flags & SAMPLE_HAS_ORIGIN) {
if (smp->flags & SAMPLE_HAS_TS_ORIGIN) {
pb_smp->timestamp = alloc(sizeof(Villas__Node__Timestamp));
villas__node__timestamp__init(pb_smp->timestamp);
@ -65,12 +88,37 @@ int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct
Villas__Node__Value *pb_val = pb_smp->values[j] = alloc(sizeof(Villas__Node__Value));
villas__node__value__init(pb_val);
enum sample_data_format fmt = sample_get_data_format(smp, j);
enum signal_type fmt = sample_format(smp, j);
switch (fmt) {
case SAMPLE_DATA_FORMAT_FLOAT: pb_val->value_case = VILLAS__NODE__VALUE__VALUE_F; pb_val->f = smp->data[j].f; break;
case SAMPLE_DATA_FORMAT_INT: pb_val->value_case = VILLAS__NODE__VALUE__VALUE_I; pb_val->i = smp->data[j].i; break;
default: pb_val->value_case = VILLAS__NODE__VALUE__VALUE__NOT_SET; break;
case SIGNAL_TYPE_FLOAT:
pb_val->value_case = VILLAS__NODE__VALUE__VALUE_F;
pb_val->f = smp->data[j].f;
break;
case SIGNAL_TYPE_INTEGER:
pb_val->value_case = VILLAS__NODE__VALUE__VALUE_I;
pb_val->i = smp->data[j].i;
break;
case SIGNAL_TYPE_BOOLEAN:
pb_val->value_case = VILLAS__NODE__VALUE__VALUE_B;
pb_val->b = smp->data[j].b;
break;
case SIGNAL_TYPE_COMPLEX:
pb_val->value_case = VILLAS__NODE__VALUE__VALUE_Z;
pb_val->z = alloc(sizeof(Villas__Node__Complex));
villas__node__complex__init(pb_val->z);
pb_val->z->real = creal(smp->data[j].z);
pb_val->z->imag = cimag(smp->data[j].z);
break;
case SIGNAL_TYPE_AUTO:
case SIGNAL_TYPE_INVALID:
pb_val->value_case = VILLAS__NODE__VALUE__VALUE__NOT_SET;
break;
}
}
}
@ -93,22 +141,24 @@ out:
return -1;
}
int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
unsigned i, j;
Villas__Node__Message *pb_msg;
pb_msg = villas__node__message__unpack(NULL, len, (uint8_t *) buf);
if (!pb_msg)
return -1;
for (i = 0; i < MIN(pb_msg->n_samples, cnt); i++) {
struct sample *smp = smps[i];
Villas__Node__Sample *pb_smp = pb_msg->samples[i];
smp->flags = SAMPLE_HAS_FORMAT;
smp->signals = io->signals;
if (pb_smp->type != VILLAS__NODE__SAMPLE__TYPE__DATA) {
warn("Parsed non supported message type");
break;
warn("Parsed non supported message type. Skipping");
continue;
}
if (pb_smp->has_sequence) {
@ -117,7 +167,7 @@ int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct
}
if (pb_smp->timestamp) {
smp->flags |= SAMPLE_HAS_ORIGIN;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
smp->ts.origin.tv_sec = pb_smp->timestamp->sec;
smp->ts.origin.tv_nsec = pb_smp->timestamp->nsec;
}
@ -125,21 +175,45 @@ int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct
for (j = 0; j < MIN(pb_smp->n_values, smp->capacity); j++) {
Villas__Node__Value *pb_val = pb_smp->values[j];
enum sample_data_format fmt = pb_val->value_case == VILLAS__NODE__VALUE__VALUE_F
? SAMPLE_DATA_FORMAT_FLOAT
: SAMPLE_DATA_FORMAT_INT;
enum signal_type fmt = protobuf_detect_format(pb_val);
switch (fmt) {
case SAMPLE_DATA_FORMAT_FLOAT: smp->data[j].f = pb_val->f; break;
case SAMPLE_DATA_FORMAT_INT: smp->data[j].i = pb_val->i; break;
default: { }
struct signal *sig = (struct signal *) list_at_safe(smp->signals, j);
if (!sig)
return -1;
if (sig->type == SIGNAL_TYPE_AUTO) {
debug(LOG_IO | 5, "Learned data type for index %u: %s", j, signal_type_to_str(fmt));
sig->type = fmt;
}
else if (sig->type != fmt) {
error("Received invalid data type in Protobuf payload: Received %s, expected %s for signal %s (index %u).",
signal_type_to_str(fmt), signal_type_to_str(sig->type), sig->name, i);
return -2;
}
sample_set_data_format(smp, j, fmt);
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
smp->data[j].f = pb_val->f;
break;
case SIGNAL_TYPE_INTEGER:
smp->data[j].i = pb_val->i;
break;
case SIGNAL_TYPE_BOOLEAN:
smp->data[j].b = pb_val->b;
break;
case SIGNAL_TYPE_COMPLEX:
smp->data[j].z = CMPLXF(pb_val->z->real, pb_val->z->imag);
break;
default: { }
}
}
if (pb_smp->n_values > 0)
smp->flags |= SAMPLE_HAS_VALUES;
smp->flags |= SAMPLE_HAS_DATA;
smp->length = j;
}
@ -158,7 +232,9 @@ static struct plugin p = {
.type = PLUGIN_TYPE_FORMAT,
.format = {
.sprint = protobuf_sprint,
.sscan = protobuf_sscan
.sscan = protobuf_sscan,
.flags = IO_AUTO_DETECT_FORMAT | IO_HAS_BINARY_PAYLOAD |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
}
};
REGISTER_PLUGIN(&p);

View file

@ -27,214 +27,342 @@
#include <villas/formats/raw.h>
#include <villas/compat.h>
/** Convert float to host byte order */
#define SWAP_FLT_TOH(o, n) ({ \
union { float f; uint32_t i; } x = { .f = n }; \
x.i = (o) ? be32toh(x.i) : le32toh(x.i); x.f; \
})
typedef float flt32_t;
typedef double flt64_t;
typedef long double flt128_t; /** @todo: check */
/** Convert double to host byte order */
#define SWAP_DBL_TOH(o, n) ({ \
union { float f; uint64_t i; } x = { .f = n }; \
x.i = (o) ? be64toh(x.i) : le64toh(x.i); x.f; \
})
/** Convert float to big/little endian byte order */
#define SWAP_FLT_TOE(o, n) ({ \
union { float f; uint32_t i; } x = { .f = n }; \
x.i = (o) ? htobe32(x.i) : htole32(x.i); x.f; \
#define SWAP_FLOAT_XTOH(o, b, n) ({ \
union { flt ## b ## _t f; uint ## b ## _t i; } x = { .f = n }; \
x.i = (o) ? be ## b ## toh(x.i) : le ## b ## toh(x.i); \
x.f; \
})
/** Convert double to big/little endian byte order */
#define SWAP_DBL_TOE(o, n) ({ \
union { double f; uint64_t i; } x = { .f = n }; \
x.i = (o) ? htobe64(x.i) : htole64(x.i); x.f; \
#define SWAP_FLOAT_HTOX(o, b, n) ({ \
union { flt ## b ## _t f; uint ## b ## _t i; } x = { .f = n }; \
x.i = (o) ? htobe ## b (x.i) : htole ## b (x.i); \
x.f; \
})
/** Convert integer of varying width to host byte order */
#define SWAP_INT_TOH(o, b, n) (o ? be ## b ## toh(n) : le ## b ## toh(n))
#define SWAP_INT_XTOH(o, b, n) (o ? be ## b ## toh(n) : le ## b ## toh(n))
/** Convert integer of varying width to big/little endian byte order */
#define SWAP_INT_TOE(o, b, n) (o ? htobe ## b (n) : htole ## b (n))
#define SWAP_INT_HTOX(o, b, n) (o ? htobe ## b (n) : htole ## b (n))
int raw_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt)
{
int i, o = 0;
int o = 0;
size_t nlen;
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
#ifdef HAS_128BIT
__int128 *i128 = (void *) buf;
__float128 *f128 = (void *) buf;
#endif
int bits = 1 << (io->flags >> 24);
for (i = 0; i < cnt; i++) {
nlen = (smps[i]->length + o + (io->flags & RAW_FAKE) ? 3 : 0) * (bits / 8);
if (nlen >= len)
break;
/* First three values are sequence, seconds and nano-seconds timestamps */
if (io->flags & RAW_FAKE) {
switch (bits) {
case 32:
i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 32, smps[i]->sequence);
i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_sec);
i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_nsec);
break;
case 64:
i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 64, smps[i]->sequence);
i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_sec);
i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_nsec);
break;
}
}
enum raw_format {
RAW_FORMAT_INT,
RAW_FORMAT_FLT
};
for (int j = 0; j < smps[i]->length; j++) {
enum sample_data_format smp_fmt;
enum raw_format raw_fmt;
union { double f; uint64_t i; } val;
if (io->flags & RAW_AUTO)
raw_fmt = smps[i]->format & (1 << i) ? RAW_FORMAT_INT : RAW_FORMAT_FLT;
else if (io->flags & RAW_FLT)
raw_fmt = RAW_FORMAT_FLT;
else
raw_fmt = RAW_FORMAT_INT;
smp_fmt = sample_get_data_format(smps[i], j);
switch (raw_fmt) {
case RAW_FORMAT_FLT:
switch (smp_fmt) {
case SAMPLE_DATA_FORMAT_INT: val.f = smps[i]->data[j].i; break;
case SAMPLE_DATA_FORMAT_FLOAT: val.f = smps[i]->data[j].f; break;
default: val.f = -1; break;
}
switch (bits) {
case 32: f32[o++] = SWAP_FLT_TOE(io->flags & RAW_BE_FLT, val.f); break;
case 64: f64[o++] = SWAP_DBL_TOE(io->flags & RAW_BE_FLT, val.f); break;
}
break;
case RAW_FORMAT_INT:
switch (smp_fmt) {
case SAMPLE_DATA_FORMAT_INT: val.i = smps[i]->data[j].i; break;
case SAMPLE_DATA_FORMAT_FLOAT: val.i = smps[i]->data[j].f; break;
default: val.i = -1; break;
}
switch (bits) {
case 8: i8 [o++] = val.i; break;
case 16: i16[o++] = SWAP_INT_TOE(io->flags & RAW_BE_INT, 16, val.i); break;
case 32: i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_INT, 32, val.i); break;
case 64: i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_INT, 64, val.i); break;
}
break;
}
}
}
if (wbytes)
*wbytes = o * (bits / 8);
return i;
}
int raw_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
/* The raw format can not encode multiple samples in one buffer
* as there is no support for framing. */
struct sample *smp = smps[0];
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
if (cnt > 1)
return -1;
int off, bits = 1 << (io->flags >> 24);
smp->length = len / (bits / 8);
if (io->flags & RAW_FAKE) {
off = 3;
if (smp->length < off) {
warn("Received a packet with no fake header. Skipping...");
return 0;
}
smp->length -= off;
/* First three values are sequence, seconds and nano-seconds timestamps
*
* These fields are always encoded as integers!
*/
if (io->flags & RAW_FAKE_HEADER) {
/* Check length */
nlen = (o + 3) * (bits / 8);
if (nlen >= len)
goto out;
switch (bits) {
case 8:
i8[o++] = smp->sequence;
i8[o++] = smp->ts.origin.tv_sec;
i8[o++] = smp->ts.origin.tv_nsec;
break;
case 16:
i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, smp->sequence);
i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, smp->ts.origin.tv_sec);
i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, smp->ts.origin.tv_nsec);
break;
case 32:
smp->sequence = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 32, i32[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 32, i32[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 32, i32[2]);
i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, smp->sequence);
i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, smp->ts.origin.tv_sec);
i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, smp->ts.origin.tv_nsec);
break;
case 64:
smp->sequence = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 64, i64[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 64, i64[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 64, i64[2]);
i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, smp->sequence);
i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, smp->ts.origin.tv_sec);
i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, smp->ts.origin.tv_nsec);
break;
#ifdef HAS_128BIT
case 128:
i128[o++] = SWAP_INT_TO_LE(io->flags & RAW_BIG_ENDIAN, 128, smp->sequence);
i128[o++] = SWAP_INT_TO_LE(io->flags & RAW_BIG_ENDIAN, 128, smp->ts.origin.tv_sec);
i128[o++] = SWAP_INT_TO_LE(io->flags & RAW_BIG_ENDIAN, 128, smp->ts.origin.tv_nsec);
break;
#endif
}
}
for (int j = 0; j < smp->length; j++) {
enum signal_type fmt = sample_format(smp, j);
union signal_data *data = &smp->data[j];
/* Check length */
nlen = (o + fmt == SIGNAL_TYPE_COMPLEX ? 2 : 1) * (bits / 8);
if (nlen >= len)
goto out;
switch (fmt) {
case SIGNAL_TYPE_FLOAT:
switch (bits) {
case 8: i8 [o++] = -1; break; /* Not supported */
case 16: i16[o++] = -1; break; /* Not supported */
case 32: f32[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, data->f); break;
case 64: f64[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, data->f); break;
#ifdef HAS_128BIT
case 128: f128[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, data->f); break;
#endif
}
break;
case SIGNAL_TYPE_INTEGER:
switch (bits) {
case 8: i8 [o++] = data->i; break;
case 16: i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, data->i); break;
case 32: i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, data->i); break;
case 64: i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, data->i); break;
#ifdef HAS_128BIT
case 128: i128[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, data->i); break;
#endif
}
break;
case SIGNAL_TYPE_BOOLEAN:
switch (bits) {
case 8: i8 [o++] = data->b ? 1 : 0; break;
case 16: i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, data->b ? 1 : 0); break;
case 32: i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, data->b ? 1 : 0); break;
case 64: i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, data->b ? 1 : 0); break;
#ifdef HAS_128BIT
case 128: i128[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, data->b ? 1 : 0); break;
#endif
}
break;
case SIGNAL_TYPE_COMPLEX:
switch (bits) {
case 8: i8 [o++] = -1; /* Not supported */
i8 [o++] = -1; break;
case 16: i16[o++] = -1; /* Not supported */
i16[o++] = -1; break;
case 32: f32[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, creal(data->z));
f32[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, cimag(data->z)); break;
case 64: f64[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, creal(data->z));
f64[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, cimag(data->z)); break;
#ifdef HAS_128BIT
case 128: f128[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, creal(data->z);
f128[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, cimag(data->z); break;
#endif
}
break;
case SIGNAL_TYPE_AUTO:
case SIGNAL_TYPE_INVALID:
return -1;
}
}
out: if (wbytes)
*wbytes = o * (bits / 8);
return 1;
}
int raw_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
#ifdef HAS_128BIT
__int128 *i128 = (void *) buf;
__float128 *f128 = (void *) buf;
#endif
/* The raw format can not encode multiple samples in one buffer
* as there is no support for framing. */
struct sample *smp = smps[0];
int o = 0, bits = 1 << (io->flags >> 24);
int nlen = len / (bits / 8);
if (cnt > 1)
return -1;
if (len % (bits / 8)) {
warn("Invalid RAW Payload length: %#zx", len);
return -1;
}
if (io->flags & RAW_FAKE_HEADER) {
if (nlen < o + 3) {
warn("Received a packet with no fake header. Skipping...");
return -1;
}
smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_ORIGIN;
switch (bits) {
case 8:
smp->sequence = i8[o++];
smp->ts.origin.tv_sec = i8[o++];
smp->ts.origin.tv_nsec = i8[o++];
break;
case 16:
smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]);
smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]);
smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]);
break;
case 32:
smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]);
smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]);
smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]);
break;
case 64:
smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]);
smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]);
smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]);
break;
#ifdef HAS_128BIT
case 128:
smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]);
smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]);
smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]);
break;
#endif
}
smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_TS_ORIGIN;
}
else {
off = 0;
smp->flags = 0;
smp->sequence = 0;
smp->ts.origin.tv_sec = 0;
smp->ts.origin.tv_nsec = 0;
}
smp->signals = io->signals;
int i;
for (i = 0; i < smp->capacity && o < nlen; i++) {
enum signal_type fmt = sample_format(smp, i);
union signal_data *data = &smp->data[i];
switch (fmt) {
case SIGNAL_TYPE_FLOAT:
switch (bits) {
case 8: data->f = -1; o++; break; /* Not supported */
case 16: data->f = -1; o++; break; /* Not supported */
case 32: data->f = SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, f32[o++]); break;
case 64: data->f = SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, f64[o++]); break;
#ifdef HAS_128BIT
case 128: data->f = SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, f128[o++]); break;
#endif
}
break;
case SIGNAL_TYPE_INTEGER:
switch (bits) {
case 8: data->i = (int8_t) i8[o++]; break;
case 16: data->i = (int16_t) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); break;
case 32: data->i = (int32_t) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); break;
case 64: data->i = (int64_t) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); break;
#ifdef HAS_128BIT
case 128: data->i = (__int128) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); break;
#endif
}
break;
case SIGNAL_TYPE_BOOLEAN:
switch (bits) {
case 8: data->b = (bool) i8[o++]; break;
case 16: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); break;
case 32: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); break;
case 64: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); break;
#ifdef HAS_128BIT
case 128: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); break;
#endif
}
break;
case SIGNAL_TYPE_COMPLEX:
switch (bits) {
case 8: data->z = CMPLXF(-1, -1); o += 2; break; /* Not supported */
case 16: data->z = CMPLXF(-1, -1); o += 2; break; /* Not supported */
case 32: data->z = CMPLXF(
SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, f32[o++]), /* real */
SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, f32[o++]) /* imag */
);
break;
case 64: data->z = CMPLXF(
SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, f64[o++]), /* real */
SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, f64[o++]) /* imag */
);
break;
#if HAS_128BIT
case 128: data->z = CMPLXF(
SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, f128[o++]), /* real */
SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, f128[o++]) /* imag */
);
break;
#endif
}
break;
case SIGNAL_TYPE_AUTO:
case SIGNAL_TYPE_INVALID:
warn("Unsupported format in RAW payload");
return -1;
}
}
smp->length = i;
if (smp->length > smp->capacity) {
warn("Received more values than supported: length=%u, capacity=%u", smp->length, smp->capacity);
smp->length = smp->capacity;
}
for (int i = 0; i < smp->length; i++) {
enum sample_data_format smp_fmt = io->flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT
: SAMPLE_DATA_FORMAT_INT;
sample_set_data_format(smp, i, smp_fmt);
switch (smp_fmt) {
case SAMPLE_DATA_FORMAT_FLOAT:
switch (bits) {
case 32: smp->data[i].f = SWAP_FLT_TOH(io->flags & RAW_BE_FLT, f32[i+off]); break;
case 64: smp->data[i].f = SWAP_DBL_TOH(io->flags & RAW_BE_FLT, f64[i+off]); break;
}
break;
case SAMPLE_DATA_FORMAT_INT:
switch (bits) {
case 8: smp->data[i].i = i8[i]; break;
case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(io->flags & RAW_BE_INT, 16, i16[i+off]); break;
case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(io->flags & RAW_BE_INT, 32, i32[i+off]); break;
case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(io->flags & RAW_BE_INT, 64, i64[i+off]); break;
}
break;
}
}
if (rbytes)
*rbytes = len;
*rbytes = o * (bits / 8);
return 1;
}
@ -244,8 +372,9 @@ static struct plugin i = { \
.name = n, \
.description = d, \
.type = PLUGIN_TYPE_FORMAT, \
.format = { \
.flags = f | FORMAT_TYPE_BINARY,\
.format = { \
.flags = f | IO_HAS_BINARY_PAYLOAD |\
SAMPLE_HAS_DATA, \
.sprint = raw_sprint, \
.sscan = raw_sscan \
} \
@ -253,14 +382,19 @@ static struct plugin i = { \
REGISTER_PLUGIN(& i);
/* Feel free to add additional format identifiers here to suit your needs */
REGISTER_FORMAT_RAW(p_f32, "raw.flt32", "Raw single precision floating point", RAW_32 | RAW_FLT)
REGISTER_FORMAT_RAW(p_f64, "raw.flt64", "Raw double precision floating point", RAW_64 | RAW_FLT)
REGISTER_FORMAT_RAW(p_i8, "raw.int8", "Raw 8 bit, signed integer", RAW_8)
REGISTER_FORMAT_RAW(p_i16be, "raw.int16.be", "Raw 16 bit, signed integer, big endian byte-order", RAW_16 | RAW_BE)
REGISTER_FORMAT_RAW(p_i16le, "raw.int16.le", "Raw 16 bit, signed integer, little endian byte-order", RAW_16)
REGISTER_FORMAT_RAW(p_i32be, "raw.int32.be", "Raw 32 bit, signed integer, big endian byte-order", RAW_32 | RAW_BE)
REGISTER_FORMAT_RAW(p_i32le, "raw.int32.le", "Raw 32 bit, signed integer, little endian byte-order", RAW_32)
REGISTER_FORMAT_RAW(p_i64be, "raw.int64.be", "Raw 64 bit, signed integer, bit endian byte-order", RAW_64 | RAW_BE)
REGISTER_FORMAT_RAW(p_i64le, "raw.int64.le", "Raw 64 bit, signed integer, little endian byte-order", RAW_64)
REGISTER_FORMAT_RAW(p_gtnet, "gtnet", "RTDS GTNET", RAW_32 | RAW_FLT | RAW_BE)
REGISTER_FORMAT_RAW(p_gtnef, "gtnet.fake", "RTDS GTNET with fake header", RAW_32 | RAW_FLT | RAW_BE | RAW_FAKE)
REGISTER_FORMAT_RAW(p_8, "raw.8", "Raw 8 bit", RAW_BITS_8)
REGISTER_FORMAT_RAW(p_16be, "raw.16.be", "Raw 16 bit, big endian byte-order", RAW_BITS_16 | RAW_BIG_ENDIAN)
REGISTER_FORMAT_RAW(p_32be, "raw.32.be", "Raw 32 bit, big endian byte-order", RAW_BITS_32 | RAW_BIG_ENDIAN)
REGISTER_FORMAT_RAW(p_64be, "raw.64.be", "Raw 64 bit, big endian byte-order", RAW_BITS_64 | RAW_BIG_ENDIAN)
REGISTER_FORMAT_RAW(p_16le, "raw.16.le", "Raw 16 bit, little endian byte-order", RAW_BITS_16)
REGISTER_FORMAT_RAW(p_32le, "raw.32.le", "Raw 32 bit, little endian byte-order", RAW_BITS_32)
REGISTER_FORMAT_RAW(p_64le, "raw.64.le", "Raw 64 bit, little endian byte-order", RAW_BITS_64)
#ifdef HAS_128BIT
REGISTER_FORMAT_RAW(p_128le, "raw.128.be", "Raw 128 bit, big endian byte-order", RAW_BITS_128 | RAW_BIG_ENDIAN)
REGISTER_FORMAT_RAW(p_128le, "raw.128.le", "Raw 128 bit, little endian byte-order", RAW_BITS_128)
#endif
REGISTER_FORMAT_RAW(p_gtnet, "gtnet", "RTDS GTNET", RAW_BITS_32 | RAW_BIG_ENDIAN)
REGISTER_FORMAT_RAW(p_gtnef, "gtnet.fake", "RTDS GTNET with fake header", RAW_BITS_32 | RAW_BIG_ENDIAN | RAW_FAKE_HEADER)

View file

@ -37,7 +37,7 @@ message Sample {
};
required Type type = 1 [default = DATA];
optional uint32 sequence = 2; // The sequence number is incremented by one for consecutive messages.
optional uint64 sequence = 2; // The sequence number is incremented by one for consecutive messages.
optional Timestamp timestamp = 4;
repeated Value values = 5;
}
@ -49,7 +49,14 @@ message Timestamp {
message Value {
oneof value {
float f = 1; // Floating point values.
int32 i = 2; // Integer values.
double f = 1; // Floating point values.
int64 i = 2; // Integer values.
bool b = 3; // Boolean values.
Complex z = 4; // Complex values.
}
}
message Complex {
required float real = 1; // Real component
required float imag = 2; // Imaginary component
}

View file

@ -42,7 +42,7 @@ int villas_binary_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, s
if (ptr + MSG_LEN(smp->length) > buf + len)
break;
ret = msg_from_sample(msg, smp);
ret = msg_from_sample(msg, smp, smp->signals);
if (ret)
return ret;
@ -61,10 +61,10 @@ int villas_binary_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, s
return i;
}
int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int villas_binary_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
int ret, i = 0, values;
char *ptr = buf;
const char *ptr = buf;
if (len % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", len);
@ -75,6 +75,8 @@ int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, st
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[i];
smp->signals = io->signals;
/* Complete buffer has been parsed */
if (ptr == buf + len)
break;
@ -99,7 +101,7 @@ int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, st
else
msg_ntoh(msg);
ret = msg_to_sample(msg, smp);
ret = msg_to_sample(msg, smp, io->signals);
if (ret) {
warn("Invalid msg received: reason=3, ret=%d", ret);
break;
@ -122,7 +124,8 @@ static struct plugin p1 = {
.sprint = villas_binary_sprint,
.sscan = villas_binary_sscan,
.size = 0,
.flags = FORMAT_TYPE_BINARY
.flags = IO_HAS_BINARY_PAYLOAD |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
},
};
@ -135,7 +138,8 @@ static struct plugin p2 = {
.sprint = villas_binary_sprint,
.sscan = villas_binary_sscan,
.size = 0,
.flags = FORMAT_TYPE_BINARY | VILLAS_BINARY_WEB
.flags = IO_HAS_BINARY_PAYLOAD | VILLAS_BINARY_WEB |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
},
};

View file

@ -32,31 +32,30 @@
#include <villas/signal.h>
#include <villas/formats/villas_human.h>
static size_t villas_human_sprint_single(struct io *io, char *buf, size_t len, struct sample *s)
static size_t villas_human_sprint_single(struct io *io, char *buf, size_t len, const struct sample *smp)
{
size_t off = 0;
struct signal *sig;
if (io->flags & SAMPLE_HAS_ORIGIN) {
off += snprintf(buf + off, len - off, "%llu", (unsigned long long) s->ts.origin.tv_sec);
off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec);
if (io->flags & SAMPLE_HAS_TS_ORIGIN) {
off += snprintf(buf + off, len - off, "%llu", (unsigned long long) smp->ts.origin.tv_sec);
off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) smp->ts.origin.tv_nsec);
}
if (io->flags & SAMPLE_HAS_RECEIVED)
off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received));
if (io->flags & SAMPLE_HAS_TS_RECEIVED)
off += snprintf(buf + off, len - off, "%+e", time_delta(&smp->ts.origin, &smp->ts.received));
if (io->flags & SAMPLE_HAS_SEQUENCE)
off += snprintf(buf + off, len - off, "(%" PRIu64 ")", s->sequence);
off += snprintf(buf + off, len - off, "(%" PRIu64 ")", smp->sequence);
if (io->flags & SAMPLE_HAS_VALUES) {
for (int i = 0; i < s->length; i++) {
switch (sample_get_data_format(s, i)) {
case SAMPLE_DATA_FORMAT_FLOAT:
off += snprintf(buf + off, len - off, "%c%.6lf", io->separator, s->data[i].f);
break;
case SAMPLE_DATA_FORMAT_INT:
off += snprintf(buf + off, len - off, "%c%" PRIi64, io->separator, s->data[i].i);
break;
}
if (io->flags & SAMPLE_HAS_DATA) {
for (int i = 0; i < smp->length; i++) {
sig = list_at_safe(smp->signals, i);
if (!sig)
break;
off += snprintf(buf + off, len - off, "%c", io->separator);
off += signal_data_snprint(&smp->data[i], sig, buf + off, len - off);
}
}
@ -65,14 +64,16 @@ static size_t villas_human_sprint_single(struct io *io, char *buf, size_t len, s
return off;
}
static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t len, struct sample *s)
static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t len, struct sample *smp)
{
char *end;
int ret;
char *end, *next;
const char *ptr = buf;
double offset = 0;
s->flags = 0;
smp->flags = 0;
smp->signals = io->signals;
/* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ...
* RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))?
@ -81,22 +82,22 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l
*/
/* Mandatory: seconds */
s->ts.origin.tv_sec = (uint32_t) strtoul(ptr, &end, 10);
smp->ts.origin.tv_sec = (uint32_t) strtoul(ptr, &end, 10);
if (ptr == end || *end == io->delimiter)
return -1;
s->flags |= SAMPLE_HAS_ORIGIN;
smp->flags |= SAMPLE_HAS_TS_ORIGIN;
/* Optional: nano seconds */
if (*end == '.') {
ptr = end + 1;
s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10);
smp->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10);
if (ptr == end)
return -3;
}
else
s->ts.origin.tv_nsec = 0;
smp->ts.origin.tv_nsec = 0;
/* Optional: offset / delay */
if (*end == '+' || *end == '-') {
@ -104,7 +105,7 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l
offset = strtof(ptr, &end); /* offset is ignored for now */
if (ptr != end)
s->flags |= SAMPLE_HAS_OFFSET;
smp->flags |= SAMPLE_HAS_OFFSET;
else
return -4;
}
@ -113,9 +114,9 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l
if (*end == '(') {
ptr = end + 1;
s->sequence = strtoul(ptr, &end, 10);
smp->sequence = strtoul(ptr, &end, 10);
if (ptr != end)
s->flags |= SAMPLE_HAS_SEQUENCE;
smp->flags |= SAMPLE_HAS_SEQUENCE;
else
return -5;
@ -123,52 +124,52 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l
end++;
}
for (ptr = end, s->length = 0;
s->length < s->capacity;
ptr = end, s->length++) {
int i;
for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) {
if (*end == io->delimiter)
break;
goto out;
//determine format (int or double) of current number starting at ptr
//sko: not sure why ptr+1 is required in the following line to make it work...
//sko: it seems there is an additional space after each separator before a new value
char * next_seperator = strchr(ptr+1, io->separator);
if(next_seperator == NULL){
//the last element of a row
next_seperator = strchr(ptr, io->delimiter);
struct signal *sig = (struct signal *) list_at_safe(io->signals, i);
if (!sig)
goto out;
/* Perform signal detection only once */
if (sig->type == SIGNAL_TYPE_AUTO) {
/* Find end of the current column */
next = strpbrk(ptr, (char[]) { io->separator, io->delimiter, 0 });
if (next == NULL)
goto out;
/* Copy value to temporary '\0' terminated buffer */
size_t len = next - ptr;
char val[len+1];
strncpy(val, ptr, len);
val[len] = '\0';
sig->type = signal_type_detect(val);
debug(LOG_IO | 5, "Learned data type for index %u: %s", i, signal_type_to_str(sig->type));
}
char number[100];
strncpy(number, ptr, next_seperator-ptr);
char * contains_dot = strstr(number, ".");
if(contains_dot == NULL){
//no dot in string number --> number is an integer
s->data[s->length].i = strtol(ptr, &end, 10);
sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_INT);
}
else{
//dot in string number --> number is a floating point value
s->data[s->length].f = strtod(ptr, &end);
sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_FLOAT);
}
/* There are no valid values anymore. */
if (end == ptr)
break;
ret = signal_data_parse_str(&smp->data[i], sig, ptr, &end);
if (ret || end == ptr) /* There are no valid values anymore. */
goto out;
}
if (*end == io->delimiter)
out: if (*end == io->delimiter)
end++;
if (s->length > 0)
s->flags |= SAMPLE_HAS_VALUES;
smp->length = i;
if (smp->length > 0)
smp->flags |= SAMPLE_HAS_DATA;
if (s->flags & SAMPLE_HAS_OFFSET) {
if (smp->flags & SAMPLE_HAS_OFFSET) {
struct timespec off = time_from_double(offset);
s->ts.received = time_add(&s->ts.origin, &off);
smp->ts.received = time_add(&smp->ts.origin, &off);
s->flags |= SAMPLE_HAS_RECEIVED;
smp->flags |= SAMPLE_HAS_TS_RECEIVED;
}
return end - buf;
@ -188,7 +189,7 @@ int villas_human_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, st
return i;
}
int villas_human_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int villas_human_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
int i;
size_t off = 0;
@ -202,24 +203,23 @@ int villas_human_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, str
return i;
}
void villas_human_header(struct io *io)
void villas_human_header(struct io *io, const struct sample *smp)
{
FILE *f = io_stream_output(io);
fprintf(f, "# %-20s", "seconds.nanoseconds+offset(sequence)");
if (io->output.signals) {
for (int i = 0; i < list_length(io->output.signals); i++) {
struct signal *s = (struct signal *) list_at(io->output.signals, i);
for (int i = 0; i < smp->length; i++) {
struct signal *sig = (struct signal *) list_at(smp->signals, i);
fprintf(f, "%c%s", io->separator, s->name);
if (sig->name)
fprintf(f, "%c%s", io->separator, sig->name);
else
fprintf(f, "%csignal%d", io->separator, i);
if (s->unit)
fprintf(f, "[%s]", s->unit);
}
if (sig->unit)
fprintf(f, "[%s]", sig->unit);
}
else
fprintf(f, "%cdata[]", io->separator);
fprintf(f, "%c", io->delimiter);
}
@ -233,7 +233,8 @@ static struct plugin p = {
.sscan = villas_human_sscan,
.header = villas_human_header,
.size = 0,
.flags = IO_NEWLINES,
.flags = IO_NEWLINES | IO_AUTO_DETECT_FORMAT |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA,
.separator = '\t',
.delimiter = '\n'
}