diff --git a/server/include/ngsi.h b/server/include/ngsi.h index 44304b65a..2603502a0 100644 --- a/server/include/ngsi.h +++ b/server/include/ngsi.h @@ -32,11 +32,12 @@ struct node; -struct ngsi_mapping { +struct ngsi_attribute { char *name; char *type; int index; + struct list metadata; }; struct ngsi { diff --git a/server/src/ngsi.c b/server/src/ngsi.c index 0680af4a7..015959753 100644 --- a/server/src/ngsi.c +++ b/server/src/ngsi.c @@ -67,58 +67,77 @@ static json_t * json_lookup(json_t *array, char *key, char *needle) } #endif -static json_t* json_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt) +enum ngsi_flags { + NGSI_ENTITY_ATTRIBUTES = (1 << 0), + NGSI_ENTITY_VALUES = (1 << 1) | NGSI_ENTITY_ATTRIBUTES, + NGSI_ENTITY_METADATA = (1 << 2) | NGSI_ENTITY_ATTRIBUTES, +}; + +struct ngsi_metadata { + char *name; + char *type; + char *value; +}; + +struct ngsi_response { + char *data; + size_t len; +}; + +static json_t* ngsi_build_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt, int flags) { - json_t *attributes = json_array(); - list_foreach(struct ngsi_mapping *map, &i->mapping) { - /* Build value vector */ - json_t *values; - if (cnt) { - values = json_array(); - for (int k = 0; k < cnt; k++) { - struct msg *m = &pool[(first + k) % poolsize]; - - json_array_append_new(values, json_pack("[ f, f, i ]", - time_to_double(&MSG_TS(m)), - m->data[map->index].f, - m->sequence - )); - } - } - else - values = json_string(""); - - /* Create Metadata for attribute */ - json_t *metadatas = json_array(); - json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }", - "name", "source", - "type", "string", - "value", "s2ss:", settings.name - )); - json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: i }", - "name", "index", - "type", "integer", - "value", map->index - )); - - json_t *attribute = json_pack("{ s: s, s: s, s: o, s: o }", - "name", map->name, - "type", map->type, - "value", values, - "metadatas", metadatas - ); - json_array_append_new(attributes, attribute); - } - - return json_pack("{ s: s, s: s, s: b, s: o }", + json_t *entity = json_pack("{ s: s, s: s, s: b }", "id", i->entity_id, "type", i->entity_type, - "isPattern", 0, - "attributes", attributes + "isPattern", 0 ); + + if (flags & NGSI_ENTITY_ATTRIBUTES) { + json_t *attributes = json_array(); + list_foreach(struct ngsi_attribute *map, &i->mapping) { + json_t *attribute = json_pack("{ s: s, s: s }", + "name", map->name, + "type", map->type + ); + + if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */ + json_t *values = json_array(); + for (int k = 0; k < cnt; k++) { + struct msg *m = &pool[(first + k) % poolsize]; + + json_array_append_new(values, json_pack("[ f, f, i ]", + time_to_double(&MSG_TS(m)), + m->data[map->index].f, + m->sequence + )); + } + + json_object_set(attribute, "value", values); + } + + if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */ + json_t *metadatas = json_array(); + list_foreach(struct ngsi_metadata *meta, &map->metadata) { + json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s }", + "name", meta->name, + "type", meta->type, + "value", meta->value + )); + } + + json_object_set(attribute, "metadatas", metadatas); + } + + json_array_append_new(attributes, attribute); + } + + json_object_set(entity, "attributes", attributes); + } + + return entity; } -static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt) +static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt) { int ret; const char *id, *name, *type; @@ -146,11 +165,11 @@ static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, i } json_array_foreach(attributes, index, attribute) { - struct ngsi_mapping *map; + struct ngsi_attribute *map; json_t *metadata, *values, *tuple; /* Parse JSON */ - ret = json_unpack(attribute, "{ s: s, s: s, s: o, s: o }", + ret = json_unpack(attribute, "{ s: s, s: s, s: o, s?: o }", "name", &name, "type", &type, "value", &values, @@ -204,10 +223,84 @@ static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, i return cnt; } -struct ngsi_response { - char *data; - size_t len; -}; +static int ngsi_parse_mapping(config_setting_t *cfg, struct list *mapping) +{ + if (!config_setting_is_array(cfg)) + return -1; + + list_init(mapping, NULL); + + for (int j = 0; j < config_setting_length(cfg); j++) { INDENT + const char *token = config_setting_get_string_elem(cfg, j); + if (!token) + return -2; + + struct ngsi_attribute map = { .index = j }; + + /* Parse Attribute: AttributeName(AttributeType) */ + int bytes; + if (sscanf(token, "%m[^(](%m[^)])%n", &map.name, &map.type, &bytes) != 2) + cerror(cfg, "Invalid mapping token: '%s'", token); + + token += bytes; + debug(13, "Attribute: %u: %s(%s)", map.index, map.name, map.type); + + /* MetadataName(MetadataType)=MetadataValue */ + list_init(&map.metadata, NULL); + struct ngsi_metadata meta; + while (sscanf(token, " %m[^(](%m[^)])=%ms%n", &meta.name, &meta.type, &meta.value, &bytes) == 3) { INDENT + list_push(&map.metadata, memdup(&meta, sizeof(struct ngsi_metadata))); + + token += bytes; + debug(13, "Metadata: %s(%s)=%s", meta.name, meta.type, meta.value); + } + + /* Static metadata */ + struct ngsi_metadata source = { + .name = "source", + .type = "string", + .value = (char *) settings.name, + }; + + struct ngsi_metadata index = { + .name = "index", + .type = "integer", + .value = alloc(8) + }; + snprintf(index.value, 8, "%u", j); + + list_push(&map.metadata, memdup(&index, sizeof(struct ngsi_metadata))); + list_push(&map.metadata, memdup(&source, sizeof(struct ngsi_metadata))); + + list_push(mapping, memdup(&map, sizeof(struct ngsi_attribute))); + } + + return 0; +} + +static int ngsi_parse_context_response(json_t *response, int *code, char **reason, json_t **rentity) { + int ret; + char *codestr; + + ret = json_unpack(response, "{ s: [ { s: o, s: { s: s, s: s } } ] }", + "contextResponses", + "contextElement", rentity, + "statusCode", + "code", &codestr, + "reasonPhrase", reason + ); + if (ret) { + warn("Failed to find NGSI response code"); + return ret; + } + + *code = atoi(codestr); + + if (*code != 200) + warn("NGSI response: %s %s", codestr, *reason); + + return 0; +} static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, void *userp) { @@ -229,8 +322,11 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio { struct ngsi_response chunk = { 0 }; char *post = json_dumps(request, JSON_INDENT(4)); - + int old; + double time; char url[128]; + json_error_t err; + snprintf(url, sizeof(url), "%s/v1/%s", endpoint, operation); curl_easy_setopt(handle, CURLOPT_URL, url); @@ -241,36 +337,80 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio debug(18, "Request to context broker: %s\n%s", url, post); - int old; /* We don't want to leave the CUrl handle in an invalid state */ + /* We don't want to leave the handle in an invalid state */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old); CURLcode ret = curl_easy_perform(handle); pthread_setcancelstate(old, NULL); - if (ret) - error("HTTP request failed: %s", curl_easy_strerror(ret)); + if (ret) { + warn("HTTP request failed: %s", curl_easy_strerror(ret)); + goto out; + } - double time; curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time); debug(16, "Request to context broker completed in %.4f seconds", time); debug(17, "Response from context broker:\n%s", chunk.data); - json_error_t err; - json_t *resp = json_loads(chunk.data, 0, &err); - if (!resp) - error("Received invalid JSON: %s in %s:%u:%u\n%s", err.text, err.source, err.line, err.column, chunk.data); + *response = json_loads(chunk.data, 0, &err); + if (!*response) + warn("Received invalid JSON: %s in %s:%u:%u\n%s", err.text, err.source, err.line, err.column, chunk.data); - if (response) - *response = resp; - else - json_decref(resp); - - free(post); +out: free(post); free(chunk.data); return 0; } +static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t *entity, json_t **rentity) +{ + int ret, code; + char *reason; + + json_t *response; + json_t *request = json_pack("{ s: [ o ] }", "entities", entity); + + ret = ngsi_request(handle, endpoint, "queryContext", request, &response); + if (ret < 0) + goto out; + + ret = ngsi_parse_context_response(response, &code, &reason, rentity); + if (ret) + goto out2; + +out2: json_decref(response); +out: json_decref(request); + + return ret; +} + +static int ngsi_request_context_update(CURL *handle, const char *endpoint, const char *action, json_t *entity) +{ + int ret, code; + char *reason; + + json_t *response; + json_t *request = json_pack("{ s: s, s: [ o ] }", + "updateAction", action, + "contextElements", entity + ); + + ret = ngsi_request(handle, endpoint, "updateContext", request, &response); + if (ret) + goto out; + + json_t *rentity; + ret = ngsi_parse_context_response(response, &code, &reason, &rentity); + if (ret) + goto out2; + + json_decref(rentity); +out2: json_decref(response); +out: json_decref(request); + + return ret; +} + int ngsi_init(int argc, char *argv[], struct settings *set) { return curl_global_init(CURL_GLOBAL_ALL); @@ -308,27 +448,13 @@ int ngsi_parse(config_setting_t *cfg, struct node *n) if (!config_setting_lookup_float(cfg, "rate", &i->rate)) i->rate = 5; /* default value */ - config_setting_t *mapping = config_setting_get_member(cfg, "mapping"); - if (!mapping || !config_setting_is_array(mapping)) + config_setting_t *cfg_mapping = config_setting_get_member(cfg, "mapping"); + if (!cfg_mapping) cerror(cfg, "Missing mapping for node '%s", n->name); - list_init(&i->mapping, NULL); - for (int j = 0; j < config_setting_length(mapping); j++) { - const char *token = config_setting_get_string_elem(mapping, j); - if (!token) - cerror(mapping, "Invalid token in mapping for NGSI node '%s'", n->name); - - struct ngsi_mapping map = { - .index = j - }; - - /* Parse token */ - if (sscanf(token, "%m[^(](%m[^)])", &map.name, &map.type) != 2) - cerror(mapping, "Invalid mapping token: '%s'", token); - - list_push(&i->mapping, memdup(&map, sizeof(struct ngsi_mapping))); - } - + if (ngsi_parse_mapping(cfg_mapping, &i->mapping)) + cerror(cfg_mapping, "Invalid mapping for NGSI node '%s'", n->name); + n->ngsi = i; return 0; @@ -346,6 +472,7 @@ char * ngsi_print(struct node *n) int ngsi_open(struct node *n) { struct ngsi *i = n->ngsi; + int ret; i->curl = curl_easy_init(); i->headers = NULL; @@ -370,7 +497,7 @@ int ngsi_open(struct node *n) if (i->timeout > 1 / i->rate) warn("Timeout is to large for given rate: %f", i->rate); - int ret = timerfd_settime(i->tfd, 0, &its, NULL); + ret = timerfd_settime(i->tfd, 0, &its, NULL); if (ret) serror("Failed to start timer"); @@ -383,12 +510,15 @@ int ngsi_open(struct node *n) curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers); /* Create entity and atributes */ - json_t *request = json_pack("{ s: s, s: [ o ] }", - "updateAction", "APPEND", - "contextElements", json_entity(i, NULL, 0, 0, 0) - ); + json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, NGSI_ENTITY_METADATA); - return ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL); + ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", entity); + if (ret) + error("Failed to create NGSI context for node '%s'", n->name); + + json_decref(entity); + + return ret; } int ngsi_close(struct node *n) @@ -397,16 +527,11 @@ int ngsi_close(struct node *n) int ret; /* Delete complete entity (not just attributes) */ - json_t *request = json_pack("{ s: s, s: [ { s: s, s: s, s: b } ] }", - "updateAction", "DELETE", - "contextElements", - "type", i->entity_type, - "id", i->entity_id, - "isPattern", 0 - ); + json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, 0); - ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL); - json_decref(request); + ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", entity); + + json_decref(entity); curl_easy_cleanup(i->curl); curl_slist_free_all(i->headers); @@ -418,62 +543,38 @@ int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt { struct ngsi *i = n->ngsi; int ret; - const char *code; timerfd_wait(i->tfd); - - json_t *entity; - json_t *response; - json_t *request = json_pack("{ s: [ { s: s, s: s, s: b } ] }", - "entities", - "id", i->entity_id, - "type", i->entity_type, - "isPattern", 0 - ); - - /* Send query to broker */ - ret = ngsi_request(i->curl, i->endpoint, "queryContext", request, &response); - if (ret < 0) { - warn("Failed to query data from NGSI node '%s'", n->name); - return 0; - } - /* Parse response */ - ret = json_unpack(response, "{ s: [ { s: o, s: { s: s } } ] }", - "contextResponses", - "contextElement", &entity, - "statusCode", - "code", &code - ); - if (ret || strcmp(code, "200")) { - warn("Failed to parse response from NGSI node '%s'", n->name); - return 0; - } + json_t *rentity; + json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, 0); - ret = json_entity_parse(entity, i, pool, poolsize, first, cnt); - if (ret < 0) { - warn("Failed to parse entity from context broker response: reason=%d", ret); - return 0; - } + ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity); + if (ret) + goto out; + + ret = ngsi_parse_entity(rentity, i, pool, poolsize, first, cnt); + if (ret) + goto out2; + +out2: json_decref(entity); +out: json_decref(rentity); - return ret; + return ret ? 0 : cnt; } int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { struct ngsi *i = n->ngsi; + int ret; - json_t *response; - json_t *request = json_pack("{ s: s, s : [ o ] }", - "updateAction", "UPDATE", - "contextElements", json_entity(i, pool, poolsize, first, cnt) - ); + json_t *entity = ngsi_build_entity(i, pool, poolsize, first, cnt, NGSI_ENTITY_VALUES); - int ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, &response); json_decref(request); - if (ret) - error("Failed to NGSI update Context request:\n%s", json_dumps(response, JSON_INDENT(4))); - - return 1; + ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity); + + json_decref(entity); + + return ret ? 0 : cnt; } REGISTER_NODE_TYPE(NGSI, "ngsi", ngsi)