diff --git a/include/villas/sample.h b/include/villas/sample.h index 8546ab365..149fcc4f5 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -79,6 +79,7 @@ struct sample { int flags; /**< Flags are used to store binary properties of a sample. */ struct node *source; /**< The node from which this sample originates. */ + struct node *destination; /**< The node to which this sample will be sent. */ atomic_int refcnt; /**< Reference counter. */ ptrdiff_t pool_off; /**< This sample belongs to this memory pool (relative pointer). See sample_pool(). */ diff --git a/lib/io/Makefile.inc b/lib/io/Makefile.inc index 123e8950e..5d6129846 100644 --- a/lib/io/Makefile.inc +++ b/lib/io/Makefile.inc @@ -20,7 +20,7 @@ # along with this program. If not, see . ################################################################################### -LIB_FORMATS += json json-reserve villas_binary villas_human csv raw +LIB_FORMATS += json json_reserve villas_binary villas_human csv raw LIB_SRCS += lib/io/msg.c # Enable Google Protobuf IO format diff --git a/lib/io/json-reserve.c b/lib/io/json_reserve.c similarity index 54% rename from lib/io/json-reserve.c rename to lib/io/json_reserve.c index 7dd71e71b..c2e8f29b3 100644 --- a/lib/io/json-reserve.c +++ b/lib/io/json_reserve.c @@ -22,93 +22,138 @@ #include #include +#include +#include #include +#include #include int json_reserve_pack_sample(json_t **j, struct sample *smp, int flags) { - json_t *json_smp; json_error_t err; + json_t *json_data, *json_name, *json_unit, *json_value; + json_t *json_created = NULL, *json_sequence = NULL; + struct signal *sig; - json_smp = json_pack_ex(&err, 0, "{ s: { s: [ I, I ], s: [ I, I ], s: [ I, I ] } }", - "ts", - "origin", smp->ts.origin.tv_sec, smp->ts.origin.tv_nsec, - "received", smp->ts.received.tv_sec, smp->ts.received.tv_nsec, - "sent", smp->ts.sent.tv_sec, smp->ts.sent.tv_nsec); + if (smp->flags & SAMPLE_HAS_ORIGIN) + json_created = json_real(time_to_double(&smp->ts.origin)); - if (flags & SAMPLE_HAS_SEQUENCE) { - json_t *json_sequence = json_integer(smp->sequence); + if (smp->flags & SAMPLE_HAS_SEQUENCE) + json_sequence = json_integer(smp->sequence); - json_object_set(json_smp, "sequence", json_sequence); - } + json_data = json_array(); - if (flags & SAMPLE_HAS_VALUES) { - json_t *json_data = json_array(); + for (int i = 0; i < smp->length; i++) { + if (smp->destination) + sig = (struct signal *) list_at(&smp->destination->signals, i); + else + sig = NULL; - for (int i = 0; i < smp->length; i++) { - json_t *json_value = sample_get_data_format(smp, i) - ? json_integer(smp->data[i].i) - : json_real(smp->data[i].f); + if (sig) { + json_name = json_string(sig->name); + json_unit = json_string(sig->unit); + } + else { + char name[32]; + snprintf(name, 32, "signal_%d", i); - json_array_append(json_data, json_value); + json_name = json_string(name); + json_unit = NULL; } - json_object_set(json_smp, "data", json_data); + json_value = json_pack_ex(&err, 0, "{ s: o, s: o*, s: f, s: o*, s: o* }", + "name", json_name, + "unit", json_unit, + "value", smp->data[i].f, + "created", json_created, + "sequence", json_sequence + ); + if (!json_value) + continue; + + json_array_append(json_data, json_value); } - *j = json_smp; + char *origin, *target; + origin = smp->source + ? smp->source->name + : NULL; + + target = smp->destination + ? smp->destination->name + : NULL; + + *j = json_pack_ex(&err, 0, "{ s: s*, s: s*, s: o }", + "origin", origin, + "target", target, + "measurements", json_data + ); + if (*j == NULL) + return -1; return 0; } int json_reserve_unpack_sample(json_t *json_smp, struct sample *smp, int flags) { - int ret; - json_t *json_data, *json_value; + int ret, idx; + double created = -1; + json_error_t err; + json_t *json_value, *json_data = NULL; size_t i; - ret = json_unpack(json_smp, "{ s: { s: [ I, I ], s: [ I, I ], s: [ I, I ] }, s: I, s: o }", - "ts", - "origin", &smp->ts.origin.tv_sec, &smp->ts.origin.tv_nsec, - "received", &smp->ts.received.tv_sec, &smp->ts.received.tv_nsec, - "sent", &smp->ts.sent.tv_sec, &smp->ts.sent.tv_nsec, - "sequence", &smp->sequence, - "data", &json_data); + const char *origin; + const char *target; + ret = json_unpack_ex(json_smp, &err, 0, "{ s?: s, s?: s, s?: o, s?: o }", + "origin", &origin, + "target", &target, + "measurements", &json_data, + "setpoints", &json_data + ); if (ret) - return ret; - - if (!json_is_array(json_data)) return -1; - smp->flags = SAMPLE_HAS_ORIGIN | SAMPLE_HAS_RECEIVED | SAMPLE_HAS_SEQUENCE; + if (!json_data || !json_is_array(json_data)) + return -1; + + smp->flags = 0; smp->length = 0; json_array_foreach(json_data, i, json_value) { - if (i >= smp->capacity) - break; + const char *name, *unit = NULL; + double value; - switch (json_typeof(json_value)) { - case JSON_REAL: - smp->data[i].f = json_real_value(json_value); - sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_FLOAT); - break; + ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: f, s?: f }", + "name", &name, + "unit", &unit, + "value", &value, + "created", &created + ); + if (ret) + return -1; - case JSON_INTEGER: - smp->data[i].f = json_integer_value(json_value); - sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_INT); - break; + idx = signal_get_offset(name, smp->source); + if (idx < 0) + return -1; - default: - return -2; - } + if (idx >= smp->capacity) + continue; - smp->length++; + if (idx >= smp->length) + smp->length = idx; + + smp->data[idx].f = value; } if (smp->length > 0) smp->flags |= SAMPLE_HAS_VALUES; + if (created > 0) { + smp->ts.origin = time_from_double(created); + smp->flags |= SAMPLE_HAS_ORIGIN; + } + return 0; } @@ -124,7 +169,7 @@ int json_reserve_sprint(char *buf, size_t len, size_t *wbytes, struct sample *sm assert(cnt == 1); - ret = json_pack_sample(&json, smps[0], flags); + ret = json_reserve_pack_sample(&json, smps[0], flags); if (ret < 0) return ret; @@ -150,12 +195,13 @@ int json_reserve_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smp if (!json) return -1; - ret = json_unpack_sample(json, smps[0], flags); - if (ret < 0) - return ret; + ret = json_reserve_unpack_sample(json, smps[0], flags); json_decref(json); + if (ret < 0) + return ret; + if (rbytes) *rbytes = err.position; @@ -168,7 +214,7 @@ int json_reserve_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) json_t *json; for (i = 0; i < cnt; i++) { - ret = json_pack_sample(&json, smps[i], flags); + ret = json_reserve_pack_sample(&json, smps[i], flags); if (ret) return ret; @@ -192,7 +238,7 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err); if (!json) break; - ret = json_unpack_sample(json, smps[i], flags); + ret = json_reserve_unpack_sample(json, smps[i], flags); if (ret) goto skip;