diff --git a/lib/formats/csv.cpp b/lib/formats/csv.cpp index 6b2c544f1..54fc5593a 100644 --- a/lib/formats/csv.cpp +++ b/lib/formats/csv.cpp @@ -110,7 +110,6 @@ static size_t csv_sscan_single(struct io *io, const char *buf, size_t len, struc smp->flags |= (int) SampleFlags::HAS_SEQUENCE; for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) { - if (*end == io->delimiter) goto out; @@ -177,7 +176,9 @@ void csv_header(struct io *io, const struct sample *smp) if (io->flags & (int) SampleFlags::HAS_DATA) { for (unsigned i = 0; i < smp->length; i++) { - struct signal *sig = (struct signal *) vlist_at(smp->signals, i); + struct signal *sig = (struct signal *) vlist_at_safe(smp->signals, i); + if (!sig) + break; if (sig->name) fprintf(f, "%s", sig->name); diff --git a/lib/formats/json.cpp b/lib/formats/json.cpp index 51b671de4..3141f4a5d 100644 --- a/lib/formats/json.cpp +++ b/lib/formats/json.cpp @@ -124,6 +124,8 @@ static int json_pack_sample(struct io *io, json_t **j, struct sample *smp) for (unsigned i = 0; i < smp->length; i++) { struct signal *sig = (struct signal *) vlist_at_safe(smp->signals, i); + if (!sig) + return -1; json_t *json_value = signal_data_to_json(&smp->data[i], sig); diff --git a/lib/formats/json_reserve.cpp b/lib/formats/json_reserve.cpp index 3540f8c37..88767f42a 100644 --- a/lib/formats/json_reserve.cpp +++ b/lib/formats/json_reserve.cpp @@ -49,9 +49,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm json_data = json_array(); for (unsigned i = 0; i < smp->length; i++) { - struct signal *sig; - - sig = (struct signal *) vlist_at_safe(smp->signals, i); + struct signal *sig = (struct signal *) vlist_at_safe(smp->signals, i); if (!sig) return -1; diff --git a/lib/formats/msg.cpp b/lib/formats/msg.cpp index c2099639e..e9811a260 100644 --- a/lib/formats/msg.cpp +++ b/lib/formats/msg.cpp @@ -76,18 +76,17 @@ int msg_verify(struct msg *m) int msg_to_sample(struct msg *msg, struct sample *smp, struct vlist *signals) { int ret; + unsigned i; ret = msg_verify(msg); if (ret) - return -1; + return ret; - smp->flags = (int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA; - smp->length = MIN(msg->length, smp->capacity); - smp->sequence = msg->sequence; - MSG_TS(msg, smp->ts.origin); - - for (unsigned i = 0; i < MIN(smp->length, vlist_length(signals)); i++) { - struct signal *sig = (struct signal *) vlist_at(signals, i); + unsigned len = MIN(msg->length, smp->capacity); + for (i = 0; i < MIN(len, vlist_length(signals)); i++) { + struct signal *sig = (struct signal *) vlist_at_safe(signals, i); + if (!sig) + return -1; switch (sig->type) { case SignalType::FLOAT: @@ -103,6 +102,11 @@ int msg_to_sample(struct msg *msg, struct sample *smp, struct vlist *signals) } } + smp->flags = (int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA; + smp->length = i; + smp->sequence = msg->sequence; + MSG_TS(msg, smp->ts.origin); + return 0; } @@ -118,7 +122,9 @@ int msg_from_sample(struct msg *msg_in, struct sample *smp, struct vlist *signal msg_in->ts.nsec = smp->ts.origin.tv_nsec; for (unsigned i = 0; i < smp->length; i++) { - struct signal *sig = (struct signal *) vlist_at(signals, i); + struct signal *sig = (struct signal *) vlist_at_safe(signals, i); + if (!sig) + return -1; switch (sig->type) { case SignalType::FLOAT: diff --git a/lib/formats/value.cpp b/lib/formats/value.cpp index d6b625a3a..c9cb553fa 100644 --- a/lib/formats/value.cpp +++ b/lib/formats/value.cpp @@ -43,7 +43,7 @@ int value_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sa for (i = 0; i < smp->length; i++) { sig = (struct signal *) vlist_at_safe(smp->signals, i); if (!sig) - break; + return -1; off += signal_data_print_str(&smp->data[i], sig, buf, len); off += snprintf(buf + off, len - off, "\n"); @@ -70,7 +70,7 @@ int value_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, stru if (smp->capacity >= 1) { struct signal *sig = (struct signal *) vlist_at_safe(io->signals, i); if (!sig) - goto out; + return -1; ret = signal_data_parse_str(&smp->data[i], sig, ptr, &end); if (ret || end == ptr) /* There are no valid values anymore. */ diff --git a/lib/formats/villas_human.cpp b/lib/formats/villas_human.cpp index 9cb60d6c9..90c3b01d3 100644 --- a/lib/formats/villas_human.cpp +++ b/lib/formats/villas_human.cpp @@ -133,7 +133,6 @@ static size_t villas_human_sscan_single(struct io *io, const char *buf, size_t l unsigned i; for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) { - if (*end == io->delimiter) goto out; @@ -208,7 +207,9 @@ void villas_human_header(struct io *io, const struct sample *smp) if (io->flags & (int) SampleFlags::HAS_DATA) { for (unsigned i = 0; i < MIN(smp->length, vlist_length(smp->signals)); i++) { - struct signal *sig = (struct signal *) vlist_at(smp->signals, i); + struct signal *sig = (struct signal *) vlist_at_safe(smp->signals, i); + if (!sig) + break; if (sig->name) fprintf(f, "%c%s", io->separator, sig->name); diff --git a/lib/nodes/comedi.cpp b/lib/nodes/comedi.cpp index f2422ddbf..72bc60519 100644 --- a/lib/nodes/comedi.cpp +++ b/lib/nodes/comedi.cpp @@ -579,6 +579,7 @@ int comedi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r for (size_t i = 0; i < cnt; i++) { d->counter++; + smps[i]->signals = &n->in.signals; smps[i]->flags = (int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_DATA | (int) SampleFlags::HAS_SEQUENCE; smps[i]->sequence = d->counter / d->chanlist_len; diff --git a/lib/nodes/example.cpp b/lib/nodes/example.cpp index 1fb80c836..e702a7257 100644 --- a/lib/nodes/example.cpp +++ b/lib/nodes/example.cpp @@ -179,6 +179,11 @@ int example_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned * smps[0]->data[0].f = time_delta(&now, &s->start_time); + /* Dont forget to set other flags in struct sample::flags + * E.g. for sequence no, timestamps... */ + smps[0]->flags = (int) SampleFlags::HAS_DATA; + smps[0]->signals = &n->in.signals; + read = 1; /* The number of samples read */ return read; diff --git a/lib/nodes/fpga.cpp b/lib/nodes/fpga.cpp index 509bd267e..4da9dc7ee 100644 --- a/lib/nodes/fpga.cpp +++ b/lib/nodes/fpga.cpp @@ -287,6 +287,8 @@ int fpga_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel for (unsigned i = 0; i < MIN(read, smp->capacity); i++) smp->data[i].i = f->in.mem[i]; + smp->signals = &n->in.signals; + return read; } diff --git a/lib/nodes/infiniband.cpp b/lib/nodes/infiniband.cpp index dc17dbc5b..2ed97a39c 100644 --- a/lib/nodes/infiniband.cpp +++ b/lib/nodes/infiniband.cpp @@ -865,6 +865,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea smps[j]->length = SAMPLE_NUMBER_OF_VALUES(wc[j].byte_len - correction); smps[j]->ts.received = ts_receive; smps[j]->flags = (int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_TS_RECEIVED | (int) SampleFlags::HAS_SEQUENCE; + smps[j]->signals = &n->in.signals; } } diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp index 916681059..f1ba80161 100644 --- a/lib/nodes/ngsi.cpp +++ b/lib/nodes/ngsi.cpp @@ -66,8 +66,10 @@ struct ngsi_response { size_t len; }; -static json_t* ngsi_build_entity(struct ngsi *i, struct sample *smps[], unsigned cnt, int flags) +static json_t* ngsi_build_entity(struct node *n, struct sample *smps[], unsigned cnt, int flags) { + struct ngsi *i = (struct ngsi *) n->_vd; + json_t *entity = json_pack("{ s: s, s: s, s: b }", "id", i->entity_id, "type", i->entity_type, @@ -123,11 +125,13 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct sample *smps[], unsigned return entity; } -static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps[], unsigned cnt) +static int ngsi_parse_entity(struct node *n, json_t *entity, struct sample *smps[], unsigned cnt) { int ret; const char *id, *name, *type; + struct ngsi *i = (struct ngsi *) n->_vd; + size_t l; json_error_t err; json_t *attribute, *attributes; @@ -196,6 +200,8 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps smps[k]->data[map->index].f = strtof(value, &end); if (value == end) return -10; + + smps[k]->signals = &n->in.signals; } } @@ -503,7 +509,7 @@ int ngsi_start(struct node *n) curl_easy_setopt(i->curl, CURLOPT_USERAGENT, USER_AGENT); /* Create entity and atributes */ - json_t *entity = ngsi_build_entity(i, nullptr, 0, NGSI_ENTITY_METADATA); + json_t *entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_METADATA); ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", entity); if (ret) @@ -522,7 +528,7 @@ int ngsi_stop(struct node *n) i->task.stop(); /* Delete complete entity (not just attributes) */ - json_t *entity = ngsi_build_entity(i, nullptr, 0, 0); + json_t *entity = ngsi_build_entity(n, nullptr, 0, 0); ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", entity); @@ -543,13 +549,13 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel perror("Failed to wait for task"); json_t *rentity; - json_t *entity = ngsi_build_entity(i, nullptr, 0, 0); + json_t *entity = ngsi_build_entity(n, nullptr, 0, 0); ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity); if (ret) goto out; - ret = ngsi_parse_entity(rentity, i, smps, cnt); + ret = ngsi_parse_entity(n, entity, smps, cnt); if (ret) goto out2; @@ -564,7 +570,7 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re struct ngsi *i = (struct ngsi *) n->_vd; int ret; - json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES); + json_t *entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_VALUES); ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity); diff --git a/lib/nodes/test_rtt.cpp b/lib/nodes/test_rtt.cpp index b8de6188a..526a5cf98 100644 --- a/lib/nodes/test_rtt.cpp +++ b/lib/nodes/test_rtt.cpp @@ -399,6 +399,7 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned smps[i]->sequence = t->counter; smps[i]->ts.origin = now; smps[i]->flags = (int) SampleFlags::HAS_DATA | (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_TS_ORIGIN; + smps[i]->signals = &n->in.signals; t->counter++; }