diff --git a/lib/io/csv.c b/lib/io/csv.c index 26d556d5e..aae43a75b 100644 --- a/lib/io/csv.c +++ b/lib/io/csv.c @@ -28,52 +28,120 @@ #include "sample.h" #include "timing.h" -int csv_fprint_single(FILE *f, struct sample *s, int flags) +size_t csv_sprint_single(char *buf, size_t len, struct sample *s, int flags) { - fprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence); + size_t off = snprintf(buf, len, "%ld", s->ts.origin.tv_sec); + + if (flags & IO_FORMAT_NANOSECONDS) + off += snprintf(buf + off, len - off, "%c%09llu", CSV_SEPARATOR, (unsigned long long) s->ts.origin.tv_nsec); + + if (flags & IO_FORMAT_SEQUENCE) + off += snprintf(buf + off, len - off, "%c%u", CSV_SEPARATOR, s->sequence); for (int i = 0; i < s->length; i++) { switch ((s->format >> i) & 0x1) { case SAMPLE_DATA_FORMAT_FLOAT: - fprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f); + off += snprintf(buf + off, len - off, "%c%.6f", CSV_SEPARATOR, s->data[i].f); break; case SAMPLE_DATA_FORMAT_INT: - fprintf(f, "%c%" PRId64, CSV_SEPARATOR, s->data[i].i); + off += snprintf(buf + off, len - off, "%c%" PRId64, CSV_SEPARATOR, s->data[i].i); break; } } - fputc('\n', f); - - return 0; + off += snprintf(buf + off, len - off, "\n"); + + return off; } size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) { - int ret, off; + const char *ptr = buf; + char *end; - ret = sscanf(buf, "%ld %ld %d %n", &s->ts.origin.tv_sec, &s->ts.origin.tv_nsec, &s->sequence, &off); - if (ret != 3) - return -1; + s->ts.origin.tv_sec = strtoul(ptr, &end, 10); + if (end == ptr || *end == '\n') + goto out; + + ptr = end; + + s->ts.origin.tv_nsec = strtoul(ptr, &end, 10); + if (end == ptr || *end == '\n') + goto out; + + ptr = end; + + s->sequence = strtoul(ptr, &end, 10); + if (end == ptr || *end == '\n') + goto out; - int i; - for (i = 0; i < s->capacity; i++) { - switch (s->format & (1 << i)) { + for (ptr = end, s->length = 0; + s->length < s->capacity; + ptr = end, s->length++) { + if (*end == '\n') + goto out; + + switch (s->format & (1 << s->length)) { case SAMPLE_DATA_FORMAT_FLOAT: - ret = sscanf(buf + off, "%lf %n", &s->data[i].f, &off); + s->data[s->length].f = strtod(ptr, &end); break; case SAMPLE_DATA_FORMAT_INT: - ret = sscanf(buf + off, "%" PRId64 " %n", &s->data[i].i, &off); + s->data[s->length].i = strtol(ptr, &end, 10); break; } - if (ret != 2) - break; + /* There are no valid FP values anymore. */ + if (end == ptr) + goto out; } + +out: if (*end == '\n') + end++; - s->length = i; s->ts.received = time_now(); + return end - buf; +} + +int csv_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) +{ + int i; + size_t off = 0; + + for (i = 0; i < cnt && off < len; i++) + off += csv_sprint_single(buf + off, len - off, smps[i], flags); + + if (wbytes) + *wbytes = off; + + return i; +} + +int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +{ + int i; + size_t off = 0; + + for (i = 0; i < cnt && off < len; i++) + off += csv_sscan_single(buf + off, len - off, smps[i], flags); + + if (rbytes) + *rbytes = off; + + return i; +} + +int csv_fprint_single(FILE *f, struct sample *s, int flags) +{ + int ret; + char line[4096]; + + ret = csv_sprint_single(line, sizeof(line), s, flags); + if (ret < 0) + return ret; + + fputs(line, f); + return 0; } @@ -125,6 +193,8 @@ static struct plugin p = { .io = { .fprint = csv_fprint, .fscan = csv_fscan, + .sprint = csv_sprint, + .sscan = csv_sscan, .size = 0 } }; diff --git a/lib/io/json.c b/lib/io/json.c index 8e4bce22e..f9b9374c7 100644 --- a/lib/io/json.c +++ b/lib/io/json.c @@ -23,120 +23,170 @@ #include "plugin.h" #include "io/json.h" -int json_pack_sample(json_t **j, struct sample *s, int flags) +int json_pack_sample(json_t **j, struct sample *smp, int flags) { + json_t *json_smp; json_error_t err; - json_t *json_data = json_array(); - for (int i = 0; i < s->length; i++) { - json_t *json_value = sample_get_data_format(s, i) - ? json_integer(s->data[i].i) - : json_real(s->data[i].f); - - json_array_append(json_data, json_value); - } - - *j = json_pack_ex(&err, 0, "{ s: { s: [ I, I ], s: [ I, I ], s: [ I, I ] }, s: I, s: o }", + json_smp = json_pack_ex(&err, 0, "{ s: { s: [ I, I ], s: [ I, I ], s: [ I, I ] } }", "ts", - "origin", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, - "received", s->ts.received.tv_sec, s->ts.received.tv_nsec, - "sent", s->ts.sent.tv_sec, s->ts.sent.tv_nsec, - "sequence", s->sequence, - "data", json_data); + "origin", smp->ts.origin.tv_sec, smp->ts.origin.tv_nsec, + "received", smp->ts.received.tv_sec, smp->ts.received.tv_nsec, + "sent", smp->ts.sent.tv_sec, smp->ts.sent.tv_nsec); + + if (flags & IO_FORMAT_SEQUENCE) { + json_t *json_sequence = json_integer(smp->sequence); + + json_object_set(json_smp, "sequence", json_sequence); + } + + if (flags & IO_FORMAT_VALUES) { + json_t *json_data = json_array(); - if (!*j) - return -1; + for (int i = 0; i < smp->length; i++) { + json_t *json_value = sample_get_data_format(smp, i) + ? json_integer(smp->data[i].i) + : json_real(smp->data[i].f); + + json_array_append(json_data, json_value); + } + + json_object_set(json_smp, "data", json_data); + } + + *j = json_smp; return 0; } -int json_unpack_sample(json_t *j, struct sample *s, int *flags) +int json_pack_samples(json_t **j, struct sample *smps[], unsigned cnt, int flags) { - int ret, i; - json_t *json_data, *json_value; + int ret; + json_t *json_smps = json_array(); + + for (int i = 0; i < cnt; i++) { + json_t *json_smp; + + ret = json_pack_sample(&json_smp, smps[i], flags); + if (ret) + break; + + json_array_append(json_smps, json_smp); + } + + *j = json_smps; + + return cnt; +} - ret = json_unpack(j, "{ s: { s: [ I, I ], s: [ I, I ], s: [ I, I ] }, s: I, s: o }", +int json_unpack_sample(json_t *json_smp, struct sample *smp, int *flags) +{ + int ret; + json_t *json_data, *json_value; + size_t i; + + ret = json_unpack(json_smp, "{ s: { s: [ I, I ], s: [ I, I ], s: [ I, I ] }, s: I, s: o }", "ts", - "origin", &s->ts.origin.tv_sec, &s->ts.origin.tv_nsec, - "received", &s->ts.received.tv_sec, &s->ts.received.tv_nsec, - "sent", &s->ts.sent.tv_sec, &s->ts.sent.tv_nsec, - "sequence", &s->sequence, + "origin", &smp->ts.origin.tv_sec, &smp->ts.origin.tv_nsec, + "received", &smp->ts.received.tv_sec, &smp->ts.received.tv_nsec, + "sent", &smp->ts.sent.tv_sec, &smp->ts.sent.tv_nsec, + "sequence", &smp->sequence, "data", &json_data); if (ret) return ret; + + if (!json_is_array(json_data)) + return -1; - s->length = 0; + smp->length = 0; json_array_foreach(json_data, i, json_value) { + if (i >= smp->capacity) + break; + switch (json_typeof(json_value)) { case JSON_REAL: - s->data[i].f = json_real_value(json_value); - sample_set_data_format(s, i, SAMPLE_DATA_FORMAT_FLOAT); + smp->data[i].f = json_real_value(json_value); + sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_FLOAT); break; case JSON_INTEGER: - s->data[i].f = json_integer_value(json_value); - sample_set_data_format(s, i, SAMPLE_DATA_FORMAT_INT); + smp->data[i].f = json_integer_value(json_value); + sample_set_data_format(smp, i, SAMPLE_DATA_FORMAT_INT); break; default: - return -1; + return -2; } - s->length++; + smp->length++; + } + + return 0; +} + +int json_unpack_samples(json_t *json_smps, struct sample *smps[], unsigned cnt, int *flags) +{ + int ret; + json_t *json_smp; + size_t i; + + if (!json_is_array(json_smps)) + return -1; + + json_array_foreach(json_smps, i, json_smp) { + if (i >= cnt) + break; + + ret = json_unpack_sample(json_smp, smps[i], flags); + if (ret < 0) + break; } - return 0; + return i; } int json_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) { - int i, ret; + int ret; json_t *json; - size_t wr, off = 0; + size_t wr; - for (i = 0; i < cnt; i++) { - ret = json_pack_sample(&json, smps[i], flags); - if (ret) - return ret; + ret = json_pack_samples(&json, smps, cnt, flags); + if (ret < 0) + return ret; - wr = json_dumpb(json, buf + off, len - off, 0); + wr = json_dumpb(json, buf, len, 0); - json_decref(json); + json_decref(json); - if (wr > len) - break; + if (wbytes) + *wbytes = wr; - off += wr; - } - - return i; + return ret; } int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) { - int i, ret; + int ret; json_t *json; json_error_t err; - size_t off = 0; - for (i = 0; i < cnt; i++) { - json = json_loadb(buf + off, len - off, JSON_DISABLE_EOF_CHECK, &err); - if (!json) - break; + json = json_loadb(buf, len, 0, &err); + if (!json) + return -1; - off += err.position; + ret = json_unpack_samples(json, smps, cnt, flags); + if (ret < 0) + return ret; - ret = json_unpack_sample(json, smps[i], flags); + json_decref(json); + + if (rbytes) + *rbytes = err.position; - json_decref(json); - - if (ret) - break; - } - - return i; + return ret; } int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) diff --git a/lib/io/msg.c b/lib/io/msg.c index 61f6f010c..b386f050d 100644 --- a/lib/io/msg.c +++ b/lib/io/msg.c @@ -146,6 +146,11 @@ int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi { int ret, i = 0; char *ptr = buf; + + if (len % 4 != 0) { + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", len); + return 0; + } for (i = 0; i < cnt; i++) { struct msg *msg = (struct msg *) ptr; diff --git a/lib/io/raw.c b/lib/io/raw.c index e3ef875da..dc757f8cb 100644 --- a/lib/io/raw.c +++ b/lib/io/raw.c @@ -121,7 +121,7 @@ int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], uns } } } - + if (wbytes) *wbytes = o * (bits / 8); @@ -209,6 +209,9 @@ int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi smp->ts.received.tv_sec = 0; smp->ts.received.tv_nsec = 0; + + if (rbytes) + *rbytes = len; return 1; } diff --git a/lib/io/villas.c b/lib/io/villas.c index 938ae14a7..568dd339d 100644 --- a/lib/io/villas.c +++ b/lib/io/villas.c @@ -78,8 +78,8 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f /* Mandatory: seconds */ s->ts.origin.tv_sec = (uint32_t) strtoul(ptr, &end, 10); - if (ptr == end) - return -2; + if (ptr == end || *end == '\n') + return -1; /* Optional: nano seconds */ if (*end == '.') { @@ -122,7 +122,9 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f for (ptr = end, s->length = 0; s->length < s->capacity; ptr = end, s->length++) { - + if (*end == '\n') + break; + switch (s->format & (1 << s->length)) { case SAMPLE_DATA_FORMAT_FLOAT: s->data[s->length].f = strtod(ptr, &end); @@ -132,9 +134,13 @@ size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *f break; } - if (end == ptr) /* There are no valid FP values anymore */ + /* There are no valid FP values anymore. */ + if (end == ptr) break; } + + if (*end == '\n') + end++; if (s->length > 0) fl |= IO_FORMAT_VALUES;