diff --git a/documentation/clients/NGSI.md b/documentation/clients/NGSI.md index a08d2f512..6e1ac9ddb 100644 --- a/documentation/clients/NGSI.md +++ b/documentation/clients/NGSI.md @@ -8,28 +8,44 @@ It's using `libcurl` and `libjansson` to communicate with the context broker ove ## Configuration +You can use the `combine` setting to send multiple samples in a vector. + Every `ngsi` node supports the following special settings: #### `endpoint` *(string: URL)* +#### `entity_id` *(string)* + +#### `entity_type` *(string)* + #### `ssl_verify` *(boolean)* #### `timeout` *(float: seconds)* -#### `structure` *("flat" | "children")* - - - `flat`: - - `children`: - #### `mapping` *(array of strings)* -Format for `structure = flat`: `"entityId(entityType).attributeName(attributeType)"` - -Format for `structure = children`: `"parentId(entityType).value(attributeType)"` +Format `AttributeName(AttributeType)` ### Example -@todo add example from example.conf + ngsi_node = { + type = "ngsi", + + ### The following settings are specific to the ngsi node-type!! ### + + endpoint = "http://46.101.131.212:1026",# The HTTP REST API endpoint of the FIRWARE context broker + + entity_id = "S3_ElectricalGrid", + entity_type = "ElectricalGridMonitoring", + + timeout = 5, # Timeout of HTTP request in seconds (default is 1) + verify_ssl = false, # Verification of SSL server certificates (default is true) + + mapping = [ # Format: "AttributeName(AttributeType)" + PTotalLosses(MW)", + "QTotalLosses(Mvar)" + ] + } ## Further reading diff --git a/server/etc/example.conf b/server/etc/example.conf index e178c772f..8cd0e679e 100644 --- a/server/etc/example.conf +++ b/server/etc/example.conf @@ -118,20 +118,17 @@ nodes = { ### The following settings are specific to the ngsi node-type!! ### endpoint = "http://46.101.131.212:1026",# The HTTP REST API endpoint of the FIRWARE context broker + + entity_id = "S3_ElectricalGrid", + entity_type = "ElectricalGridMonitoring", + timeout = 5, # Timeout of HTTP request in seconds (default is 1) - structure = "children", # Structure of published context (default is "flat") verify_ssl = false, # Verification of SSL server certificates (default is true) - mapping = [ # Example mapping for structure = flat - "rtds_sub1(V2).v2(float)", # Format: "entityId(entityType).AttributeName(AttributeType)" - "rtds_sub2(power).p1(float)" + mapping = [ # Format: "AttributeName(AttributeType)" + PTotalLosses(MW)", + "QTotalLosses(Mvar)" ] - # mapping = [ # Alternative mapping for structure = children - # "fa846ed3-5871-11e5-b0cd-ecf4bb16fe0c(GridSectionDataValue).value(float)", # Index #0 - # "1d2c63e4-6130-11e5-9b0d-001c42f23160(GridSectionDataValue).value(float)" # Index #1 - # .... # Index #n - # (every line correspondents to one value) - #] } }; diff --git a/server/include/ngsi.h b/server/include/ngsi.h index 85ff870d5..4350f698d 100644 --- a/server/include/ngsi.h +++ b/server/include/ngsi.h @@ -34,18 +34,13 @@ struct node; struct ngsi { const char *endpoint; /**< The NGSI context broker endpoint URL. */ - const char *token; /**< An optional authentication token which will be sent as HTTP header. */ + const char *entity_id; /**< The context broker entity id related to this node */ + const char *entity_type; /**< The type of the entity */ + const char *access_token; /**< An optional authentication token which will be sent as HTTP header. */ double timeout; /**< HTTP timeout in seconds */ - int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */ - - - enum ngsi_structure { - NGSI_FLAT, - NGSI_CHILDREN - } structure; /**< Structure of published entitites */ - + int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */ struct curl_slist *headers; /**< List of HTTP request headers for libcurl */ diff --git a/server/src/ngsi.c b/server/src/ngsi.c index b12e7a7fd..4dd7b9e23 100644 --- a/server/src/ngsi.c +++ b/server/src/ngsi.c @@ -107,7 +107,7 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time); curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code); - debug(20, "Request to context broker completed in %.4f seconds", time); + debug(10, "Request to context broker completed in %.4f seconds", time); debug(20, "Response from context broker (code=%ld):\n%s", code, chunk.data); json_error_t err; @@ -129,53 +129,44 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio void ngsi_prepare_context(struct node *n, config_setting_t *mapping) { struct ngsi *i = n->ngsi; - - i->context = json_object(); - + list_init(&i->mapping, NULL); + i->context = json_object(); + json_t *elements = json_array(); + json_object_set_new(i->context, "contextElements", elements); + + json_t *entity = json_pack("{ s: s, s: s, s: b }", + "id", i->entity_id, + "type", i->entity_type, + "isPattern", 0 + ); + json_array_append_new(elements, entity); + + json_t *attributes = json_array(); + json_object_set_new(entity, "attributes", attributes); for (int j = 0; j < config_setting_length(mapping); j++) { - /* Get token */ - config_setting_t *ctoken = config_setting_get_elem(mapping, j); - const char *stoken = config_setting_get_string(ctoken); - if (!stoken) - cerror(mapping, "Invalid token"); + const char *token = config_setting_get_string_elem(mapping, j); + if (!token) + cerror(mapping, "Invalid mapping token"); /* Parse token */ - char eid[64], etype[64], aname[64], atype[64]; - if (sscanf(stoken, "%63[^().](%63[^().]).%63[^().](%63[^().])", eid, etype, aname, atype) != 4) - cerror(ctoken, "Invalid token: '%s'", stoken); - - /* Create entity */ - json_t *attributes; /* borrowed reference */ - - json_t *entity = json_lookup(elements, "id", eid); - if (!entity) { - entity = json_pack("{ s: s, s: s, s: b }", - "id", eid, - "type", etype, - "isPattern", 0 - ); - json_array_append_new(elements, entity); - - attributes = json_array(); - json_object_set_new(entity, "attributes", attributes); - } - else { - if (i->structure == NGSI_CHILDREN) - cerror(ctoken, "Duplicate mapping for index %u", j); - - attributes = json_object_get(entity, "attributes"); - } - - /* Create attribute for entity */ - if (json_lookup(attributes, "name", aname)) - cerror(ctoken, "Duplicated attribute '%s' in NGSI mapping of node '%s'", aname, n->name); + char aname[64], atype[64]; + if (sscanf(token, "%[^(](%[^)])", aname, atype) != 2) + cerror(mapping, "Invalid mapping token: '%s'", token); + json_t *attribute = json_pack("{ s: s, s: s, s: [ ] }", + "name", aname, + "type", atype, + "value" + ); + json_array_append_new(attributes, attribute); + /* Create Metadata for attribute */ json_t *metadatas = json_array(); + json_object_set_new(attribute, "metadatas", metadatas); json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }", "name", "source", @@ -191,41 +182,10 @@ void ngsi_prepare_context(struct node *n, config_setting_t *mapping) "name", "timestamp", "type", "date", "value", "" - )); - - if (i->structure == NGSI_CHILDREN) { - json_array_append_new(attributes, json_pack("{ s: s, s: s, s: s }", - "name", "parentId", - "type", "uuid", - "value", eid - )); - - json_array_append_new(attributes, json_pack("{ s: s, s: s, s: s }", - "name", "source", - "type", "string", - "value", "measurement" - )); - - json_array_append_new(attributes, json_pack("{ s: s, s: s, s: o }", - "name", "timestamp", - "type", "date", - "value", json_date(NULL) - )); - } - - json_t *attribute = json_pack("{ s: s, s: s, s: [ ] }", - "name", aname, - "type", atype, - "value" - ); + )); list_push(&i->mapping, attribute); - - json_object_set_new(attribute, "metadatas", metadatas); - json_array_append_new(attributes, attribute); } - - json_object_set_new(i->context, "contextElements", elements); } int ngsi_init(int argc, char *argv[], struct settings *set) @@ -244,11 +204,17 @@ int ngsi_parse(config_setting_t *cfg, struct node *n) { struct ngsi *i = alloc(sizeof(struct ngsi)); - if (!config_setting_lookup_string(cfg, "token", &i->token)) - i->token = NULL; /* disabled by default */ + if (!config_setting_lookup_string(cfg, "access_token", &i->access_token)) + i->access_token = NULL; /* disabled by default */ if (!config_setting_lookup_string(cfg, "endpoint", &i->endpoint)) cerror(cfg, "Missing NGSI endpoint for node '%s'", n->name); + + if (!config_setting_lookup_string(cfg, "entity_id", &i->entity_id)) + cerror(cfg, "Missing NGSI entity ID for node '%s'", n->name); + + if (!config_setting_lookup_string(cfg, "entity_type", &i->entity_type)) + cerror(cfg, "Missing NGSI entity type for node '%s'", n->name); if (!config_setting_lookup_bool(cfg, "ssl_verify", &i->ssl_verify)) i->ssl_verify = 1; /* verify by default */ @@ -256,18 +222,6 @@ int ngsi_parse(config_setting_t *cfg, struct node *n) if (!config_setting_lookup_float(cfg, "timeout", &i->timeout)) i->timeout = 1; /* default value */ - const char *structure; - if (!config_setting_lookup_string(cfg, "structure", &structure)) - i->structure = NGSI_FLAT; - else { - if (!strcmp(structure, "flat")) - i->structure = NGSI_FLAT; - else if (!strcmp(structure, "children")) - i->structure = NGSI_CHILDREN; - else - cerror(cfg, "Invalid NGSI entity structure '%s' for node '%s'", structure, n->name); - } - n->ngsi = i; config_setting_t *mapping = config_setting_get_member(cfg, "mapping"); @@ -296,8 +250,8 @@ int ngsi_open(struct node *n) i->curl = curl_easy_init(); i->headers = NULL; - if (i->token) { - snprintf(buf, sizeof(buf), "Auth-Token: %s", i->token); + if (i->access_token) { + snprintf(buf, sizeof(buf), "Auth-Token: %s", i->access_token); i->headers = curl_slist_append(i->headers, buf); } @@ -330,44 +284,56 @@ int ngsi_close(struct node *n) } int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) -{ +{ +/* struct ngsi *i = n->ngsi; + struct msg *m = &pool[first % poolsize]; + int ret; + + json_t *response; + json_t *entities = json_array(); + json_t *query = json_pack("{ s: o }", "entities", entities); + + ret = ngsi_request(i->curl, i->endpoint, "queryContext", NULL, NULL); + if (ret < 0) { + warn("Failed to query data from context broker"); + return 0; + } +*/ return -1; /** @todo not yet implemented */ } int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { struct ngsi *i = n->ngsi; - struct msg *m = &pool[first % poolsize]; - - if (cnt > 1) - error("NGSI nodes only can send a single message at once"); + + /* First message */ + struct msg *fm = &pool[first % poolsize]; /* Update context */ - for (int j = 0; j < MIN(i->mapping.length, m->length); j++) { + for (int j = 0; j < MIN(i->mapping.length, fm->length); j++) { json_t *attribute = list_at(&i->mapping, j); json_t *values = json_object_get(attribute, "value"); json_t *metadatas = json_object_get(attribute, "metadatas"); /* Update timestamp */ json_t *metadata_ts = json_lookup(metadatas, "name", "timestamp"); - json_object_set(metadata_ts, "value", json_date(&MSG_TS(m))); + json_object_set(metadata_ts, "value", json_date(&MSG_TS(fm))); /* Update value */ json_array_clear(values); - json_array_append_new(values, json_real(m->data[j].f)); + for (int k = 0; k < cnt; k++) { + struct msg *m = &pool[(first + k) % poolsize]; + + double tsms = (double) m->ts.sec * 1e3 + m->ts.nsec / 1e6; + + json_array_append_new(values, json_pack("[ o, o ]", + json_real(tsms), + json_real(m->data[j].f) + )); + } } - - /* Update UUIDs for children structure */ - if (i->structure == NGSI_CHILDREN) { - json_t *entity, *elements = json_object_get(i->context, "contextElements"); - size_t ind; - json_array_foreach(elements, ind, entity) - json_object_set_new(entity, "id", json_uuid()); - - json_object_set_new(i->context, "updateAction", json_string("APPEND")); - } - else - json_object_set_new(i->context, "updateAction", json_string("UPDATE")); + + json_object_set_new(i->context, "updateAction", json_string("UPDATE")); // @todo REPLACE? json_t *response; int code = ngsi_request(i->curl, i->endpoint, "updateContext", i->context, &response);