diff --git a/server/include/ngsi.h b/server/include/ngsi.h index 4350f698d..44304b65a 100644 --- a/server/include/ngsi.h +++ b/server/include/ngsi.h @@ -32,6 +32,13 @@ struct node; +struct ngsi_mapping { + char *name; + char *type; + + int index; +}; + struct ngsi { const char *endpoint; /**< The NGSI context broker endpoint URL. */ const char *entity_id; /**< The context broker entity id related to this node */ @@ -39,14 +46,15 @@ struct ngsi { const char *access_token; /**< An optional authentication token which will be sent as HTTP header. */ double timeout; /**< HTTP timeout in seconds */ + double rate; /**< Rate used for polling. */ + int tfd; /**< Timer */ int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */ struct curl_slist *headers; /**< List of HTTP request headers for libcurl */ CURL *curl; /**< libcurl: handle */ - json_t *context; /**< The complete JSON tree which will be used for contextUpdate requests */ struct list mapping; /**< A mapping between indices of the S2SS messages and the attributes in ngsi::context */ }; diff --git a/server/src/ngsi.c b/server/src/ngsi.c index 24bf9d72d..0680af4a7 100644 --- a/server/src/ngsi.c +++ b/server/src/ngsi.c @@ -19,13 +19,16 @@ #include #include #include +#include #include #include "ngsi.h" #include "utils.h" +#include "timing.h" extern struct settings settings; +#if 0 /* unused at the moment */ static json_t * json_uuid() { char eid[37]; @@ -37,6 +40,7 @@ static json_t * json_uuid() return json_string(eid); } + static json_t * json_date(struct timespec *ts) { // Example: 2015-09-21T11:42:25+02:00 @@ -61,6 +65,144 @@ static json_t * json_lookup(json_t *array, char *key, char *needle) return NULL; } +#endif + +static json_t* json_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt) +{ + json_t *attributes = json_array(); + list_foreach(struct ngsi_mapping *map, &i->mapping) { + /* Build value vector */ + json_t *values; + if (cnt) { + values = json_array(); + for (int k = 0; k < cnt; k++) { + struct msg *m = &pool[(first + k) % poolsize]; + + json_array_append_new(values, json_pack("[ f, f, i ]", + time_to_double(&MSG_TS(m)), + m->data[map->index].f, + m->sequence + )); + } + } + else + values = json_string(""); + + /* Create Metadata for attribute */ + json_t *metadatas = json_array(); + json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }", + "name", "source", + "type", "string", + "value", "s2ss:", settings.name + )); + json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: i }", + "name", "index", + "type", "integer", + "value", map->index + )); + + json_t *attribute = json_pack("{ s: s, s: s, s: o, s: o }", + "name", map->name, + "type", map->type, + "value", values, + "metadatas", metadatas + ); + json_array_append_new(attributes, attribute); + } + + return json_pack("{ s: s, s: s, s: b, s: o }", + "id", i->entity_id, + "type", i->entity_type, + "isPattern", 0, + "attributes", attributes + ); +} + +static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt) +{ + int ret; + const char *id, *name, *type; + + size_t index; + json_t *attribute, *attributes; + + ret = json_unpack(entity, "{ s: s, s: s, s: o }", + "id", &id, + "type", &type, + "attributes", &attributes + ); + if (ret || !json_is_array(attributes)) + return -1; + + if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type)) + return -2; + + for (int j = 0; j < cnt; j++) { + struct msg *m = &pool[(first + j) % poolsize]; + + m->version = MSG_VERSION; + m->length = json_array_size(attributes); + m->endian = MSG_ENDIAN_HOST; + } + + json_array_foreach(attributes, index, attribute) { + struct ngsi_mapping *map; + json_t *metadata, *values, *tuple; + + /* Parse JSON */ + ret = json_unpack(attribute, "{ s: s, s: s, s: o, s: o }", + "name", &name, + "type", &type, + "value", &values, + "metadatas", &metadata + ); + if (ret) + return -3; + + /* Check attribute name and type */ + map = list_lookup(&i->mapping, name); + if (!map || strcmp(map->type, type)) + return -4; + + /* Check metadata */ + if (!json_is_array(metadata)) + return -5; + + /* Check number of values */ + if (!json_is_array(values) || json_array_size(values) != cnt) + return -6; + + size_t index2; + json_array_foreach(values, index2, tuple) { + struct msg *m = &pool[(first + index2) % poolsize]; + + /* Check sample format */ + if (!json_is_array(tuple) || json_array_size(tuple) != 3) + return -7; + + char *end; + const char *value, *ts, *seq; + ret = json_unpack(tuple, "[ s, s, s ]", &ts, &value, &seq); + if (ret) + return -8; + + m->sequence = atoi(seq); + + struct timespec tss = time_from_double(strtod(ts, &end)); + if (ts == end) + return -9; + + m->ts.sec = tss.tv_sec; + m->ts.nsec = tss.tv_nsec; + + m->data[map->index].f = strtof(value, &end); + if (value == end) + return -10; + } + } + + return cnt; +} struct ngsi_response { char *data; @@ -83,10 +225,10 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi return realsize; } -static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *content, json_t **response) +static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *request, json_t **response) { struct ngsi_response chunk = { 0 }; - char *post = json_dumps(content, JSON_INDENT(4)); + char *post = json_dumps(request, JSON_INDENT(4)); char url[128]; snprintf(url, sizeof(url), "%s/v1/%s", endpoint, operation); @@ -107,13 +249,11 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio if (ret) error("HTTP request failed: %s", curl_easy_strerror(ret)); - long code; double time; curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time); - curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code); debug(16, "Request to context broker completed in %.4f seconds", time); - debug(17, "Response from context broker (code=%ld):\n%s", code, chunk.data); + debug(17, "Response from context broker:\n%s", chunk.data); json_error_t err; json_t *resp = json_loads(chunk.data, 0, &err); @@ -128,69 +268,7 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio free(post); free(chunk.data); - return code; -} - -void ngsi_prepare_context(struct node *n, config_setting_t *mapping) -{ - struct ngsi *i = n->ngsi; - - list_init(&i->mapping, NULL); - - i->context = json_object(); - - json_t *elements = json_array(); - json_object_set_new(i->context, "contextElements", elements); - - json_t *entity = json_pack("{ s: s, s: s, s: b }", - "id", i->entity_id, - "type", i->entity_type, - "isPattern", 0 - ); - json_array_append_new(elements, entity); - - json_t *attributes = json_array(); - json_object_set_new(entity, "attributes", attributes); - - for (int j = 0; j < config_setting_length(mapping); j++) { - const char *token = config_setting_get_string_elem(mapping, j); - if (!token) - cerror(mapping, "Invalid mapping token"); - - /* Parse token */ - char aname[64], atype[64]; - if (sscanf(token, "%[^(](%[^)])", aname, atype) != 2) - cerror(mapping, "Invalid mapping token: '%s'", token); - - json_t *attribute = json_pack("{ s: s, s: s, s: [ ] }", - "name", aname, - "type", atype, - "value" - ); - json_array_append_new(attributes, attribute); - - /* Create Metadata for attribute */ - json_t *metadatas = json_array(); - json_object_set_new(attribute, "metadatas", metadatas); - - json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }", - "name", "source", - "type", "string", - "value", "s2ss:", settings.name - )); - json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: i }", - "name", "index", - "type", "integer", - "value", j - )); - json_array_append(metadatas, json_pack("{ s: s, s: s, s: s }", - "name", "timestamp", - "type", "date", - "value", "" - )); - - list_push(&i->mapping, attribute); - } + return 0; } int ngsi_init(int argc, char *argv[], struct settings *set) @@ -226,14 +304,32 @@ int ngsi_parse(config_setting_t *cfg, struct node *n) if (!config_setting_lookup_float(cfg, "timeout", &i->timeout)) i->timeout = 1; /* default value */ - - n->ngsi = i; + + if (!config_setting_lookup_float(cfg, "rate", &i->rate)) + i->rate = 5; /* default value */ config_setting_t *mapping = config_setting_get_member(cfg, "mapping"); if (!mapping || !config_setting_is_array(mapping)) cerror(cfg, "Missing mapping for node '%s", n->name); - ngsi_prepare_context(n, mapping); + list_init(&i->mapping, NULL); + for (int j = 0; j < config_setting_length(mapping); j++) { + const char *token = config_setting_get_string_elem(mapping, j); + if (!token) + cerror(mapping, "Invalid token in mapping for NGSI node '%s'", n->name); + + struct ngsi_mapping map = { + .index = j + }; + + /* Parse token */ + if (sscanf(token, "%m[^(](%m[^)])", &map.name, &map.type) != 2) + cerror(mapping, "Invalid mapping token: '%s'", token); + + list_push(&i->mapping, memdup(&map, sizeof(struct ngsi_mapping))); + } + + n->ngsi = i; return 0; } @@ -243,23 +339,41 @@ char * ngsi_print(struct node *n) struct ngsi *i = n->ngsi; char *buf = NULL; - return strcatf(&buf, "endpoint=%s, timeout=%.3f secs", - i->endpoint, i->timeout); + return strcatf(&buf, "endpoint=%s, timeout=%.3f secs, #mappings=%zu", + i->endpoint, i->timeout, list_length(&i->mapping)); } int ngsi_open(struct node *n) { - char buf[128]; struct ngsi *i = n->ngsi; - + i->curl = curl_easy_init(); i->headers = NULL; if (i->access_token) { + char buf[128]; snprintf(buf, sizeof(buf), "Auth-Token: %s", i->access_token); i->headers = curl_slist_append(i->headers, buf); } + /* Create timer */ + i->tfd = timerfd_create(CLOCK_MONOTONIC, 0); + if (i->tfd < 0) + serror("Failed to create timer"); + + /* Arm the timer with a fixed rate */ + struct itimerspec its = { + .it_interval = time_from_double(1 / i->rate), + .it_value = { 0, 1 }, + }; + + if (i->timeout > 1 / i->rate) + warn("Timeout is to large for given rate: %f", i->rate); + + int ret = timerfd_settime(i->tfd, 0, &its, NULL); + if (ret) + serror("Failed to start timer"); + i->headers = curl_slist_append(i->headers, "User-Agent: S2SS " VERSION); i->headers = curl_slist_append(i->headers, "Accept: application/json"); i->headers = curl_slist_append(i->headers, "Content-Type: application/json"); @@ -267,82 +381,96 @@ int ngsi_open(struct node *n) curl_easy_setopt(i->curl, CURLOPT_SSL_VERIFYPEER, i->ssl_verify); curl_easy_setopt(i->curl, CURLOPT_TIMEOUT_MS, i->timeout * 1e3); curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers); - + /* Create entity and atributes */ - json_object_set_new(i->context, "updateAction", json_string("APPEND")); - - return ngsi_request(i->curl, i->endpoint, "updateContext", i->context, NULL) == 200 ? 0 : -1; + json_t *request = json_pack("{ s: s, s: [ o ] }", + "updateAction", "APPEND", + "contextElements", json_entity(i, NULL, 0, 0, 0) + ); + + return ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL); } int ngsi_close(struct node *n) { struct ngsi *i = n->ngsi; + int ret; - /* Delete attributes */ - json_object_set_new(i->context, "updateAction", json_string("DELETE")); - int code = ngsi_request(i->curl, i->endpoint, "updateContext", i->context, NULL) == 200 ? 0 : -1; + /* Delete complete entity (not just attributes) */ + json_t *request = json_pack("{ s: s, s: [ { s: s, s: s, s: b } ] }", + "updateAction", "DELETE", + "contextElements", + "type", i->entity_type, + "id", i->entity_id, + "isPattern", 0 + ); + + ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL); + json_decref(request); curl_easy_cleanup(i->curl); curl_slist_free_all(i->headers); - return code == 200 ? 0 : -1; + return ret; } int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { -/* struct ngsi *i = n->ngsi; - struct msg *m = &pool[first % poolsize]; + struct ngsi *i = n->ngsi; int ret; + const char *code; + + timerfd_wait(i->tfd); + json_t *entity; json_t *response; - json_t *entities = json_array(); - json_t *query = json_pack("{ s: o }", "entities", entities); + json_t *request = json_pack("{ s: [ { s: s, s: s, s: b } ] }", + "entities", + "id", i->entity_id, + "type", i->entity_type, + "isPattern", 0 + ); - ret = ngsi_request(i->curl, i->endpoint, "queryContext", NULL, NULL); + /* Send query to broker */ + ret = ngsi_request(i->curl, i->endpoint, "queryContext", request, &response); if (ret < 0) { - warn("Failed to query data from context broker"); + warn("Failed to query data from NGSI node '%s'", n->name); return 0; } -*/ - return -1; /** @todo not yet implemented */ + + /* Parse response */ + ret = json_unpack(response, "{ s: [ { s: o, s: { s: s } } ] }", + "contextResponses", + "contextElement", &entity, + "statusCode", + "code", &code + ); + if (ret || strcmp(code, "200")) { + warn("Failed to parse response from NGSI node '%s'", n->name); + return 0; + } + + ret = json_entity_parse(entity, i, pool, poolsize, first, cnt); + if (ret < 0) { + warn("Failed to parse entity from context broker response: reason=%d", ret); + return 0; + } + + return ret; } int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { struct ngsi *i = n->ngsi; - /* First message */ - struct msg *fm = &pool[first % poolsize]; - - /* Update context */ - for (int j = 0; j < MIN(i->mapping.length, fm->length); j++) { - json_t *attribute = list_at(&i->mapping, j); - json_t *values = json_object_get(attribute, "value"); - json_t *metadatas = json_object_get(attribute, "metadatas"); - - /* Update timestamp */ - json_t *metadata_ts = json_lookup(metadatas, "name", "timestamp"); - json_object_set(metadata_ts, "value", json_date(&MSG_TS(fm))); - - /* Update value */ - json_array_clear(values); - for (int k = 0; k < cnt; k++) { - struct msg *m = &pool[(first + k) % poolsize]; - - double tsms = (double) m->ts.sec * 1e3 + m->ts.nsec / 1e6; - - json_array_append_new(values, json_pack("[ o, o ]", - json_real(tsms), - json_real(m->data[j].f) - )); - } - } - - json_object_set_new(i->context, "updateAction", json_string("UPDATE")); // @todo REPLACE? - json_t *response; - int code = ngsi_request(i->curl, i->endpoint, "updateContext", i->context, &response); - if (code != 200) + json_t *request = json_pack("{ s: s, s : [ o ] }", + "updateAction", "UPDATE", + "contextElements", json_entity(i, pool, poolsize, first, cnt) + ); + + int ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, &response); json_decref(request); + if (ret) error("Failed to NGSI update Context request:\n%s", json_dumps(response, JSON_INDENT(4))); return 1; diff --git a/server/src/node.c b/server/src/node.c index a7b1a6f20..9e2fe00ad 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -119,8 +119,7 @@ void node_destroy(struct node *n) switch (node_type(n)) { #ifdef ENABLE_NGSI case NGSI: - json_decref(n->ngsi->context); - free(n->ngsi->context_map); + list_destroy(&n->ngsi->mapping); break; #endif #ifdef ENABLE_SOCKET