diff --git a/lib/formats/csv.c b/lib/formats/csv.c index e2230d271..aa14e12de 100644 --- a/lib/formats/csv.c +++ b/lib/formats/csv.c @@ -31,33 +31,34 @@ #include #include -static size_t csv_sprint_single(struct io *io, char *buf, size_t len, struct sample *s) +static size_t csv_sprint_single(struct io *io, char *buf, size_t len, const struct sample *smp) { size_t off = 0; + struct signal *sig; - if (io->flags & SAMPLE_HAS_ORIGIN) - off += snprintf(buf + off, len - off, "%ld%c%09ld", s->ts.origin.tv_sec, io->separator, s->ts.origin.tv_nsec); + if (io->flags & SAMPLE_HAS_TS_ORIGIN) + off += snprintf(buf + off, len - off, "%ld%c%09ld", smp->ts.origin.tv_sec, io->separator, smp->ts.origin.tv_nsec); else off += snprintf(buf + off, len - off, "nan%cnan", io->separator); - if (io->flags & SAMPLE_HAS_RECEIVED) - off += snprintf(buf + off, len - off, "%c%.09f", io->separator, time_delta(&s->ts.origin, &s->ts.received)); + if (io->flags & SAMPLE_HAS_TS_RECEIVED) + off += snprintf(buf + off, len - off, "%c%.09f", io->separator, time_delta(&smp->ts.origin, &smp->ts.received)); else off += snprintf(buf + off, len - off, "%cnan", io->separator); if (io->flags & SAMPLE_HAS_SEQUENCE) - off += snprintf(buf + off, len - off, "%c%" PRIu64, io->separator, s->sequence); + off += snprintf(buf + off, len - off, "%c%" PRIu64, io->separator, smp->sequence); else off += snprintf(buf + off, len - off, "%cnan", io->separator); - for (int i = 0; i < s->length; i++) { - switch ((s->format >> i) & 0x1) { - case SAMPLE_DATA_FORMAT_FLOAT: - off += snprintf(buf + off, len - off, "%c%.6f", io->separator, s->data[i].f); - break; - case SAMPLE_DATA_FORMAT_INT: - off += snprintf(buf + off, len - off, "%c%" PRId64, io->separator, s->data[i].i); + if (io->flags & SAMPLE_HAS_DATA) { + for (int i = 0; i < smp->length; i++) { + sig = list_at_safe(smp->signals, i); + if (!sig) break; + + off += snprintf(buf + off, len - off, "%c", io->separator); + off += signal_data_snprint(&smp->data[i], sig, buf + off, len - off); } } @@ -66,26 +67,28 @@ static size_t csv_sprint_single(struct io *io, char *buf, size_t len, struct sam return off; } -static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struct sample *s) +static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struct sample *smp) { + int ret, i = 0; const char *ptr = buf; - char *end; + char *end, *next; - s->flags = 0; + smp->flags = 0; + smp->signals = io->signals; - s->ts.origin.tv_sec = strtoul(ptr, &end, 10); + smp->ts.origin.tv_sec = strtoul(ptr, &end, 10); if (end == ptr || *end == io->delimiter) goto out; ptr = end + 1; - s->ts.origin.tv_nsec = strtoul(ptr, &end, 10); + smp->ts.origin.tv_nsec = strtoul(ptr, &end, 10); if (end == ptr || *end == io->delimiter) goto out; ptr = end + 1; - s->flags |= SAMPLE_HAS_ORIGIN; + smp->flags |= SAMPLE_HAS_TS_ORIGIN; double offset __attribute__((unused)) = strtof(ptr, &end); if (end == ptr || *end == io->delimiter) @@ -93,52 +96,51 @@ static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struc ptr = end + 1; - s->sequence = strtoul(ptr, &end, 10); + smp->sequence = strtoul(ptr, &end, 10); if (end == ptr || *end == io->delimiter) goto out; - s->flags |= SAMPLE_HAS_SEQUENCE; + smp->flags |= SAMPLE_HAS_SEQUENCE; - for (ptr = end + 1, s->length = 0; - s->length < s->capacity; - ptr = end + 1, s->length++) { + for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) { if (*end == io->delimiter) goto out; - //determine format (int or double) of current number starting at ptr - char * next_seperator = strchr(ptr, io->separator); - if(next_seperator == NULL){ - //the last element of a row - next_seperator = strchr(ptr, io->delimiter); + struct signal *sig = (struct signal *) list_at_safe(smp->signals, i); + if (!sig) + goto out; + + /* Perform signal detection only once */ + if (sig->type == SIGNAL_TYPE_AUTO) { + + /* Find end of the current column */ + next = strpbrk(ptr, (char[]) { io->separator, io->delimiter, 0 }); + if (next == NULL) + goto out; + + /* Copy value to temporary '\0' terminated buffer */ + size_t len = next - ptr; + char val[len+1]; + strncpy(val, ptr, len); + val[len] = '\0'; + + sig->type = signal_type_detect(val); + + debug(LOG_IO | 5, "Learned data type for index %u: %s", i, signal_type_to_str(sig->type)); } - char number[100]; - strncpy(number, ptr, next_seperator-ptr); - char * contains_dot = strstr(number, "."); - if(contains_dot == NULL){ - //no dot in string number --> number is an integer - s->data[s->length].i = strtol(ptr, &end, 10); - sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_INT); - } - - else{ - //dot in string number --> number is a floating point value - s->data[s->length].f = strtod(ptr, &end); - sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_FLOAT); - } - - - /* There are no valid values anymore. */ - if (end == ptr) + ret = signal_data_parse_str(&smp->data[i], sig, ptr, &end); + if (ret || end == ptr) /* There are no valid values anymore. */ goto out; } out: if (*end == io->delimiter) end++; - if (s->length > 0) - s->flags |= SAMPLE_HAS_VALUES; + smp->length = i; + if (smp->length > 0) + smp->flags |= SAMPLE_HAS_DATA; return end - buf; } @@ -157,7 +159,7 @@ int csv_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct samp return i; } -int csv_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int csv_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { int i; size_t off = 0; @@ -171,24 +173,23 @@ int csv_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sampl return i; } -void csv_header(struct io *io) +void csv_header(struct io *io, const struct sample *smp) { FILE *f = io_stream_output(io); fprintf(f, "# secs%cnsecs%coffset%csequence", io->separator, io->separator, io->separator); - if (io->output.signals) { - for (int i = 0; i < list_length(io->output.signals); i++) { - struct signal *s = (struct signal *) list_at(io->output.signals, i); + for (int i = 0; i < smp->length; i++) { + struct signal *sig = (struct signal *) list_at(smp->signals, i); - fprintf(f, "%c%s", io->separator, s->name); + if (sig->name) + fprintf(f, "%c%s", io->separator, sig->name); + else + fprintf(f, "%csignal%d", io->separator, i); - if (s->unit) - fprintf(f, "[%s]", s->unit); - } + if (sig->unit) + fprintf(f, "[%s]", sig->unit); } - else - fprintf(f, "%cdata[]", io->separator); fprintf(f, "%c", io->delimiter); } @@ -216,7 +217,8 @@ static struct plugin p2 = { .sscan = csv_sscan, .header = csv_header, .size = 0, - .flags = IO_NEWLINES, + .flags = IO_NEWLINES | IO_AUTO_DETECT_FORMAT | + SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA, .separator = ',' } }; diff --git a/lib/formats/json.c b/lib/formats/json.c index 20f0202f1..1a9aba7e3 100644 --- a/lib/formats/json.c +++ b/lib/formats/json.c @@ -23,17 +23,41 @@ #include #include #include +#include #include #include +static enum signal_type json_detect_format(json_t *val) +{ + int type = json_typeof(val); + + switch (type) { + case JSON_REAL: + return SIGNAL_TYPE_FLOAT; + + case JSON_INTEGER: + return SIGNAL_TYPE_INTEGER; + + case JSON_TRUE: + case JSON_FALSE: + return SIGNAL_TYPE_BOOLEAN; + + case JSON_OBJECT: + return SIGNAL_TYPE_COMPLEX; /* must be a complex number */ + + default: + return SIGNAL_TYPE_AUTO; + } +} + static json_t * json_pack_timestamps(struct sample *smp) { json_t *json_ts = json_object(); - if (smp->flags & SAMPLE_HAS_ORIGIN) + if (smp->flags & SAMPLE_HAS_TS_ORIGIN) json_object_set(json_ts, "origin", json_pack("[ I, I ]", smp->ts.origin.tv_sec, smp->ts.origin.tv_nsec)); - if (smp->flags & SAMPLE_HAS_RECEIVED) + if (smp->flags & SAMPLE_HAS_TS_RECEIVED) json_object_set(json_ts, "received", json_pack("[ I, I ]", smp->ts.received.tv_sec, smp->ts.received.tv_nsec)); return json_ts; @@ -55,7 +79,7 @@ static int json_unpack_timestamps(json_t *json_ts, struct sample *smp) if (ret) return ret; - smp->flags |= SAMPLE_HAS_ORIGIN; + smp->flags |= SAMPLE_HAS_TS_ORIGIN; } if (json_ts_received) { @@ -63,7 +87,7 @@ static int json_unpack_timestamps(json_t *json_ts, struct sample *smp) if (ret) return ret; - smp->flags |= SAMPLE_HAS_RECEIVED; + smp->flags |= SAMPLE_HAS_TS_RECEIVED; } return 0; @@ -82,13 +106,38 @@ static int json_pack_sample(struct io *io, json_t **j, struct sample *smp) json_object_set(json_smp, "sequence", json_sequence); } - if (smp->flags & SAMPLE_HAS_VALUES) { + if (smp->flags & SAMPLE_HAS_DATA) { json_t *json_data = json_array(); for (int i = 0; i < smp->length; i++) { - json_t *json_value = sample_get_data_format(smp, i) - ? json_integer(smp->data[i].i) - : json_real(smp->data[i].f); + enum signal_type fmt = sample_format(smp, i); + + json_t *json_value; + switch (fmt) { + case SIGNAL_TYPE_INTEGER: + json_value = json_integer(smp->data[i].i); + break; + + case SIGNAL_TYPE_FLOAT: + json_value = json_real(smp->data[i].f); + break; + + case SIGNAL_TYPE_BOOLEAN: + json_value = json_boolean(smp->data[i].b); + break; + + case SIGNAL_TYPE_COMPLEX: + json_value = json_pack("{ s: f, s: f }", + "real", creal(smp->data[i].z), + "imag", cimag(smp->data[i].z) + ); + break; + + case SIGNAL_TYPE_INVALID: + case SIGNAL_TYPE_AUTO: + json_value = json_null(); /* Unknown type */ + break; + } json_array_append(json_data, json_value); } @@ -129,6 +178,8 @@ static int json_unpack_sample(struct io *io, json_t *json_smp, struct sample *sm size_t i; int64_t sequence = -1; + smp->signals = io->signals; + ret = json_unpack_ex(json_smp, &err, 0, "{ s?: o, s?: I, s: o }", "ts", &json_ts, "sequence", &sequence, @@ -157,26 +208,30 @@ static int json_unpack_sample(struct io *io, json_t *json_smp, struct sample *sm if (i >= smp->capacity) break; - switch (json_typeof(json_value)) { - case JSON_REAL: - smp->data[i].f = json_real_value(json_value); - sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_FLOAT); - break; + struct signal *sig = list_at_safe(smp->signals, i); + if (!sig) + return -1; - case JSON_INTEGER: - smp->data[i].i = json_integer_value(json_value); - sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_INT); - break; - - default: - return -2; + enum signal_type fmt = json_detect_format(json_value); + if (sig->type == SIGNAL_TYPE_AUTO) { + debug(LOG_IO | 5, "Learned data type for index %zu: %s", i, signal_type_to_str(fmt)); + sig->type = fmt; } + else if (sig->type != fmt) { + error("Received invalid data type in JSON payload: Received %s, expected %s for signal %s (index %zu).", + signal_type_to_str(fmt), signal_type_to_str(sig->type), sig->name, i); + return -2; + } + + ret = signal_data_parse_json(&smp->data[i], sig, json_value); + if (ret) + return -3; smp->length++; } if (smp->length > 0) - smp->flags |= SAMPLE_HAS_VALUES; + smp->flags |= SAMPLE_HAS_DATA; return 0; } @@ -222,7 +277,7 @@ int json_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sam return ret; } -int json_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int json_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { int ret; json_t *json; @@ -302,7 +357,9 @@ static struct plugin p = { .sscan = json_sscan, .sprint = json_sprint, .size = 0, - .delimiter = '\n' + .delimiter = '\n', + .flags = IO_AUTO_DETECT_FORMAT | + SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA }, }; diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index 57dc9a31f..4e718407b 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -38,9 +38,8 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm json_error_t err; json_t *json_data, *json_name, *json_unit, *json_value; json_t *json_created = NULL, *json_sequence = NULL; - struct signal *sig; - if (smp->flags & SAMPLE_HAS_ORIGIN) + if (smp->flags & SAMPLE_HAS_TS_ORIGIN) json_created = json_integer(time_to_double(&smp->ts.origin) * 1e3); if (smp->flags & SAMPLE_HAS_SEQUENCE) @@ -49,26 +48,26 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm json_data = json_array(); for (int i = 0; i < smp->length; i++) { - if (io->out.signals) - sig = (struct signal *) list_at_safe(io->out.signals, i); - else - sig = NULL; + struct signal *sig; - if (sig) { - if (!sig->enabled) - continue; + sig = list_at_safe(smp->signals, i); + if (!sig) + return -1; + if (sig->name) json_name = json_string(sig->name); - json_unit = json_string(sig->unit); - } else { char name[32]; snprintf(name, 32, "signal_%d", i); json_name = json_string(name); - json_unit = NULL; } + if (sig->unit) + json_unit = json_string(sig->unit); + else + json_unit = NULL; + json_value = json_pack_ex(&err, 0, "{ s: o, s: f }", "name", json_name, "value", smp->data[i].f @@ -96,7 +95,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm ); if (*j == NULL) return -1; - +#if 0 #ifdef JSON_RESERVE_INTEGER_TARGET if (io->out.node) { char *endptr; @@ -113,6 +112,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm #else if (io->out.node) json_object_set_new(*j, "target", json_string(io->out.node->name)); +#endif #endif return 0; @@ -136,6 +136,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa if (ret) return -1; +#if 0 #ifdef JSON_RESERVE_INTEGER_TARGET if (json_target && io->in.node) { if (!json_is_integer(json_target)) @@ -163,7 +164,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa return 0; } #endif - +#endif if (!json_data || !json_is_array(json_data)) return -1; @@ -185,12 +186,12 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa struct signal *sig; - sig = (struct signal *) list_lookup(io->in.signals, name); + sig = (struct signal *) list_lookup(io->signals, name); if (sig) { if (!sig->enabled) continue; - idx = list_index(io->in.signals, sig); + idx = list_index(io->signals, sig); } else { ret = sscanf(name, "signal_%d", &idx); @@ -207,11 +208,11 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa } if (smp->length > 0) - smp->flags |= SAMPLE_HAS_VALUES; + smp->flags |= SAMPLE_HAS_DATA; if (created > 0) { smp->ts.origin = time_from_double(created * 1e-3); - smp->flags |= SAMPLE_HAS_ORIGIN; + smp->flags |= SAMPLE_HAS_TS_ORIGIN; } return smp->length > 0 ? 1 : 0; @@ -243,7 +244,7 @@ int json_reserve_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, st return ret; } -int json_reserve_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int json_reserve_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { int ret; json_t *json; diff --git a/lib/formats/msg.c b/lib/formats/msg.c index ee70148bd..e3a5f5d69 100644 --- a/lib/formats/msg.c +++ b/lib/formats/msg.c @@ -25,7 +25,9 @@ #include #include #include +#include #include +#include void msg_ntoh(struct msg *m) { @@ -63,7 +65,7 @@ int msg_verify(struct msg *m) { if (m->version != MSG_VERSION) return -1; - else if (m->type != MSG_TYPE_DATA) + else if (m->type != MSG_TYPE_DATA) return -2; else if (m->rsvd1 != 0) return -3; @@ -71,7 +73,7 @@ int msg_verify(struct msg *m) return 0; } -int msg_to_sample(struct msg *msg, struct sample *smp) +int msg_to_sample(struct msg *msg, struct sample *smp, struct list *signals) { int ret; @@ -79,23 +81,32 @@ int msg_to_sample(struct msg *msg, struct sample *smp) if (ret) return -1; - smp->flags = SAMPLE_HAS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_VALUES | SAMPLE_HAS_ID; + smp->flags = SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA; smp->length = MIN(msg->length, smp->capacity); smp->sequence = msg->sequence; smp->ts.origin = MSG_TS(msg); - smp->format = 0; for (int i = 0; i < smp->length; i++) { - switch (sample_get_data_format(smp, i)) { - case SAMPLE_DATA_FORMAT_FLOAT: smp->data[i].f = msg->data[i].f; break; - case SAMPLE_DATA_FORMAT_INT: smp->data[i].i = msg->data[i].i; break; + struct signal *sig = list_at(signals, i); + + switch (sig->type) { + case SIGNAL_TYPE_FLOAT: + smp->data[i].f = msg->data[i].f; + break; + + case SIGNAL_TYPE_INTEGER: + smp->data[i].i = msg->data[i].i; + break; + + default: + return -1; } } return 0; } -int msg_from_sample(struct msg *msg, struct sample *smp) +int msg_from_sample(struct msg *msg, struct sample *smp, struct list *signals) { *msg = MSG_INIT(smp->length, smp->sequence); @@ -103,9 +114,19 @@ int msg_from_sample(struct msg *msg, struct sample *smp) msg->ts.nsec = smp->ts.origin.tv_nsec; for (int i = 0; i < smp->length; i++) { - switch (sample_get_data_format(smp, i)) { - case SAMPLE_DATA_FORMAT_FLOAT: msg->data[i].f = smp->data[i].f; break; - case SAMPLE_DATA_FORMAT_INT: msg->data[i].i = smp->data[i].i; break; + struct signal *sig = list_at(signals, i); + + switch (sig->type) { + case SIGNAL_TYPE_FLOAT: + msg->data[i].f = smp->data[i].f; + break; + + case SIGNAL_TYPE_INTEGER: + msg->data[i].i = smp->data[i].i; + break; + + default: + return -1; } } diff --git a/lib/formats/protobuf.c b/lib/formats/protobuf.c index f5bb2a1b0..051556f46 100644 --- a/lib/formats/protobuf.c +++ b/lib/formats/protobuf.c @@ -24,9 +24,32 @@ #include #include +#include +#include #include #include +static enum signal_type protobuf_detect_format(Villas__Node__Value *val) +{ + switch (val->value_case) { + case VILLAS__NODE__VALUE__VALUE_F: + return SIGNAL_TYPE_FLOAT; + + case VILLAS__NODE__VALUE__VALUE_I: + return SIGNAL_TYPE_INTEGER; + + case VILLAS__NODE__VALUE__VALUE_B: + return SIGNAL_TYPE_BOOLEAN; + + case VILLAS__NODE__VALUE__VALUE_Z: + return SIGNAL_TYPE_COMPLEX; + + case VILLAS__NODE__VALUE__VALUE__NOT_SET: + default: + return SIGNAL_TYPE_INVALID; + } +} + int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt) { unsigned psz; @@ -50,7 +73,7 @@ int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct pb_smp->sequence = smp->sequence; } - if (smp->flags & SAMPLE_HAS_ORIGIN) { + if (smp->flags & SAMPLE_HAS_TS_ORIGIN) { pb_smp->timestamp = alloc(sizeof(Villas__Node__Timestamp)); villas__node__timestamp__init(pb_smp->timestamp); @@ -65,12 +88,37 @@ int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct Villas__Node__Value *pb_val = pb_smp->values[j] = alloc(sizeof(Villas__Node__Value)); villas__node__value__init(pb_val); - enum sample_data_format fmt = sample_get_data_format(smp, j); - + enum signal_type fmt = sample_format(smp, j); switch (fmt) { - case SAMPLE_DATA_FORMAT_FLOAT: pb_val->value_case = VILLAS__NODE__VALUE__VALUE_F; pb_val->f = smp->data[j].f; break; - case SAMPLE_DATA_FORMAT_INT: pb_val->value_case = VILLAS__NODE__VALUE__VALUE_I; pb_val->i = smp->data[j].i; break; - default: pb_val->value_case = VILLAS__NODE__VALUE__VALUE__NOT_SET; break; + case SIGNAL_TYPE_FLOAT: + pb_val->value_case = VILLAS__NODE__VALUE__VALUE_F; + pb_val->f = smp->data[j].f; + break; + + case SIGNAL_TYPE_INTEGER: + pb_val->value_case = VILLAS__NODE__VALUE__VALUE_I; + pb_val->i = smp->data[j].i; + break; + + case SIGNAL_TYPE_BOOLEAN: + pb_val->value_case = VILLAS__NODE__VALUE__VALUE_B; + pb_val->b = smp->data[j].b; + break; + + case SIGNAL_TYPE_COMPLEX: + pb_val->value_case = VILLAS__NODE__VALUE__VALUE_Z; + pb_val->z = alloc(sizeof(Villas__Node__Complex)); + + villas__node__complex__init(pb_val->z); + + pb_val->z->real = creal(smp->data[j].z); + pb_val->z->imag = cimag(smp->data[j].z); + break; + + case SIGNAL_TYPE_AUTO: + case SIGNAL_TYPE_INVALID: + pb_val->value_case = VILLAS__NODE__VALUE__VALUE__NOT_SET; + break; } } } @@ -93,22 +141,24 @@ out: return -1; } -int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { unsigned i, j; Villas__Node__Message *pb_msg; pb_msg = villas__node__message__unpack(NULL, len, (uint8_t *) buf); + if (!pb_msg) + return -1; for (i = 0; i < MIN(pb_msg->n_samples, cnt); i++) { struct sample *smp = smps[i]; Villas__Node__Sample *pb_smp = pb_msg->samples[i]; - smp->flags = SAMPLE_HAS_FORMAT; + smp->signals = io->signals; if (pb_smp->type != VILLAS__NODE__SAMPLE__TYPE__DATA) { - warn("Parsed non supported message type"); - break; + warn("Parsed non supported message type. Skipping"); + continue; } if (pb_smp->has_sequence) { @@ -117,7 +167,7 @@ int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct } if (pb_smp->timestamp) { - smp->flags |= SAMPLE_HAS_ORIGIN; + smp->flags |= SAMPLE_HAS_TS_ORIGIN; smp->ts.origin.tv_sec = pb_smp->timestamp->sec; smp->ts.origin.tv_nsec = pb_smp->timestamp->nsec; } @@ -125,21 +175,45 @@ int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct for (j = 0; j < MIN(pb_smp->n_values, smp->capacity); j++) { Villas__Node__Value *pb_val = pb_smp->values[j]; - enum sample_data_format fmt = pb_val->value_case == VILLAS__NODE__VALUE__VALUE_F - ? SAMPLE_DATA_FORMAT_FLOAT - : SAMPLE_DATA_FORMAT_INT; + enum signal_type fmt = protobuf_detect_format(pb_val); - switch (fmt) { - case SAMPLE_DATA_FORMAT_FLOAT: smp->data[j].f = pb_val->f; break; - case SAMPLE_DATA_FORMAT_INT: smp->data[j].i = pb_val->i; break; - default: { } + struct signal *sig = (struct signal *) list_at_safe(smp->signals, j); + if (!sig) + return -1; + + if (sig->type == SIGNAL_TYPE_AUTO) { + debug(LOG_IO | 5, "Learned data type for index %u: %s", j, signal_type_to_str(fmt)); + sig->type = fmt; + } + else if (sig->type != fmt) { + error("Received invalid data type in Protobuf payload: Received %s, expected %s for signal %s (index %u).", + signal_type_to_str(fmt), signal_type_to_str(sig->type), sig->name, i); + return -2; } - sample_set_data_format(smp, j, fmt); + switch (sig->type) { + case SIGNAL_TYPE_FLOAT: + smp->data[j].f = pb_val->f; + break; + + case SIGNAL_TYPE_INTEGER: + smp->data[j].i = pb_val->i; + break; + + case SIGNAL_TYPE_BOOLEAN: + smp->data[j].b = pb_val->b; + break; + + case SIGNAL_TYPE_COMPLEX: + smp->data[j].z = CMPLXF(pb_val->z->real, pb_val->z->imag); + break; + + default: { } + } } if (pb_smp->n_values > 0) - smp->flags |= SAMPLE_HAS_VALUES; + smp->flags |= SAMPLE_HAS_DATA; smp->length = j; } @@ -158,7 +232,9 @@ static struct plugin p = { .type = PLUGIN_TYPE_FORMAT, .format = { .sprint = protobuf_sprint, - .sscan = protobuf_sscan + .sscan = protobuf_sscan, + .flags = IO_AUTO_DETECT_FORMAT | IO_HAS_BINARY_PAYLOAD | + SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA } }; REGISTER_PLUGIN(&p); diff --git a/lib/formats/raw.c b/lib/formats/raw.c index 7e023692d..3e0429480 100644 --- a/lib/formats/raw.c +++ b/lib/formats/raw.c @@ -27,214 +27,342 @@ #include #include -/** Convert float to host byte order */ -#define SWAP_FLT_TOH(o, n) ({ \ - union { float f; uint32_t i; } x = { .f = n }; \ - x.i = (o) ? be32toh(x.i) : le32toh(x.i); x.f; \ -}) +typedef float flt32_t; +typedef double flt64_t; +typedef long double flt128_t; /** @todo: check */ /** Convert double to host byte order */ -#define SWAP_DBL_TOH(o, n) ({ \ - union { float f; uint64_t i; } x = { .f = n }; \ - x.i = (o) ? be64toh(x.i) : le64toh(x.i); x.f; \ -}) - -/** Convert float to big/little endian byte order */ -#define SWAP_FLT_TOE(o, n) ({ \ - union { float f; uint32_t i; } x = { .f = n }; \ - x.i = (o) ? htobe32(x.i) : htole32(x.i); x.f; \ +#define SWAP_FLOAT_XTOH(o, b, n) ({ \ + union { flt ## b ## _t f; uint ## b ## _t i; } x = { .f = n }; \ + x.i = (o) ? be ## b ## toh(x.i) : le ## b ## toh(x.i); \ + x.f; \ }) /** Convert double to big/little endian byte order */ -#define SWAP_DBL_TOE(o, n) ({ \ - union { double f; uint64_t i; } x = { .f = n }; \ - x.i = (o) ? htobe64(x.i) : htole64(x.i); x.f; \ +#define SWAP_FLOAT_HTOX(o, b, n) ({ \ + union { flt ## b ## _t f; uint ## b ## _t i; } x = { .f = n }; \ + x.i = (o) ? htobe ## b (x.i) : htole ## b (x.i); \ + x.f; \ }) /** Convert integer of varying width to host byte order */ -#define SWAP_INT_TOH(o, b, n) (o ? be ## b ## toh(n) : le ## b ## toh(n)) +#define SWAP_INT_XTOH(o, b, n) (o ? be ## b ## toh(n) : le ## b ## toh(n)) /** Convert integer of varying width to big/little endian byte order */ -#define SWAP_INT_TOE(o, b, n) (o ? htobe ## b (n) : htole ## b (n)) +#define SWAP_INT_HTOX(o, b, n) (o ? htobe ## b (n) : htole ## b (n)) int raw_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt) { - - int i, o = 0; + int o = 0; size_t nlen; - int8_t *i8 = (void *) buf; - int16_t *i16 = (void *) buf; - int32_t *i32 = (void *) buf; - int64_t *i64 = (void *) buf; - float *f32 = (void *) buf; - double *f64 = (void *) buf; + int8_t *i8 = (void *) buf; + int16_t *i16 = (void *) buf; + int32_t *i32 = (void *) buf; + int64_t *i64 = (void *) buf; + float *f32 = (void *) buf; + double *f64 = (void *) buf; +#ifdef HAS_128BIT + __int128 *i128 = (void *) buf; + __float128 *f128 = (void *) buf; +#endif int bits = 1 << (io->flags >> 24); - for (i = 0; i < cnt; i++) { - nlen = (smps[i]->length + o + (io->flags & RAW_FAKE) ? 3 : 0) * (bits / 8); - if (nlen >= len) - break; - - /* First three values are sequence, seconds and nano-seconds timestamps */ - if (io->flags & RAW_FAKE) { - switch (bits) { - case 32: - i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 32, smps[i]->sequence); - i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_sec); - i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_nsec); - break; - case 64: - i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 64, smps[i]->sequence); - i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_sec); - i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_nsec); - break; - } - } - - enum raw_format { - RAW_FORMAT_INT, - RAW_FORMAT_FLT - }; - - for (int j = 0; j < smps[i]->length; j++) { - enum sample_data_format smp_fmt; - enum raw_format raw_fmt; - - union { double f; uint64_t i; } val; - - if (io->flags & RAW_AUTO) - raw_fmt = smps[i]->format & (1 << i) ? RAW_FORMAT_INT : RAW_FORMAT_FLT; - else if (io->flags & RAW_FLT) - raw_fmt = RAW_FORMAT_FLT; - else - raw_fmt = RAW_FORMAT_INT; - - smp_fmt = sample_get_data_format(smps[i], j); - - switch (raw_fmt) { - case RAW_FORMAT_FLT: - switch (smp_fmt) { - case SAMPLE_DATA_FORMAT_INT: val.f = smps[i]->data[j].i; break; - case SAMPLE_DATA_FORMAT_FLOAT: val.f = smps[i]->data[j].f; break; - default: val.f = -1; break; - } - - switch (bits) { - case 32: f32[o++] = SWAP_FLT_TOE(io->flags & RAW_BE_FLT, val.f); break; - case 64: f64[o++] = SWAP_DBL_TOE(io->flags & RAW_BE_FLT, val.f); break; - } - break; - - case RAW_FORMAT_INT: - switch (smp_fmt) { - case SAMPLE_DATA_FORMAT_INT: val.i = smps[i]->data[j].i; break; - case SAMPLE_DATA_FORMAT_FLOAT: val.i = smps[i]->data[j].f; break; - default: val.i = -1; break; - } - - switch (bits) { - case 8: i8 [o++] = val.i; break; - case 16: i16[o++] = SWAP_INT_TOE(io->flags & RAW_BE_INT, 16, val.i); break; - case 32: i32[o++] = SWAP_INT_TOE(io->flags & RAW_BE_INT, 32, val.i); break; - case 64: i64[o++] = SWAP_INT_TOE(io->flags & RAW_BE_INT, 64, val.i); break; - } - break; - } - } - } - - if (wbytes) - *wbytes = o * (bits / 8); - - return i; -} - -int raw_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) -{ /* The raw format can not encode multiple samples in one buffer * as there is no support for framing. */ struct sample *smp = smps[0]; - int8_t *i8 = (void *) buf; - int16_t *i16 = (void *) buf; - int32_t *i32 = (void *) buf; - int64_t *i64 = (void *) buf; - float *f32 = (void *) buf; - double *f64 = (void *) buf; + if (cnt > 1) + return -1; - int off, bits = 1 << (io->flags >> 24); - - smp->length = len / (bits / 8); - - if (io->flags & RAW_FAKE) { - off = 3; - - if (smp->length < off) { - warn("Received a packet with no fake header. Skipping..."); - return 0; - } - - smp->length -= off; + /* First three values are sequence, seconds and nano-seconds timestamps + * + * These fields are always encoded as integers! + */ + if (io->flags & RAW_FAKE_HEADER) { + /* Check length */ + nlen = (o + 3) * (bits / 8); + if (nlen >= len) + goto out; switch (bits) { + case 8: + i8[o++] = smp->sequence; + i8[o++] = smp->ts.origin.tv_sec; + i8[o++] = smp->ts.origin.tv_nsec; + break; + + case 16: + i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, smp->sequence); + i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, smp->ts.origin.tv_sec); + i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, smp->ts.origin.tv_nsec); + break; + case 32: - smp->sequence = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 32, i32[0]); - smp->ts.origin.tv_sec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 32, i32[1]); - smp->ts.origin.tv_nsec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 32, i32[2]); + i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, smp->sequence); + i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, smp->ts.origin.tv_sec); + i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, smp->ts.origin.tv_nsec); break; case 64: - smp->sequence = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 64, i64[0]); - smp->ts.origin.tv_sec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 64, i64[1]); - smp->ts.origin.tv_nsec = SWAP_INT_TOH(io->flags & RAW_BE_HDR, 64, i64[2]); + i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, smp->sequence); + i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, smp->ts.origin.tv_sec); + i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, smp->ts.origin.tv_nsec); break; + +#ifdef HAS_128BIT + case 128: + i128[o++] = SWAP_INT_TO_LE(io->flags & RAW_BIG_ENDIAN, 128, smp->sequence); + i128[o++] = SWAP_INT_TO_LE(io->flags & RAW_BIG_ENDIAN, 128, smp->ts.origin.tv_sec); + i128[o++] = SWAP_INT_TO_LE(io->flags & RAW_BIG_ENDIAN, 128, smp->ts.origin.tv_nsec); + break; +#endif + } + } + + for (int j = 0; j < smp->length; j++) { + enum signal_type fmt = sample_format(smp, j); + union signal_data *data = &smp->data[j]; + + /* Check length */ + nlen = (o + fmt == SIGNAL_TYPE_COMPLEX ? 2 : 1) * (bits / 8); + if (nlen >= len) + goto out; + + switch (fmt) { + case SIGNAL_TYPE_FLOAT: + switch (bits) { + case 8: i8 [o++] = -1; break; /* Not supported */ + case 16: i16[o++] = -1; break; /* Not supported */ + + case 32: f32[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, data->f); break; + case 64: f64[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, data->f); break; +#ifdef HAS_128BIT + case 128: f128[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, data->f); break; +#endif + } + break; + + case SIGNAL_TYPE_INTEGER: + switch (bits) { + case 8: i8 [o++] = data->i; break; + case 16: i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, data->i); break; + case 32: i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, data->i); break; + case 64: i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, data->i); break; +#ifdef HAS_128BIT + case 128: i128[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, data->i); break; +#endif + } + break; + + case SIGNAL_TYPE_BOOLEAN: + switch (bits) { + case 8: i8 [o++] = data->b ? 1 : 0; break; + case 16: i16[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 16, data->b ? 1 : 0); break; + case 32: i32[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, data->b ? 1 : 0); break; + case 64: i64[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, data->b ? 1 : 0); break; +#ifdef HAS_128BIT + case 128: i128[o++] = SWAP_INT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, data->b ? 1 : 0); break; +#endif + } + break; + + case SIGNAL_TYPE_COMPLEX: + switch (bits) { + case 8: i8 [o++] = -1; /* Not supported */ + i8 [o++] = -1; break; + case 16: i16[o++] = -1; /* Not supported */ + i16[o++] = -1; break; + + case 32: f32[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, creal(data->z)); + f32[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 32, cimag(data->z)); break; + case 64: f64[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, creal(data->z)); + f64[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 64, cimag(data->z)); break; +#ifdef HAS_128BIT + case 128: f128[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, creal(data->z); + f128[o++] = SWAP_FLOAT_HTOX(io->flags & RAW_BIG_ENDIAN, 128, cimag(data->z); break; +#endif + } + break; + + case SIGNAL_TYPE_AUTO: + case SIGNAL_TYPE_INVALID: + return -1; + } + } + +out: if (wbytes) + *wbytes = o * (bits / 8); + + return 1; +} + +int raw_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +{ + int8_t *i8 = (void *) buf; + int16_t *i16 = (void *) buf; + int32_t *i32 = (void *) buf; + int64_t *i64 = (void *) buf; + float *f32 = (void *) buf; + double *f64 = (void *) buf; +#ifdef HAS_128BIT + __int128 *i128 = (void *) buf; + __float128 *f128 = (void *) buf; +#endif + + /* The raw format can not encode multiple samples in one buffer + * as there is no support for framing. */ + struct sample *smp = smps[0]; + + int o = 0, bits = 1 << (io->flags >> 24); + int nlen = len / (bits / 8); + + if (cnt > 1) + return -1; + + if (len % (bits / 8)) { + warn("Invalid RAW Payload length: %#zx", len); + return -1; + } + + if (io->flags & RAW_FAKE_HEADER) { + if (nlen < o + 3) { + warn("Received a packet with no fake header. Skipping..."); + return -1; } - smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_ORIGIN; + switch (bits) { + case 8: + smp->sequence = i8[o++]; + smp->ts.origin.tv_sec = i8[o++]; + smp->ts.origin.tv_nsec = i8[o++]; + break; + + case 16: + smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); + smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); + smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); + break; + + case 32: + smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); + smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); + smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); + break; + + case 64: + smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); + smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); + smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); + break; + +#ifdef HAS_128BIT + case 128: + smp->sequence = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); + smp->ts.origin.tv_sec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); + smp->ts.origin.tv_nsec = SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); + break; +#endif + } + + smp->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_TS_ORIGIN; } else { - off = 0; - smp->flags = 0; smp->sequence = 0; smp->ts.origin.tv_sec = 0; smp->ts.origin.tv_nsec = 0; } + smp->signals = io->signals; + + int i; + for (i = 0; i < smp->capacity && o < nlen; i++) { + enum signal_type fmt = sample_format(smp, i); + union signal_data *data = &smp->data[i]; + + switch (fmt) { + case SIGNAL_TYPE_FLOAT: + switch (bits) { + case 8: data->f = -1; o++; break; /* Not supported */ + case 16: data->f = -1; o++; break; /* Not supported */ + + case 32: data->f = SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, f32[o++]); break; + case 64: data->f = SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, f64[o++]); break; +#ifdef HAS_128BIT + case 128: data->f = SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, f128[o++]); break; +#endif + } + break; + + case SIGNAL_TYPE_INTEGER: + switch (bits) { + case 8: data->i = (int8_t) i8[o++]; break; + case 16: data->i = (int16_t) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); break; + case 32: data->i = (int32_t) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); break; + case 64: data->i = (int64_t) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); break; +#ifdef HAS_128BIT + case 128: data->i = (__int128) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); break; +#endif + } + break; + + case SIGNAL_TYPE_BOOLEAN: + switch (bits) { + case 8: data->b = (bool) i8[o++]; break; + case 16: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 16, i16[o++]); break; + case 32: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, i32[o++]); break; + case 64: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, i64[o++]); break; +#ifdef HAS_128BIT + case 128: data->b = (bool) SWAP_INT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, i128[o++]); break; +#endif + } + break; + + case SIGNAL_TYPE_COMPLEX: + switch (bits) { + case 8: data->z = CMPLXF(-1, -1); o += 2; break; /* Not supported */ + case 16: data->z = CMPLXF(-1, -1); o += 2; break; /* Not supported */ + + case 32: data->z = CMPLXF( + SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, f32[o++]), /* real */ + SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 32, f32[o++]) /* imag */ + ); + break; + + case 64: data->z = CMPLXF( + SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, f64[o++]), /* real */ + SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 64, f64[o++]) /* imag */ + ); + break; + +#if HAS_128BIT + case 128: data->z = CMPLXF( + SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, f128[o++]), /* real */ + SWAP_FLOAT_XTOH(io->flags & RAW_BIG_ENDIAN, 128, f128[o++]) /* imag */ + ); + break; +#endif + } + break; + + case SIGNAL_TYPE_AUTO: + case SIGNAL_TYPE_INVALID: + warn("Unsupported format in RAW payload"); + return -1; + } + } + + smp->length = i; + if (smp->length > smp->capacity) { warn("Received more values than supported: length=%u, capacity=%u", smp->length, smp->capacity); smp->length = smp->capacity; } - for (int i = 0; i < smp->length; i++) { - enum sample_data_format smp_fmt = io->flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT - : SAMPLE_DATA_FORMAT_INT; - - sample_set_data_format(smp, i, smp_fmt); - - switch (smp_fmt) { - case SAMPLE_DATA_FORMAT_FLOAT: - switch (bits) { - case 32: smp->data[i].f = SWAP_FLT_TOH(io->flags & RAW_BE_FLT, f32[i+off]); break; - case 64: smp->data[i].f = SWAP_DBL_TOH(io->flags & RAW_BE_FLT, f64[i+off]); break; - } - break; - - case SAMPLE_DATA_FORMAT_INT: - switch (bits) { - case 8: smp->data[i].i = i8[i]; break; - case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(io->flags & RAW_BE_INT, 16, i16[i+off]); break; - case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(io->flags & RAW_BE_INT, 32, i32[i+off]); break; - case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(io->flags & RAW_BE_INT, 64, i64[i+off]); break; - } - break; - } - } - if (rbytes) - *rbytes = len; + *rbytes = o * (bits / 8); return 1; } @@ -244,8 +372,9 @@ static struct plugin i = { \ .name = n, \ .description = d, \ .type = PLUGIN_TYPE_FORMAT, \ - .format = { \ - .flags = f | FORMAT_TYPE_BINARY,\ + .format = { \ + .flags = f | IO_HAS_BINARY_PAYLOAD |\ + SAMPLE_HAS_DATA, \ .sprint = raw_sprint, \ .sscan = raw_sscan \ } \ @@ -253,14 +382,19 @@ static struct plugin i = { \ REGISTER_PLUGIN(& i); /* Feel free to add additional format identifiers here to suit your needs */ -REGISTER_FORMAT_RAW(p_f32, "raw.flt32", "Raw single precision floating point", RAW_32 | RAW_FLT) -REGISTER_FORMAT_RAW(p_f64, "raw.flt64", "Raw double precision floating point", RAW_64 | RAW_FLT) -REGISTER_FORMAT_RAW(p_i8, "raw.int8", "Raw 8 bit, signed integer", RAW_8) -REGISTER_FORMAT_RAW(p_i16be, "raw.int16.be", "Raw 16 bit, signed integer, big endian byte-order", RAW_16 | RAW_BE) -REGISTER_FORMAT_RAW(p_i16le, "raw.int16.le", "Raw 16 bit, signed integer, little endian byte-order", RAW_16) -REGISTER_FORMAT_RAW(p_i32be, "raw.int32.be", "Raw 32 bit, signed integer, big endian byte-order", RAW_32 | RAW_BE) -REGISTER_FORMAT_RAW(p_i32le, "raw.int32.le", "Raw 32 bit, signed integer, little endian byte-order", RAW_32) -REGISTER_FORMAT_RAW(p_i64be, "raw.int64.be", "Raw 64 bit, signed integer, bit endian byte-order", RAW_64 | RAW_BE) -REGISTER_FORMAT_RAW(p_i64le, "raw.int64.le", "Raw 64 bit, signed integer, little endian byte-order", RAW_64) -REGISTER_FORMAT_RAW(p_gtnet, "gtnet", "RTDS GTNET", RAW_32 | RAW_FLT | RAW_BE) -REGISTER_FORMAT_RAW(p_gtnef, "gtnet.fake", "RTDS GTNET with fake header", RAW_32 | RAW_FLT | RAW_BE | RAW_FAKE) +REGISTER_FORMAT_RAW(p_8, "raw.8", "Raw 8 bit", RAW_BITS_8) +REGISTER_FORMAT_RAW(p_16be, "raw.16.be", "Raw 16 bit, big endian byte-order", RAW_BITS_16 | RAW_BIG_ENDIAN) +REGISTER_FORMAT_RAW(p_32be, "raw.32.be", "Raw 32 bit, big endian byte-order", RAW_BITS_32 | RAW_BIG_ENDIAN) +REGISTER_FORMAT_RAW(p_64be, "raw.64.be", "Raw 64 bit, big endian byte-order", RAW_BITS_64 | RAW_BIG_ENDIAN) + +REGISTER_FORMAT_RAW(p_16le, "raw.16.le", "Raw 16 bit, little endian byte-order", RAW_BITS_16) +REGISTER_FORMAT_RAW(p_32le, "raw.32.le", "Raw 32 bit, little endian byte-order", RAW_BITS_32) +REGISTER_FORMAT_RAW(p_64le, "raw.64.le", "Raw 64 bit, little endian byte-order", RAW_BITS_64) + +#ifdef HAS_128BIT +REGISTER_FORMAT_RAW(p_128le, "raw.128.be", "Raw 128 bit, big endian byte-order", RAW_BITS_128 | RAW_BIG_ENDIAN) +REGISTER_FORMAT_RAW(p_128le, "raw.128.le", "Raw 128 bit, little endian byte-order", RAW_BITS_128) +#endif + +REGISTER_FORMAT_RAW(p_gtnet, "gtnet", "RTDS GTNET", RAW_BITS_32 | RAW_BIG_ENDIAN) +REGISTER_FORMAT_RAW(p_gtnef, "gtnet.fake", "RTDS GTNET with fake header", RAW_BITS_32 | RAW_BIG_ENDIAN | RAW_FAKE_HEADER) diff --git a/lib/formats/villas.proto b/lib/formats/villas.proto index 92d6cc2e7..cdc46d8c9 100644 --- a/lib/formats/villas.proto +++ b/lib/formats/villas.proto @@ -37,7 +37,7 @@ message Sample { }; required Type type = 1 [default = DATA]; - optional uint32 sequence = 2; // The sequence number is incremented by one for consecutive messages. + optional uint64 sequence = 2; // The sequence number is incremented by one for consecutive messages. optional Timestamp timestamp = 4; repeated Value values = 5; } @@ -49,7 +49,14 @@ message Timestamp { message Value { oneof value { - float f = 1; // Floating point values. - int32 i = 2; // Integer values. + double f = 1; // Floating point values. + int64 i = 2; // Integer values. + bool b = 3; // Boolean values. + Complex z = 4; // Complex values. } } + +message Complex { + required float real = 1; // Real component + required float imag = 2; // Imaginary component +} diff --git a/lib/formats/villas_binary.c b/lib/formats/villas_binary.c index ec7dbe501..0e51a94c0 100644 --- a/lib/formats/villas_binary.c +++ b/lib/formats/villas_binary.c @@ -42,7 +42,7 @@ int villas_binary_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, s if (ptr + MSG_LEN(smp->length) > buf + len) break; - ret = msg_from_sample(msg, smp); + ret = msg_from_sample(msg, smp, smp->signals); if (ret) return ret; @@ -61,10 +61,10 @@ int villas_binary_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, s return i; } -int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int villas_binary_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { int ret, i = 0, values; - char *ptr = buf; + const char *ptr = buf; if (len % 4 != 0) { warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", len); @@ -75,6 +75,8 @@ int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, st struct msg *msg = (struct msg *) ptr; struct sample *smp = smps[i]; + smp->signals = io->signals; + /* Complete buffer has been parsed */ if (ptr == buf + len) break; @@ -99,7 +101,7 @@ int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, st else msg_ntoh(msg); - ret = msg_to_sample(msg, smp); + ret = msg_to_sample(msg, smp, io->signals); if (ret) { warn("Invalid msg received: reason=3, ret=%d", ret); break; @@ -122,7 +124,8 @@ static struct plugin p1 = { .sprint = villas_binary_sprint, .sscan = villas_binary_sscan, .size = 0, - .flags = FORMAT_TYPE_BINARY + .flags = IO_HAS_BINARY_PAYLOAD | + SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA }, }; @@ -135,7 +138,8 @@ static struct plugin p2 = { .sprint = villas_binary_sprint, .sscan = villas_binary_sscan, .size = 0, - .flags = FORMAT_TYPE_BINARY | VILLAS_BINARY_WEB + .flags = IO_HAS_BINARY_PAYLOAD | VILLAS_BINARY_WEB | + SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA }, }; diff --git a/lib/formats/villas_human.c b/lib/formats/villas_human.c index 53b583dc2..007e9be0b 100644 --- a/lib/formats/villas_human.c +++ b/lib/formats/villas_human.c @@ -32,31 +32,30 @@ #include #include -static size_t villas_human_sprint_single(struct io *io, char *buf, size_t len, struct sample *s) +static size_t villas_human_sprint_single(struct io *io, char *buf, size_t len, const struct sample *smp) { size_t off = 0; + struct signal *sig; - if (io->flags & SAMPLE_HAS_ORIGIN) { - off += snprintf(buf + off, len - off, "%llu", (unsigned long long) s->ts.origin.tv_sec); - off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec); + if (io->flags & SAMPLE_HAS_TS_ORIGIN) { + off += snprintf(buf + off, len - off, "%llu", (unsigned long long) smp->ts.origin.tv_sec); + off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) smp->ts.origin.tv_nsec); } - if (io->flags & SAMPLE_HAS_RECEIVED) - off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received)); + if (io->flags & SAMPLE_HAS_TS_RECEIVED) + off += snprintf(buf + off, len - off, "%+e", time_delta(&smp->ts.origin, &smp->ts.received)); if (io->flags & SAMPLE_HAS_SEQUENCE) - off += snprintf(buf + off, len - off, "(%" PRIu64 ")", s->sequence); + off += snprintf(buf + off, len - off, "(%" PRIu64 ")", smp->sequence); - if (io->flags & SAMPLE_HAS_VALUES) { - for (int i = 0; i < s->length; i++) { - switch (sample_get_data_format(s, i)) { - case SAMPLE_DATA_FORMAT_FLOAT: - off += snprintf(buf + off, len - off, "%c%.6lf", io->separator, s->data[i].f); - break; - case SAMPLE_DATA_FORMAT_INT: - off += snprintf(buf + off, len - off, "%c%" PRIi64, io->separator, s->data[i].i); - break; - } + if (io->flags & SAMPLE_HAS_DATA) { + for (int i = 0; i < smp->length; i++) { + sig = list_at_safe(smp->signals, i); + if (!sig) + break; + + off += snprintf(buf + off, len - off, "%c", io->separator); + off += signal_data_snprint(&smp->data[i], sig, buf + off, len - off); } } @@ -65,14 +64,16 @@ static size_t villas_human_sprint_single(struct io *io, char *buf, size_t len, s return off; } -static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t len, struct sample *s) +static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t len, struct sample *smp) { - char *end; + int ret; + char *end, *next; const char *ptr = buf; double offset = 0; - s->flags = 0; + smp->flags = 0; + smp->signals = io->signals; /* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ... * RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))? @@ -81,22 +82,22 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l */ /* Mandatory: seconds */ - s->ts.origin.tv_sec = (uint32_t) strtoul(ptr, &end, 10); + smp->ts.origin.tv_sec = (uint32_t) strtoul(ptr, &end, 10); if (ptr == end || *end == io->delimiter) return -1; - s->flags |= SAMPLE_HAS_ORIGIN; + smp->flags |= SAMPLE_HAS_TS_ORIGIN; /* Optional: nano seconds */ if (*end == '.') { ptr = end + 1; - s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10); + smp->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10); if (ptr == end) return -3; } else - s->ts.origin.tv_nsec = 0; + smp->ts.origin.tv_nsec = 0; /* Optional: offset / delay */ if (*end == '+' || *end == '-') { @@ -104,7 +105,7 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l offset = strtof(ptr, &end); /* offset is ignored for now */ if (ptr != end) - s->flags |= SAMPLE_HAS_OFFSET; + smp->flags |= SAMPLE_HAS_OFFSET; else return -4; } @@ -113,9 +114,9 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l if (*end == '(') { ptr = end + 1; - s->sequence = strtoul(ptr, &end, 10); + smp->sequence = strtoul(ptr, &end, 10); if (ptr != end) - s->flags |= SAMPLE_HAS_SEQUENCE; + smp->flags |= SAMPLE_HAS_SEQUENCE; else return -5; @@ -123,52 +124,52 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l end++; } - for (ptr = end, s->length = 0; - s->length < s->capacity; - ptr = end, s->length++) { + int i; + for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) { + if (*end == io->delimiter) - break; + goto out; - //determine format (int or double) of current number starting at ptr - //sko: not sure why ptr+1 is required in the following line to make it work... - //sko: it seems there is an additional space after each separator before a new value - char * next_seperator = strchr(ptr+1, io->separator); - if(next_seperator == NULL){ - //the last element of a row - next_seperator = strchr(ptr, io->delimiter); + struct signal *sig = (struct signal *) list_at_safe(io->signals, i); + if (!sig) + goto out; + + /* Perform signal detection only once */ + if (sig->type == SIGNAL_TYPE_AUTO) { + + /* Find end of the current column */ + next = strpbrk(ptr, (char[]) { io->separator, io->delimiter, 0 }); + if (next == NULL) + goto out; + + /* Copy value to temporary '\0' terminated buffer */ + size_t len = next - ptr; + char val[len+1]; + strncpy(val, ptr, len); + val[len] = '\0'; + + sig->type = signal_type_detect(val); + + debug(LOG_IO | 5, "Learned data type for index %u: %s", i, signal_type_to_str(sig->type)); } - char number[100]; - strncpy(number, ptr, next_seperator-ptr); - char * contains_dot = strstr(number, "."); - if(contains_dot == NULL){ - //no dot in string number --> number is an integer - s->data[s->length].i = strtol(ptr, &end, 10); - sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_INT); - } - - else{ - //dot in string number --> number is a floating point value - s->data[s->length].f = strtod(ptr, &end); - sample_set_data_format(s, s->length, SAMPLE_DATA_FORMAT_FLOAT); - } - - /* There are no valid values anymore. */ - if (end == ptr) - break; + ret = signal_data_parse_str(&smp->data[i], sig, ptr, &end); + if (ret || end == ptr) /* There are no valid values anymore. */ + goto out; } - if (*end == io->delimiter) +out: if (*end == io->delimiter) end++; - if (s->length > 0) - s->flags |= SAMPLE_HAS_VALUES; + smp->length = i; + if (smp->length > 0) + smp->flags |= SAMPLE_HAS_DATA; - if (s->flags & SAMPLE_HAS_OFFSET) { + if (smp->flags & SAMPLE_HAS_OFFSET) { struct timespec off = time_from_double(offset); - s->ts.received = time_add(&s->ts.origin, &off); + smp->ts.received = time_add(&smp->ts.origin, &off); - s->flags |= SAMPLE_HAS_RECEIVED; + smp->flags |= SAMPLE_HAS_TS_RECEIVED; } return end - buf; @@ -188,7 +189,7 @@ int villas_human_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, st return i; } -int villas_human_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int villas_human_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { int i; size_t off = 0; @@ -202,24 +203,23 @@ int villas_human_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, str return i; } -void villas_human_header(struct io *io) +void villas_human_header(struct io *io, const struct sample *smp) { FILE *f = io_stream_output(io); fprintf(f, "# %-20s", "seconds.nanoseconds+offset(sequence)"); - if (io->output.signals) { - for (int i = 0; i < list_length(io->output.signals); i++) { - struct signal *s = (struct signal *) list_at(io->output.signals, i); + for (int i = 0; i < smp->length; i++) { + struct signal *sig = (struct signal *) list_at(smp->signals, i); - fprintf(f, "%c%s", io->separator, s->name); + if (sig->name) + fprintf(f, "%c%s", io->separator, sig->name); + else + fprintf(f, "%csignal%d", io->separator, i); - if (s->unit) - fprintf(f, "[%s]", s->unit); - } + if (sig->unit) + fprintf(f, "[%s]", sig->unit); } - else - fprintf(f, "%cdata[]", io->separator); fprintf(f, "%c", io->delimiter); } @@ -233,7 +233,8 @@ static struct plugin p = { .sscan = villas_human_sscan, .header = villas_human_header, .size = 0, - .flags = IO_NEWLINES, + .flags = IO_NEWLINES | IO_AUTO_DETECT_FORMAT | + SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA, .separator = '\t', .delimiter = '\n' }