diff --git a/lib/formats/json.c b/lib/formats/json.c index be5faae5a..0328c6ec4 100644 --- a/lib/formats/json.c +++ b/lib/formats/json.c @@ -261,6 +261,9 @@ int json_print(struct io *io, struct sample *smps[], unsigned cnt) fputc('\n', f); json_decref(json); + + if (ret) + return ret; } return i; diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index 632c79414..c4e4f2d75 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -45,8 +45,8 @@ int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *smp) json_data = json_array(); for (int i = 0; i < smp->length; i++) { - if (smp->destination) - sig = (struct signal *) list_at(&smp->destination->signals, i); + if (io->output.signals) + sig = (struct signal *) list_at_safe(io->output.signals, i); else sig = NULL; @@ -134,17 +134,21 @@ int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sample *s if (ret) return -1; - idx = signal_get_offset(name, smp->source); - if (idx < 0) - return -1; + void *s = list_lookup(io->input.signals, name); + if (s) + idx = list_index(io->input.signals, s); + else { + ret = sscanf(name, "signal_%d", &idx); + if (ret != 1) + continue; + } - if (idx >= smp->capacity) - continue; + if (idx < smp->capacity) { + smp->data[idx].f = value; - if (idx >= smp->length) - smp->length = idx; - - smp->data[idx].f = value; + if (idx >= smp->length) + smp->length = idx; + } } if (smp->length > 0) @@ -206,7 +210,7 @@ int json_reserve_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, str if (rbytes) *rbytes = err.position; - return ret; + return 1; } int json_reserve_print(struct io *io, struct sample *smps[], unsigned cnt) @@ -225,6 +229,9 @@ int json_reserve_print(struct io *io, struct sample *smps[], unsigned cnt) fputc('\n', f); json_decref(json); + + if (ret) + return ret; } return i; diff --git a/lib/io.c b/lib/io.c index 8d80c1351..012a3060a 100644 --- a/lib/io.c +++ b/lib/io.c @@ -81,6 +81,10 @@ skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f); int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) { + int ret; + + assert(io->state == STATE_DESTROYED); + io->_vt = fmt; io->_vd = alloc(fmt->size); @@ -101,13 +105,21 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) io->output.signals = &n->signals; } - return io->_vt->init ? io->_vt->init(io) : 0; + ret = io->_vt->init ? io->_vt->init(io) : 0; + if (ret) + return ret; + + io->state = STATE_INITIALIZED; + + return 0; } int io_destroy(struct io *io) { int ret; + assert(io->state == STATE_CLOSED || io->state == STATE_INITIALIZED); + ret = io->_vt->destroy ? io->_vt->destroy(io) : 0; if (ret) return ret; @@ -116,6 +128,8 @@ int io_destroy(struct io *io) free(io->input.buffer); free(io->output.buffer); + io->state = STATE_DESTROYED; + return 0; } @@ -295,10 +309,10 @@ int io_open(struct io *io, const char *uri) if (ret) return ret; - io_header(io); - io->state = STATE_OPENED; + io_header(io); + return 0; } @@ -447,12 +461,12 @@ int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample { struct format_type *fmt = io->_vt; - return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, io->flags) : -1; + return fmt->sscan ? fmt->sscan(io, buf, len, rbytes, smps, cnt) : -1; } int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt) { struct format_type *fmt = io->_vt; - return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, io->flags) : -1; + return fmt->sprint ? fmt->sprint(io, buf, len, wbytes, smps, cnt) : -1; }