diff --git a/etc/examples/nodes/ngsi.conf b/etc/examples/nodes/ngsi.conf index 1b075d37c..f95a17ffc 100644 --- a/etc/examples/nodes/ngsi.conf +++ b/etc/examples/nodes/ngsi.conf @@ -4,17 +4,34 @@ nodes = { ### The following settings are specific to the ngsi node-type!! ### - endpoint = "http://46.101.131.212:1026",# The HTTP REST API endpoint of the FIRWARE context broker + # The HTTP REST API endpoint of the FIRWARE context broker + endpoint = "http://46.101.131.212:1026", entity_id = "S3_ElectricalGrid", entity_type = "ElectricalGridMonitoring", - timeout = 5, # Timeout of HTTP request in seconds (default is 1) + rate = 0.1 # Rate at which we poll the broker for updates + timeout = 1, # Timeout of HTTP request in seconds (default is 1, must be smaller than 1 / rate) verify_ssl = false, # Verification of SSL server certificates (default is true) - mapping = [ # Format: "AttributeName(AttributeType)" - "PTotalLosses(MW)", - "QTotalLosses(Mvar)" - ] + in = { + signals = ( + { + name = "attr1", + ngsi_attribute_name = "attr1", # defaults to signal 'name' + ngsi_attribute_type = "Volts", # default to signal 'unit' + ngsi_attribute_metadatas = ( + { name="accuracy", type="percent", value="5" } + ) + } + ) + } + + out = { + signals = ( + { name="PTotalLosses", unit="MW" }, + { name="QTotalLosses", unit="Mvar" } + ) + } } } diff --git a/include/villas/nodes/ngsi.hpp b/include/villas/nodes/ngsi.hpp index a99848b99..990ff6c81 100644 --- a/include/villas/nodes/ngsi.hpp +++ b/include/villas/nodes/ngsi.hpp @@ -61,7 +61,9 @@ struct ngsi { CURL *curl; /**< libcurl: handle */ - struct vlist mapping; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */ + struct { + struct vlist signals; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */ + } in, out; }; /** Initialize global NGSI settings and maps shared memory regions. diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp index f1ba80161..d90e98959 100644 --- a/lib/nodes/ngsi.cpp +++ b/lib/nodes/ngsi.cpp @@ -66,127 +66,142 @@ struct ngsi_response { size_t len; }; +static json_t * ngsi_build_attribute(struct ngsi_attribute *attr, struct sample *smps[], unsigned cnt, int flags) +{ + json_t *json_attribute = json_pack("{ s: s, s: s }", + "name", attr->name, + "type", attr->type + ); + + if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */ + json_t *json_values = json_array(); + + for (unsigned k = 0; k < cnt; k++) { + json_array_append_new(json_values, json_pack("[ f, f, i ]", + time_to_double(&smps[k]->ts.origin), + smps[k]->data[attr->index].f, + smps[k]->sequence + )); + } + + json_object_set(json_attribute, "value", json_values); + } + + if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */ + json_t *json_metadatas = json_array(); + + for (size_t i = 0; i < vlist_length(&attr->metadata); i++) { + struct ngsi_metadata *meta = (struct ngsi_metadata *) vlist_at(&attr->metadata, i); + + json_array_append_new(json_metadatas, json_pack("{ s: s, s: s, s: s }", + "name", meta->name, + "type", meta->type, + "value", meta->value + )); + } + + json_object_set(json_attribute, "metadatas", json_metadatas); + } + + return json_attribute; +} + static json_t* ngsi_build_entity(struct node *n, struct sample *smps[], unsigned cnt, int flags) { struct ngsi *i = (struct ngsi *) n->_vd; - json_t *entity = json_pack("{ s: s, s: s, s: b }", - "id", i->entity_id, - "type", i->entity_type, - "isPattern", 0 + json_t *json_entity = json_pack("{ s: s, s: s, s: b }", + "id", i->entity_id, + "type", i->entity_type, + "isPattern", 0 ); if (flags & NGSI_ENTITY_ATTRIBUTES) { - json_t *attributes = json_array(); + json_t *json_attrs = json_array(); - for (size_t j = 0; j < vlist_length(&i->mapping); j++) { - struct ngsi_attribute *map = (struct ngsi_attribute *) vlist_at(&i->mapping, j); + for (size_t j = 0; j < vlist_length(&i->in.signals); j++) { + struct ngsi_attribute *attr = (struct ngsi_attribute *) vlist_at(&i->in.signals, j); - json_t *attribute = json_pack("{ s: s, s: s }", - "name", map->name, - "type", map->type - ); + auto *json_attr = ngsi_build_attribute(attr, smps, cnt, flags); - if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */ - json_t *values = json_array(); - for (unsigned k = 0; k < cnt; k++) { - json_array_append_new(values, json_pack("[ f, f, i ]", - time_to_double(&smps[k]->ts.origin), - smps[k]->data[map->index].f, - smps[k]->sequence - )); - } - - json_object_set(attribute, "value", values); - } - - if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */ - json_t *metadatas = json_array(); - - for (size_t i = 0; i < vlist_length(&map->metadata); i++) { - struct ngsi_metadata *meta = (struct ngsi_metadata *) vlist_at(&map->metadata, i); - - json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s }", - "name", meta->name, - "type", meta->type, - "value", meta->value - )); - } - - json_object_set(attribute, "metadatas", metadatas); - } - - json_array_append_new(attributes, attribute); + json_array_append_new(json_attrs, json_attr); } - json_object_set(entity, "attributes", attributes); + for (size_t j = 0; j < vlist_length(&i->out.signals); j++) { + struct ngsi_attribute *attr = (struct ngsi_attribute *) vlist_at(&i->out.signals, j); + + auto *json_attr = ngsi_build_attribute(attr, smps, cnt, flags); + + json_array_append_new(json_attrs, json_attr); + } + + json_object_set(json_entity, "attributes", json_attrs); } - return entity; + return json_entity; } -static int ngsi_parse_entity(struct node *n, json_t *entity, struct sample *smps[], unsigned cnt) +static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample *smps[], unsigned cnt) { - int ret; + int ret, length = 0; const char *id, *name, *type; struct ngsi *i = (struct ngsi *) n->_vd; size_t l; json_error_t err; - json_t *attribute, *attributes; + json_t *json_attr, *json_attrs; - ret = json_unpack_ex(entity, &err, 0, "{ s: s, s: s, s: o }", + ret = json_unpack_ex(json_entity, &err, 0, "{ s: s, s: s, s: o }", "id", &id, "type", &type, - "attributes", &attributes + "attributes", &json_attrs ); - if (ret || !json_is_array(attributes)) + if (ret || !json_is_array(json_attrs)) return -1; if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type)) return -2; - for (unsigned k = 0; k < cnt; k++) - smps[k]->length = json_array_size(attributes); - - json_array_foreach(attributes, l, attribute) { - struct ngsi_attribute *map; + json_array_foreach(json_attrs, l, json_attr) { + struct ngsi_attribute *attr; json_error_t err; - json_t *metadata, *values, *tuple; + json_t *json_metadata, *json_values, *json_tuple; /* Parse JSON */ - ret = json_unpack_ex(attribute, &err, 0, "{ s: s, s: s, s: o, s?: o }", + ret = json_unpack_ex(json_attr, &err, 0, "{ s: s, s: s, s: o, s?: o }", "name", &name, "type", &type, - "value", &values, - "metadatas", &metadata + "value", &json_values, + "metadatas", &json_metadata ); if (ret) return -3; /* Check attribute name and type */ - map = (struct ngsi_attribute *) vlist_lookup(&i->mapping, name); - if (!map || strcmp(map->type, type)) - return -4; + attr = (struct ngsi_attribute *) vlist_lookup(&i->in.signals, name); + if (!attr || strcmp(attr->type, type)) + continue; /* skip unknown attributes */ + + length++; /* Check metadata */ - if (!json_is_array(metadata)) + if (!json_is_array(json_metadata)) return -5; /* Check number of values */ - if (!json_is_array(values) || json_array_size(values) != cnt) + if (!json_is_array(json_values) || json_array_size(json_values) != cnt) return -6; size_t k; - json_array_foreach(values, k, tuple) { + json_array_foreach(json_values, k, json_tuple) { /* Check sample format */ - if (!json_is_array(tuple) || json_array_size(tuple) != 3) + if (!json_is_array(json_tuple) || json_array_size(json_tuple) != 3) return -7; char *end; const char *value, *ts, *seq; - ret = json_unpack_ex(tuple, &err, 0, "[ s, s, s ]", &ts, &value, &seq); + ret = json_unpack_ex(json_tuple, &err, 0, "[ s, s, s ]", &ts, &value, &seq); if (ret) return -8; @@ -197,88 +212,113 @@ static int ngsi_parse_entity(struct node *n, json_t *entity, struct sample *smps return -9; smps[k]->ts.origin = tss; - smps[k]->data[map->index].f = strtof(value, &end); + smps[k]->data[attr->index].f = strtof(value, &end); if (value == end) return -10; - - smps[k]->signals = &n->in.signals; } } + for (unsigned k = 0; k < cnt; k++) { + smps[k]->length = length; + smps[k]->signals = &n->in.signals; + } + return cnt; } -static int ngsi_parse_mapping(struct vlist *mapping, json_t *cfg) +static int ngsi_parse_signals(json_t *json_signals, struct vlist *ngsi_signals, struct vlist *node_signals) { - if (!json_is_array(cfg)) + int ret; + size_t j; + json_error_t err; + json_t *json_signal, *json_metadata; + + if (!json_is_array(json_signals)) return -1; - vlist_init(mapping); - - size_t j; - json_t *json_token; - - json_array_foreach(cfg, j, json_token) { - const char *token; - - token = json_string_value(json_token); - if (!token) - return -2; + json_array_foreach(json_signals, j, json_signal) { + auto *s = (struct signal *) vlist_at_safe(node_signals, j); auto *a = new struct ngsi_attribute; if (!a) throw MemoryAllocationError(); + memset(a, 0, sizeof(struct ngsi_attribute)); + a->index = j; + + json_t *json_ngsi_metadata = nullptr; - /* Parse Attribute: AttributeName(AttributeType) */ - int bytes; - if (sscanf(token, "%m[^(](%m[^)])%n", &a->name, &a->type, &bytes) != 2) - error("Invalid mapping token: '%s'", token); + ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: o }", + "ngsi_attribute_name", &a->name, + "ngsi_attribute_type", &a->type, + "ngsi_metadatas", &json_ngsi_metadata + ); + if (ret) + return ret; - token += bytes; + /* Copy values from node signal, if 'ngsi_attribute' settings not provided */ + if (s != nullptr && a->name == nullptr) + a->name = strdup(s->name ? s->name : ""); - /* MetadataName(MetadataType)=MetadataValue */ - vlist_init(&a->metadata); + if (s != nullptr && a->type == nullptr) + a->type = strdup(s->unit ? s->unit : ""); - struct ngsi_metadata m; - while (sscanf(token, " %m[^(](%m[^)])=%ms%n", &m.name, &m.type, &m.value, &bytes) == 3) { - vlist_push(&a->metadata, memdup(&m, sizeof(m))); - token += bytes; + ret = vlist_init(&a->metadata); + if (ret) + return ret; + + if (json_ngsi_metadata) { + if (!json_is_array(json_ngsi_metadata)) + throw ConfigError(json_ngsi_metadata, "node-config-ngsi-metadata", "ngsi_metadata must be a list of objects"); + + json_array_foreach(json_ngsi_metadata, j, json_metadata) { + auto *md = new struct ngsi_metadata; + if (!md) + return -1; + + memset(md, 0, sizeof(struct ngsi_metadata)); + + ret = json_unpack_ex(json_metadata, &err, 0, "{ s: s, s: s, s: s }", + "name", &md->name, + "type", &md->type, + "value", &md->value + ); + if (ret) + return ret; + + vlist_push(&a->metadata, md); + } } - /* Metadata: source(string)=name */ - struct ngsi_metadata s = { - .name = strdup("source"), - .type = strdup("string"), - .value = name - }; - /* Metadata: index(integer)=j */ - struct ngsi_metadata i = { - .name = strdup("index"), - .type = strdup("integer") - }; - assert(asprintf(&i.value, "%zu", j)); + auto *md_idx = new struct ngsi_metadata; + if (!md_idx) + return -1; - vlist_push(&a->metadata, memdup(&s, sizeof(s))); - vlist_push(&a->metadata, memdup(&i, sizeof(i))); + md_idx->name = strdup("index"); + md_idx->type = strdup("integer"); + asprintf(&md_idx->value, "%zu", j); - vlist_push(mapping, a); + assert(md_idx->name && md_idx->type && md_idx->value); + + vlist_push(&a->metadata, md_idx); + + vlist_push(ngsi_signals, a); } return 0; } -static int ngsi_parse_context_response(json_t *response, int *code, char **reason, json_t **rentity) { +static int ngsi_parse_context_response(json_t *json_response, int *code, char **reason, json_t **json_rentity) { int ret; char *codestr; json_error_t err; - ret = json_unpack_ex(response, &err, 0, "{ s: [ { s: O, s: { s: s, s: s } } ] }", + ret = json_unpack_ex(json_response, &err, 0, "{ s: [ { s: O, s: { s: s, s: s } } ] }", "contextResponses", - "contextElement", rentity, + "contextElement", json_rentity, "statusCode", "code", &codestr, "reasonPhrase", reason @@ -312,10 +352,10 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi return realsize; } -static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *request, json_t **response) +static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *json_request, json_t **json_response) { struct ngsi_response chunk = { 0 }; - char *post = json_dumps(request, JSON_INDENT(4)); + char *post = json_dumps(json_request, JSON_INDENT(4)); int old; double time; char url[128]; @@ -346,8 +386,8 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio debug(LOG_NGSI | 16, "Request to context broker completed in %.4f seconds", time); debug(LOG_NGSI | 17, "Response from context broker:\n%s", chunk.data); - *response = json_loads(chunk.data, 0, &err); - if (!*response) + *json_response = json_loads(chunk.data, 0, &err); + if (!*json_response) warning("Received invalid JSON: %s in %s:%u:%u\n%s", err.text, err.source, err.line, err.column, chunk.data); out: free(post); @@ -356,51 +396,51 @@ out: free(post); return ret; } -static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t *entity, json_t **rentity) +static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t *json_entity, json_t **json_rentity) { int ret, code; char *reason; - json_t *response; - json_t *request = json_pack("{ s: [ o ] }", "entities", entity); + json_t *json_response; + json_t *json_request = json_pack("{ s: [ o ] }", "entities", json_entity); - ret = ngsi_request(handle, endpoint, "queryContext", request, &response); + ret = ngsi_request(handle, endpoint, "queryContext", json_request, &json_response); if (ret) goto out; - ret = ngsi_parse_context_response(response, &code, &reason, rentity); + ret = ngsi_parse_context_response(json_response, &code, &reason, json_rentity); if (ret) goto out2; -out2: json_decref(response); -out: json_decref(request); +out2: json_decref(json_response); +out: json_decref(json_request); return ret; } -static int ngsi_request_context_update(CURL *handle, const char *endpoint, const char *action, json_t *entity) +static int ngsi_request_context_update(CURL *handle, const char *endpoint, const char *action, json_t *json_entity) { int ret, code; char *reason; - json_t *response; - json_t *request = json_pack("{ s: s, s: [ o ] }", + json_t *json_response; + json_t *json_request = json_pack("{ s: s, s: [ o ] }", "updateAction", action, - "contextElements", entity + "contextElements", json_entity ); - ret = ngsi_request(handle, endpoint, "updateContext", request, &response); + ret = ngsi_request(handle, endpoint, "updateContext", json_request, &json_response); if (ret) goto out; - json_t *rentity; - ret = ngsi_parse_context_response(response, &code, &reason, &rentity); + json_t *json_rentity; + ret = ngsi_parse_context_response(json_response, &code, &reason, &json_rentity); if (ret) goto out2; - json_decref(rentity); -out2: json_decref(response); -out: json_decref(request); + json_decref(json_rentity); +out2: json_decref(json_response); +out: json_decref(json_request); return ret; } @@ -425,15 +465,10 @@ int ngsi_parse(struct node *n, json_t *cfg) int ret; json_error_t err; - json_t *json_mapping; + json_t *json_signals_in = nullptr; + json_t *json_signals_out = nullptr; - /* Default values */ - i->access_token = nullptr; /* disabled by default */ - i->ssl_verify = 1; /* verify by default */ - i->timeout = 1; /* default value */ - i->rate = 5; /* default value */ - - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: b, s?: F, s?: F, s: o }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: b, s?: F, s?: F, s?: { s?: o }, s?: { s?: o } }", "access_token", &i->access_token, "endpoint", &i->endpoint, "entity_id", &i->entity_id, @@ -441,14 +476,25 @@ int ngsi_parse(struct node *n, json_t *cfg) "ssl_verify", &i->ssl_verify, "timeout", &i->timeout, "rate", &i->rate, - "mapping", &json_mapping + "in", + "signals", &json_signals_in, + "out", + "signals", &json_signals_out ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - ret = ngsi_parse_mapping(&i->mapping, json_mapping); - if (ret) - error("Invalid setting 'mapping' of node %s", node_name(n)); + if (json_signals_in) { + ret = ngsi_parse_signals(json_signals_in, &i->in.signals, &n->in.signals); + if (ret) + error("Invalid setting 'in.signals' of node %s", node_name(n)); + } + + if (json_signals_out) { + ret = ngsi_parse_signals(json_signals_out, &i->out.signals, &n->out.signals); + if (ret) + error("Invalid setting 'out.signals' of node %s", node_name(n)); + } return 0; } @@ -457,8 +503,8 @@ char * ngsi_print(struct node *n) { struct ngsi *i = (struct ngsi *) n->_vd; - return strf("endpoint=%s, timeout=%.3f secs, #mappings=%zu", - i->endpoint, i->timeout, vlist_length(&i->mapping)); + return strf("endpoint=%s, timeout=%.3f secs", + i->endpoint, i->timeout); } static int ngsi_metadata_destroy(struct ngsi_metadata *meta) @@ -509,13 +555,13 @@ int ngsi_start(struct node *n) curl_easy_setopt(i->curl, CURLOPT_USERAGENT, USER_AGENT); /* Create entity and atributes */ - json_t *entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_METADATA); + json_t *json_entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_METADATA); - ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", entity); + ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", json_entity); if (ret) error("Failed to create NGSI context for node %s", node_name(n)); - json_decref(entity); + json_decref(json_entity); return ret; } @@ -528,11 +574,11 @@ int ngsi_stop(struct node *n) i->task.stop(); /* Delete complete entity (not just attributes) */ - json_t *entity = ngsi_build_entity(n, nullptr, 0, 0); + json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0); - ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", entity); + ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", json_entity); - json_decref(entity); + json_decref(json_entity); curl_easy_cleanup(i->curl); curl_slist_free_all(i->headers); @@ -548,19 +594,19 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel if (i->task.wait() == 0) perror("Failed to wait for task"); - json_t *rentity; - json_t *entity = ngsi_build_entity(n, nullptr, 0, 0); + json_t *json_rentity; + json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0); - ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity); + ret = ngsi_request_context_query(i->curl, i->endpoint, json_entity, &json_rentity); if (ret) goto out; - ret = ngsi_parse_entity(n, entity, smps, cnt); + ret = ngsi_parse_entity(n, json_rentity, smps, cnt); if (ret) goto out2; -out2: json_decref(rentity); -out: json_decref(entity); +out2: json_decref(json_rentity); +out: json_decref(json_entity); return ret; } @@ -570,11 +616,11 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re struct ngsi *i = (struct ngsi *) n->_vd; int ret; - json_t *entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_VALUES); + json_t *json_entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_VALUES); - ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity); + ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", json_entity); - json_decref(entity); + json_decref(json_entity); return ret ? 0 : cnt; } @@ -590,18 +636,40 @@ int ngsi_poll_fds(struct node *n, int fds[]) int ngsi_init(struct node *n) { + int ret; struct ngsi *i = (struct ngsi *) n->_vd; new (&i->task) Task(CLOCK_REALTIME); + ret = vlist_init(&i->in.signals); + if (ret) + return ret; + + ret = vlist_init(&i->out.signals); + if (ret) + return ret; + + /* Default values */ + i->access_token = nullptr; /* disabled by default */ + i->ssl_verify = 1; /* verify by default */ + i->timeout = 1; /* default value */ + i->rate = 1; /* default value */ + return 0; } int ngsi_destroy(struct node *n) { + int ret; struct ngsi *i = (struct ngsi *) n->_vd; - vlist_destroy(&i->mapping, (dtor_cb_t) ngsi_attribute_destroy, true); + ret = vlist_destroy(&i->in.signals, (dtor_cb_t) ngsi_attribute_destroy, true); + if (ret) + return ret; + + ret = vlist_destroy(&i->out.signals, (dtor_cb_t) ngsi_attribute_destroy, true); + if (ret) + return ret; i->task.~Task();