diff --git a/include/villas/nodes/ngsi.hpp b/include/villas/nodes/ngsi.hpp index 990ff6c81..922e89d7b 100644 --- a/include/villas/nodes/ngsi.hpp +++ b/include/villas/nodes/ngsi.hpp @@ -51,6 +51,9 @@ struct ngsi { const char *entity_type; /**< The type of the entity */ const char *access_token; /**< An optional authentication token which will be sent as HTTP header. */ + bool create; /**< Weather we want to create the context element during startup. */ + bool remove; /**< Weather we want to delete the context element during startup. */ + double timeout; /**< HTTP timeout in seconds */ double rate; /**< Rate used for polling. */ @@ -90,6 +93,9 @@ int ngsi_start(struct node *n); /** @see node_type::stop */ int ngsi_stop(struct node *n); +/** @see node_type::reverse */ +int ngsi_reverse(struct node *n); + /** @see node_type::read */ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp index d90e98959..2275ccc4b 100644 --- a/lib/nodes/ngsi.cpp +++ b/lib/nodes/ngsi.cpp @@ -41,24 +41,161 @@ using namespace villas::utils; /* Some global settings */ static char *name = nullptr; -enum ngsi_flags { - NGSI_ENTITY_ATTRIBUTES = (1 << 0), - NGSI_ENTITY_VALUES = (1 << 1) | NGSI_ENTITY_ATTRIBUTES, - NGSI_ENTITY_METADATA = (1 << 2) | NGSI_ENTITY_ATTRIBUTES, +enum NgsiFlags { + NGSI_ENTITY_ATTRIBUTES_IN = (1 << 0), + NGSI_ENTITY_ATTRIBUTES_OUT = (1 << 1), + NGSI_ENTITY_ATTRIBUTES = NGSI_ENTITY_ATTRIBUTES_IN | NGSI_ENTITY_ATTRIBUTES_OUT, + NGSI_ENTITY_VALUES = (1 << 2), + NGSI_ENTITY_METADATA = (1 << 3), }; -struct ngsi_metadata { - char *name; - char *type; - char *value; +class NgsiMetadata { + +public: + NgsiMetadata(json_t *json) + { + parse(json); + } + + NgsiMetadata(const std::string &nam, const std::string &typ, const std::string &val) : + name(nam), + type(typ), + value(val) + { } + + void parse(json_t *json) + { + int ret; + + json_error_t err; + const char *nam, *typ, *val; + + ret = json_unpack_ex(json, &err, 0, "{ s: s, s: s, s: s }", + "name", &nam, + "type", &typ, + "value", &val + ); + if (ret) + throw ConfigError(json, "Failed to parse NGSI metadata"); + + name = nam; + type = typ; + value = val; + } + + std::string name; + std::string type; + std::string value; }; -struct ngsi_attribute { - char *name; - char *type; +class NgsiAttribute { - int index; - struct vlist metadata; +public: + std::string name; + std::string type; + + size_t index; + std::list metadata; + + NgsiAttribute(json_t *json, size_t j, struct signal *s) + { + parse(json, j, s); + } + + void parse(json_t *json, size_t j, struct signal *s) + { + int ret; + + json_error_t err; + json_t *json_metadatas = nullptr; + + const char *nam = nullptr; + const char *typ = nullptr; + + ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o }", + "ngsi_attribute_name", &nam, + "ngsi_attribute_type", &typ, + "ngsi_metadatas", &json_metadatas + ); + if (ret) + throw ConfigError(json, err, "node-config-node-ngsi", "Failed to parse NGSI attribute"); + + /* Copy values from node signal, if 'ngsi_attribute' settings not provided */ + if (s && !nam) + nam = s->name ? s->name : ""; + + if (s && !typ) + typ = s->unit ? s->unit : ""; + + name = nam; + type = typ; + index = j; + + if (json_metadatas) { + if (!json_is_array(json_metadatas)) + throw ConfigError(json_metadatas, "node-config-ngsi-metadata", "ngsi_metadata must be a list of objects"); + + json_t *json_metadata; + json_array_foreach(json_metadatas, j, json_metadata) + metadata.emplace_back(json_metadata); + } + + /* Metadata: index(integer)=j */ + metadata.emplace_back("index", "integer", fmt::format("{}", j)); + } + + json_t * build(struct sample *smps[], unsigned cnt, int flags) + { + json_t *json_attribute = json_pack("{ s: s, s: s }", + "name", name.c_str(), + "type", type.c_str() + ); + + if (flags & NGSI_ENTITY_VALUES) { +#if NGSI_VECTORS + /* Build value vector */ + json_t *json_value = json_array(); + + for (unsigned k = 0; k < cnt; k++) { + struct sample *smp = &smps[k]; + + union signal_data *sd = &smp->data[index]; + struct signal *sig = (struct signal *) vlist_at_safe(smp->signals, index); + + json_array_append_new(json_value, json_pack("[ f, o, i ]", + time_to_double(smp->ts.origin), + signal_data_to_json(sd, sig), + smp->sequence + )); + } +#else + struct sample *smp = smps[0]; + + union signal_data *sd = &smp->data[index]; + struct signal *sig = (struct signal *) vlist_at_safe(smp->signals, index); + + json_t *json_value = signal_data_to_json(sd, sig); +#endif + + json_object_set(json_attribute, "value", json_value); + } + + if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */ + json_t *json_metadatas = json_array(); + + for (auto &meta : metadata) { + json_array_append_new(json_metadatas, json_pack("{ s: s, s: s, s: s }", + "name", meta.name.c_str(), + "type", meta.type.c_str(), + "value", meta.value.c_str() + )); + } + + json_object_set(json_attribute, "metadatas", json_metadatas); + } + + return json_attribute; + } }; struct ngsi_response { @@ -66,46 +203,6 @@ struct ngsi_response { size_t len; }; -static json_t * ngsi_build_attribute(struct ngsi_attribute *attr, struct sample *smps[], unsigned cnt, int flags) -{ - json_t *json_attribute = json_pack("{ s: s, s: s }", - "name", attr->name, - "type", attr->type - ); - - if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */ - json_t *json_values = json_array(); - - for (unsigned k = 0; k < cnt; k++) { - json_array_append_new(json_values, json_pack("[ f, f, i ]", - time_to_double(&smps[k]->ts.origin), - smps[k]->data[attr->index].f, - smps[k]->sequence - )); - } - - json_object_set(json_attribute, "value", json_values); - } - - if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */ - json_t *json_metadatas = json_array(); - - for (size_t i = 0; i < vlist_length(&attr->metadata); i++) { - struct ngsi_metadata *meta = (struct ngsi_metadata *) vlist_at(&attr->metadata, i); - - json_array_append_new(json_metadatas, json_pack("{ s: s, s: s, s: s }", - "name", meta->name, - "type", meta->type, - "value", meta->value - )); - } - - json_object_set(json_attribute, "metadatas", json_metadatas); - } - - return json_attribute; -} - static json_t* ngsi_build_entity(struct node *n, struct sample *smps[], unsigned cnt, int flags) { struct ngsi *i = (struct ngsi *) n->_vd; @@ -119,20 +216,24 @@ static json_t* ngsi_build_entity(struct node *n, struct sample *smps[], unsigned if (flags & NGSI_ENTITY_ATTRIBUTES) { json_t *json_attrs = json_array(); - for (size_t j = 0; j < vlist_length(&i->in.signals); j++) { - struct ngsi_attribute *attr = (struct ngsi_attribute *) vlist_at(&i->in.signals, j); + if (flags & NGSI_ENTITY_ATTRIBUTES_IN) { + for (size_t j = 0; j < vlist_length(&i->in.signals); j++) { + auto *attr = (NgsiAttribute *) vlist_at(&i->in.signals, j); - auto *json_attr = ngsi_build_attribute(attr, smps, cnt, flags); + auto *json_attr = attr->build(smps, cnt, flags); - json_array_append_new(json_attrs, json_attr); + json_array_append_new(json_attrs, json_attr); + } } - for (size_t j = 0; j < vlist_length(&i->out.signals); j++) { - struct ngsi_attribute *attr = (struct ngsi_attribute *) vlist_at(&i->out.signals, j); + if (flags & NGSI_ENTITY_ATTRIBUTES_OUT) { + for (size_t j = 0; j < vlist_length(&i->out.signals); j++) { + auto *attr = (NgsiAttribute *) vlist_at(&i->out.signals, j); - auto *json_attr = ngsi_build_attribute(attr, smps, cnt, flags); + auto *json_attr = attr->build(smps, cnt, flags); - json_array_append_new(json_attrs, json_attr); + json_array_append_new(json_attrs, json_attr); + } } json_object_set(json_entity, "attributes", json_attrs); @@ -164,23 +265,26 @@ static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample return -2; json_array_foreach(json_attrs, l, json_attr) { - struct ngsi_attribute *attr; + NgsiAttribute *attr; json_error_t err; - json_t *json_metadata, *json_values, *json_tuple; + json_t *json_metadata, *json_value; + + char *end; + const char *value; /* Parse JSON */ ret = json_unpack_ex(json_attr, &err, 0, "{ s: s, s: s, s: o, s?: o }", "name", &name, "type", &type, - "value", &json_values, + "value", &json_value, "metadatas", &json_metadata ); if (ret) return -3; /* Check attribute name and type */ - attr = (struct ngsi_attribute *) vlist_lookup(&i->in.signals, name); - if (!attr || strcmp(attr->type, type)) + attr = (NgsiAttribute *) vlist_lookup(&i->in.signals, name); + if (!attr || attr->type != type) continue; /* skip unknown attributes */ length++; @@ -189,38 +293,82 @@ static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample if (!json_is_array(json_metadata)) return -5; +#ifdef NGSI_VECTORS + json_t *json_tuple; + const char *ts, *seq; + /* Check number of values */ - if (!json_is_array(json_values) || json_array_size(json_values) != cnt) + if (!json_is_array(json_value) || json_array_size(json_value) != cnt) return -6; size_t k; - json_array_foreach(json_values, k, json_tuple) { + json_array_foreach(json_value, k, json_tuple) { + struct sample *smp = smps[k]; + /* Check sample format */ if (!json_is_array(json_tuple) || json_array_size(json_tuple) != 3) return -7; - char *end; - const char *value, *ts, *seq; ret = json_unpack_ex(json_tuple, &err, 0, "[ s, s, s ]", &ts, &value, &seq); if (ret) return -8; - smps[k]->sequence = atoi(seq); + smp->sequence = atoi(seq); struct timespec tss = time_from_double(strtod(ts, &end)); if (ts == end) return -9; - smps[k]->ts.origin = tss; - smps[k]->data[attr->index].f = strtof(value, &end); + smp->ts.origin = tss; + + union signal_data *sd = &smp->data[attr->index]; + struct signal *sig = (struct signal *) vlist_at_safe(&n->in.signals, attr->index); + if (!sig) + return -11; + + if (value[0] == '\0') /* No data on Orion CB? -> Use init value */ + *sd = sig->init; + else { + signal_data_parse_str(sd, sig, value, &end); + if (value == end) + return -10; + } + } +#else + struct sample *smp = smps[0]; + + /* Check number of values */ + if (!json_is_string(json_value)) + return -6; + + value = json_string_value(json_value); + + union signal_data *sd = &smp->data[attr->index]; + struct signal *sig = (struct signal *) vlist_at_safe(&n->in.signals, attr->index); + if (!sig) + return -11; + + if (value[0] == '\0') /* No data on Orion CB? -> Use init value */ + *sd = sig->init; + else { + signal_data_parse_str(sd, sig, value, &end); if (value == end) return -10; } +#endif } for (unsigned k = 0; k < cnt; k++) { - smps[k]->length = length; - smps[k]->signals = &n->in.signals; + struct sample *smp = smps[k]; + + smp->length = length; + smp->signals = &n->in.signals; + smp->flags = (int) SampleFlags::HAS_DATA; + +#ifdef NGSI_VECTORS + smp->flags |= (int) (SampleFlags::HAS_SEQUENCE | + SampleFlags::HAS_TS_ORIGIN); +#endif } return cnt; @@ -228,10 +376,8 @@ static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample static int ngsi_parse_signals(json_t *json_signals, struct vlist *ngsi_signals, struct vlist *node_signals) { - int ret; size_t j; - json_error_t err; - json_t *json_signal, *json_metadata; + json_t *json_signal; if (!json_is_array(json_signals)) return -1; @@ -239,71 +385,10 @@ static int ngsi_parse_signals(json_t *json_signals, struct vlist *ngsi_signals, json_array_foreach(json_signals, j, json_signal) { auto *s = (struct signal *) vlist_at_safe(node_signals, j); - auto *a = new struct ngsi_attribute; + auto *a = new NgsiAttribute(json_signal, j, s); if (!a) throw MemoryAllocationError(); - memset(a, 0, sizeof(struct ngsi_attribute)); - - a->index = j; - - json_t *json_ngsi_metadata = nullptr; - - ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: o }", - "ngsi_attribute_name", &a->name, - "ngsi_attribute_type", &a->type, - "ngsi_metadatas", &json_ngsi_metadata - ); - if (ret) - return ret; - - /* Copy values from node signal, if 'ngsi_attribute' settings not provided */ - if (s != nullptr && a->name == nullptr) - a->name = strdup(s->name ? s->name : ""); - - if (s != nullptr && a->type == nullptr) - a->type = strdup(s->unit ? s->unit : ""); - - ret = vlist_init(&a->metadata); - if (ret) - return ret; - - if (json_ngsi_metadata) { - if (!json_is_array(json_ngsi_metadata)) - throw ConfigError(json_ngsi_metadata, "node-config-ngsi-metadata", "ngsi_metadata must be a list of objects"); - - json_array_foreach(json_ngsi_metadata, j, json_metadata) { - auto *md = new struct ngsi_metadata; - if (!md) - return -1; - - memset(md, 0, sizeof(struct ngsi_metadata)); - - ret = json_unpack_ex(json_metadata, &err, 0, "{ s: s, s: s, s: s }", - "name", &md->name, - "type", &md->type, - "value", &md->value - ); - if (ret) - return ret; - - vlist_push(&a->metadata, md); - } - } - - /* Metadata: index(integer)=j */ - auto *md_idx = new struct ngsi_metadata; - if (!md_idx) - return -1; - - md_idx->name = strdup("index"); - md_idx->type = strdup("integer"); - asprintf(&md_idx->value, "%zu", j); - - assert(md_idx->name && md_idx->type && md_idx->value); - - vlist_push(&a->metadata, md_idx); - vlist_push(ngsi_signals, a); } @@ -343,7 +428,7 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi mem->data = (char *) realloc(mem->data, mem->len + realsize + 1); if (mem->data == nullptr) /* out of memory! */ - error("Not enough memory (realloc returned nullptr)"); + throw MemoryAllocationError(); memcpy(&(mem->data[mem->len]), contents, realsize); mem->len += realsize; @@ -468,7 +553,10 @@ int ngsi_parse(struct node *n, json_t *cfg) json_t *json_signals_in = nullptr; json_t *json_signals_out = nullptr; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: b, s?: F, s?: F, s?: { s?: o }, s?: { s?: o } }", + int create = 1; + int remove = 1; + + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: b, s?: F, s?: F, s?: b, s?: b, s?: { s?: o }, s?: { s?: o } }", "access_token", &i->access_token, "endpoint", &i->endpoint, "entity_id", &i->entity_id, @@ -476,24 +564,29 @@ int ngsi_parse(struct node *n, json_t *cfg) "ssl_verify", &i->ssl_verify, "timeout", &i->timeout, "rate", &i->rate, + "create", &create, + "delete", &remove, "in", "signals", &json_signals_in, "out", "signals", &json_signals_out ); if (ret) - jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + throw ConfigError(cfg, err, "node-config-node-ngsi", "Failed to parse configuration of node {}", node_name(n)); + + i->create = create; + i->remove = remove; if (json_signals_in) { ret = ngsi_parse_signals(json_signals_in, &i->in.signals, &n->in.signals); if (ret) - error("Invalid setting 'in.signals' of node %s", node_name(n)); + throw ConfigError(json_signals_in, "node-config-node-ngsi-in-signals", "Invalid setting 'in.signals' of node {}", node_name(n)); } if (json_signals_out) { ret = ngsi_parse_signals(json_signals_out, &i->out.signals, &n->out.signals); if (ret) - error("Invalid setting 'out.signals' of node %s", node_name(n)); + throw ConfigError(json_signals_out, "node-config-node-ngsi-out-signals", "Invalid setting 'out.signals' of node {}", node_name(n)); } return 0; @@ -507,25 +600,6 @@ char * ngsi_print(struct node *n) i->endpoint, i->timeout); } -static int ngsi_metadata_destroy(struct ngsi_metadata *meta) -{ - free(meta->value); - free(meta->name); - free(meta->type); - - return 0; -} - -static int ngsi_attribute_destroy(struct ngsi_attribute *attr) -{ - free(attr->name); - free(attr->type); - - vlist_destroy(&attr->metadata, (dtor_cb_t) ngsi_metadata_destroy, true); - - return 0; -} - int ngsi_start(struct node *n) { struct ngsi *i = (struct ngsi *) n->_vd; @@ -555,13 +629,15 @@ int ngsi_start(struct node *n) curl_easy_setopt(i->curl, CURLOPT_USERAGENT, USER_AGENT); /* Create entity and atributes */ - json_t *json_entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_METADATA); + if (i->create) { + json_t *json_entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_ATTRIBUTES | NGSI_ENTITY_METADATA); - ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", json_entity); - if (ret) - error("Failed to create NGSI context for node %s", node_name(n)); + ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", json_entity); + if (ret) + throw RuntimeError("Failed to create NGSI context for node {}", node_name(n)); - json_decref(json_entity); + json_decref(json_entity); + } return ret; } @@ -616,7 +692,7 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re struct ngsi *i = (struct ngsi *) n->_vd; int ret; - json_t *json_entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_VALUES); + json_t *json_entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_ATTRIBUTES_OUT | NGSI_ENTITY_VALUES); ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", json_entity); @@ -663,11 +739,24 @@ int ngsi_destroy(struct node *n) int ret; struct ngsi *i = (struct ngsi *) n->_vd; - ret = vlist_destroy(&i->in.signals, (dtor_cb_t) ngsi_attribute_destroy, true); + + for (size_t j = 0; j < vlist_length(&i->in.signals); j++) { + auto *attr = (NgsiAttribute *) vlist_at(&i->in.signals, j); + + delete attr; + } + + for (size_t j = 0; j < vlist_length(&i->out.signals); j++) { + auto *attr = (NgsiAttribute *) vlist_at(&i->out.signals, j); + + delete attr; + } + + ret = vlist_destroy(&i->in.signals); if (ret) return ret; - ret = vlist_destroy(&i->out.signals, (dtor_cb_t) ngsi_attribute_destroy, true); + ret = vlist_destroy(&i->out.signals); if (ret) return ret; @@ -676,6 +765,17 @@ int ngsi_destroy(struct node *n) return 0; } +int ngsi_reverse(struct node *n) +{ + struct ngsi *i = (struct ngsi *) n->_vd; + + SWAP(n->in.signals, n->out.signals); + SWAP(i->in.signals, i->out.signals); + + return 0; +} + + static struct plugin p; __attribute__((constructor(110))) @@ -684,7 +784,11 @@ static void register_plugin() { p.description = "OMA Next Generation Services Interface 10 (libcurl, libjansson)"; p.type = PluginType::NODE; p.node.instances.state = State::DESTROYED; +#ifdef NGSI_VECTORS p.node.vectorize = 0, /* unlimited */ +#else + p.node.vectorize = 1, +#endif p.node.size = sizeof(struct ngsi); p.node.type.start = ngsi_type_start; p.node.type.stop = ngsi_type_stop; @@ -697,6 +801,7 @@ static void register_plugin() { p.node.read = ngsi_read; p.node.write = ngsi_write; p.node.poll_fds = ngsi_poll_fds; + p.node.reverse = ngsi_reverse; vlist_init(&p.node.instances); vlist_push(&plugins, &p);