mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
new NGSI node which polling support
This commit is contained in:
parent
14c03ee70c
commit
61e2bd723d
3 changed files with 260 additions and 125 deletions
|
@ -32,6 +32,13 @@
|
|||
|
||||
struct node;
|
||||
|
||||
struct ngsi_mapping {
|
||||
char *name;
|
||||
char *type;
|
||||
|
||||
int index;
|
||||
};
|
||||
|
||||
struct ngsi {
|
||||
const char *endpoint; /**< The NGSI context broker endpoint URL. */
|
||||
const char *entity_id; /**< The context broker entity id related to this node */
|
||||
|
@ -39,14 +46,15 @@ struct ngsi {
|
|||
const char *access_token; /**< An optional authentication token which will be sent as HTTP header. */
|
||||
|
||||
double timeout; /**< HTTP timeout in seconds */
|
||||
double rate; /**< Rate used for polling. */
|
||||
|
||||
int tfd; /**< Timer */
|
||||
int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */
|
||||
|
||||
struct curl_slist *headers; /**< List of HTTP request headers for libcurl */
|
||||
|
||||
CURL *curl; /**< libcurl: handle */
|
||||
|
||||
json_t *context; /**< The complete JSON tree which will be used for contextUpdate requests */
|
||||
struct list mapping; /**< A mapping between indices of the S2SS messages and the attributes in ngsi::context */
|
||||
};
|
||||
|
||||
|
|
|
@ -19,13 +19,16 @@
|
|||
#include <curl/curl.h>
|
||||
#include <uuid/uuid.h>
|
||||
#include <jansson.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "ngsi.h"
|
||||
#include "utils.h"
|
||||
#include "timing.h"
|
||||
|
||||
extern struct settings settings;
|
||||
|
||||
#if 0 /* unused at the moment */
|
||||
static json_t * json_uuid()
|
||||
{
|
||||
char eid[37];
|
||||
|
@ -37,6 +40,7 @@ static json_t * json_uuid()
|
|||
return json_string(eid);
|
||||
}
|
||||
|
||||
|
||||
static json_t * json_date(struct timespec *ts)
|
||||
{
|
||||
// Example: 2015-09-21T11:42:25+02:00
|
||||
|
@ -61,6 +65,144 @@ static json_t * json_lookup(json_t *array, char *key, char *needle)
|
|||
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
static json_t* json_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
json_t *attributes = json_array();
|
||||
list_foreach(struct ngsi_mapping *map, &i->mapping) {
|
||||
/* Build value vector */
|
||||
json_t *values;
|
||||
if (cnt) {
|
||||
values = json_array();
|
||||
for (int k = 0; k < cnt; k++) {
|
||||
struct msg *m = &pool[(first + k) % poolsize];
|
||||
|
||||
json_array_append_new(values, json_pack("[ f, f, i ]",
|
||||
time_to_double(&MSG_TS(m)),
|
||||
m->data[map->index].f,
|
||||
m->sequence
|
||||
));
|
||||
}
|
||||
}
|
||||
else
|
||||
values = json_string("");
|
||||
|
||||
/* Create Metadata for attribute */
|
||||
json_t *metadatas = json_array();
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }",
|
||||
"name", "source",
|
||||
"type", "string",
|
||||
"value", "s2ss:", settings.name
|
||||
));
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: i }",
|
||||
"name", "index",
|
||||
"type", "integer",
|
||||
"value", map->index
|
||||
));
|
||||
|
||||
json_t *attribute = json_pack("{ s: s, s: s, s: o, s: o }",
|
||||
"name", map->name,
|
||||
"type", map->type,
|
||||
"value", values,
|
||||
"metadatas", metadatas
|
||||
);
|
||||
json_array_append_new(attributes, attribute);
|
||||
}
|
||||
|
||||
return json_pack("{ s: s, s: s, s: b, s: o }",
|
||||
"id", i->entity_id,
|
||||
"type", i->entity_type,
|
||||
"isPattern", 0,
|
||||
"attributes", attributes
|
||||
);
|
||||
}
|
||||
|
||||
static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
int ret;
|
||||
const char *id, *name, *type;
|
||||
|
||||
size_t index;
|
||||
json_t *attribute, *attributes;
|
||||
|
||||
ret = json_unpack(entity, "{ s: s, s: s, s: o }",
|
||||
"id", &id,
|
||||
"type", &type,
|
||||
"attributes", &attributes
|
||||
);
|
||||
if (ret || !json_is_array(attributes))
|
||||
return -1;
|
||||
|
||||
if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type))
|
||||
return -2;
|
||||
|
||||
for (int j = 0; j < cnt; j++) {
|
||||
struct msg *m = &pool[(first + j) % poolsize];
|
||||
|
||||
m->version = MSG_VERSION;
|
||||
m->length = json_array_size(attributes);
|
||||
m->endian = MSG_ENDIAN_HOST;
|
||||
}
|
||||
|
||||
json_array_foreach(attributes, index, attribute) {
|
||||
struct ngsi_mapping *map;
|
||||
json_t *metadata, *values, *tuple;
|
||||
|
||||
/* Parse JSON */
|
||||
ret = json_unpack(attribute, "{ s: s, s: s, s: o, s: o }",
|
||||
"name", &name,
|
||||
"type", &type,
|
||||
"value", &values,
|
||||
"metadatas", &metadata
|
||||
);
|
||||
if (ret)
|
||||
return -3;
|
||||
|
||||
/* Check attribute name and type */
|
||||
map = list_lookup(&i->mapping, name);
|
||||
if (!map || strcmp(map->type, type))
|
||||
return -4;
|
||||
|
||||
/* Check metadata */
|
||||
if (!json_is_array(metadata))
|
||||
return -5;
|
||||
|
||||
/* Check number of values */
|
||||
if (!json_is_array(values) || json_array_size(values) != cnt)
|
||||
return -6;
|
||||
|
||||
size_t index2;
|
||||
json_array_foreach(values, index2, tuple) {
|
||||
struct msg *m = &pool[(first + index2) % poolsize];
|
||||
|
||||
/* Check sample format */
|
||||
if (!json_is_array(tuple) || json_array_size(tuple) != 3)
|
||||
return -7;
|
||||
|
||||
char *end;
|
||||
const char *value, *ts, *seq;
|
||||
ret = json_unpack(tuple, "[ s, s, s ]", &ts, &value, &seq);
|
||||
if (ret)
|
||||
return -8;
|
||||
|
||||
m->sequence = atoi(seq);
|
||||
|
||||
struct timespec tss = time_from_double(strtod(ts, &end));
|
||||
if (ts == end)
|
||||
return -9;
|
||||
|
||||
m->ts.sec = tss.tv_sec;
|
||||
m->ts.nsec = tss.tv_nsec;
|
||||
|
||||
m->data[map->index].f = strtof(value, &end);
|
||||
if (value == end)
|
||||
return -10;
|
||||
}
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
struct ngsi_response {
|
||||
char *data;
|
||||
|
@ -83,10 +225,10 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi
|
|||
return realsize;
|
||||
}
|
||||
|
||||
static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *content, json_t **response)
|
||||
static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *request, json_t **response)
|
||||
{
|
||||
struct ngsi_response chunk = { 0 };
|
||||
char *post = json_dumps(content, JSON_INDENT(4));
|
||||
char *post = json_dumps(request, JSON_INDENT(4));
|
||||
|
||||
char url[128];
|
||||
snprintf(url, sizeof(url), "%s/v1/%s", endpoint, operation);
|
||||
|
@ -107,13 +249,11 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
|
|||
if (ret)
|
||||
error("HTTP request failed: %s", curl_easy_strerror(ret));
|
||||
|
||||
long code;
|
||||
double time;
|
||||
curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time);
|
||||
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code);
|
||||
|
||||
debug(16, "Request to context broker completed in %.4f seconds", time);
|
||||
debug(17, "Response from context broker (code=%ld):\n%s", code, chunk.data);
|
||||
debug(17, "Response from context broker:\n%s", chunk.data);
|
||||
|
||||
json_error_t err;
|
||||
json_t *resp = json_loads(chunk.data, 0, &err);
|
||||
|
@ -128,69 +268,7 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
|
|||
free(post);
|
||||
free(chunk.data);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void ngsi_prepare_context(struct node *n, config_setting_t *mapping)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
|
||||
list_init(&i->mapping, NULL);
|
||||
|
||||
i->context = json_object();
|
||||
|
||||
json_t *elements = json_array();
|
||||
json_object_set_new(i->context, "contextElements", elements);
|
||||
|
||||
json_t *entity = json_pack("{ s: s, s: s, s: b }",
|
||||
"id", i->entity_id,
|
||||
"type", i->entity_type,
|
||||
"isPattern", 0
|
||||
);
|
||||
json_array_append_new(elements, entity);
|
||||
|
||||
json_t *attributes = json_array();
|
||||
json_object_set_new(entity, "attributes", attributes);
|
||||
|
||||
for (int j = 0; j < config_setting_length(mapping); j++) {
|
||||
const char *token = config_setting_get_string_elem(mapping, j);
|
||||
if (!token)
|
||||
cerror(mapping, "Invalid mapping token");
|
||||
|
||||
/* Parse token */
|
||||
char aname[64], atype[64];
|
||||
if (sscanf(token, "%[^(](%[^)])", aname, atype) != 2)
|
||||
cerror(mapping, "Invalid mapping token: '%s'", token);
|
||||
|
||||
json_t *attribute = json_pack("{ s: s, s: s, s: [ ] }",
|
||||
"name", aname,
|
||||
"type", atype,
|
||||
"value"
|
||||
);
|
||||
json_array_append_new(attributes, attribute);
|
||||
|
||||
/* Create Metadata for attribute */
|
||||
json_t *metadatas = json_array();
|
||||
json_object_set_new(attribute, "metadatas", metadatas);
|
||||
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }",
|
||||
"name", "source",
|
||||
"type", "string",
|
||||
"value", "s2ss:", settings.name
|
||||
));
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: i }",
|
||||
"name", "index",
|
||||
"type", "integer",
|
||||
"value", j
|
||||
));
|
||||
json_array_append(metadatas, json_pack("{ s: s, s: s, s: s }",
|
||||
"name", "timestamp",
|
||||
"type", "date",
|
||||
"value", ""
|
||||
));
|
||||
|
||||
list_push(&i->mapping, attribute);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ngsi_init(int argc, char *argv[], struct settings *set)
|
||||
|
@ -226,14 +304,32 @@ int ngsi_parse(config_setting_t *cfg, struct node *n)
|
|||
|
||||
if (!config_setting_lookup_float(cfg, "timeout", &i->timeout))
|
||||
i->timeout = 1; /* default value */
|
||||
|
||||
n->ngsi = i;
|
||||
|
||||
if (!config_setting_lookup_float(cfg, "rate", &i->rate))
|
||||
i->rate = 5; /* default value */
|
||||
|
||||
config_setting_t *mapping = config_setting_get_member(cfg, "mapping");
|
||||
if (!mapping || !config_setting_is_array(mapping))
|
||||
cerror(cfg, "Missing mapping for node '%s", n->name);
|
||||
|
||||
ngsi_prepare_context(n, mapping);
|
||||
list_init(&i->mapping, NULL);
|
||||
for (int j = 0; j < config_setting_length(mapping); j++) {
|
||||
const char *token = config_setting_get_string_elem(mapping, j);
|
||||
if (!token)
|
||||
cerror(mapping, "Invalid token in mapping for NGSI node '%s'", n->name);
|
||||
|
||||
struct ngsi_mapping map = {
|
||||
.index = j
|
||||
};
|
||||
|
||||
/* Parse token */
|
||||
if (sscanf(token, "%m[^(](%m[^)])", &map.name, &map.type) != 2)
|
||||
cerror(mapping, "Invalid mapping token: '%s'", token);
|
||||
|
||||
list_push(&i->mapping, memdup(&map, sizeof(struct ngsi_mapping)));
|
||||
}
|
||||
|
||||
n->ngsi = i;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -243,23 +339,41 @@ char * ngsi_print(struct node *n)
|
|||
struct ngsi *i = n->ngsi;
|
||||
char *buf = NULL;
|
||||
|
||||
return strcatf(&buf, "endpoint=%s, timeout=%.3f secs",
|
||||
i->endpoint, i->timeout);
|
||||
return strcatf(&buf, "endpoint=%s, timeout=%.3f secs, #mappings=%zu",
|
||||
i->endpoint, i->timeout, list_length(&i->mapping));
|
||||
}
|
||||
|
||||
int ngsi_open(struct node *n)
|
||||
{
|
||||
char buf[128];
|
||||
struct ngsi *i = n->ngsi;
|
||||
|
||||
|
||||
i->curl = curl_easy_init();
|
||||
i->headers = NULL;
|
||||
|
||||
if (i->access_token) {
|
||||
char buf[128];
|
||||
snprintf(buf, sizeof(buf), "Auth-Token: %s", i->access_token);
|
||||
i->headers = curl_slist_append(i->headers, buf);
|
||||
}
|
||||
|
||||
/* Create timer */
|
||||
i->tfd = timerfd_create(CLOCK_MONOTONIC, 0);
|
||||
if (i->tfd < 0)
|
||||
serror("Failed to create timer");
|
||||
|
||||
/* Arm the timer with a fixed rate */
|
||||
struct itimerspec its = {
|
||||
.it_interval = time_from_double(1 / i->rate),
|
||||
.it_value = { 0, 1 },
|
||||
};
|
||||
|
||||
if (i->timeout > 1 / i->rate)
|
||||
warn("Timeout is to large for given rate: %f", i->rate);
|
||||
|
||||
int ret = timerfd_settime(i->tfd, 0, &its, NULL);
|
||||
if (ret)
|
||||
serror("Failed to start timer");
|
||||
|
||||
i->headers = curl_slist_append(i->headers, "User-Agent: S2SS " VERSION);
|
||||
i->headers = curl_slist_append(i->headers, "Accept: application/json");
|
||||
i->headers = curl_slist_append(i->headers, "Content-Type: application/json");
|
||||
|
@ -267,82 +381,96 @@ int ngsi_open(struct node *n)
|
|||
curl_easy_setopt(i->curl, CURLOPT_SSL_VERIFYPEER, i->ssl_verify);
|
||||
curl_easy_setopt(i->curl, CURLOPT_TIMEOUT_MS, i->timeout * 1e3);
|
||||
curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers);
|
||||
|
||||
|
||||
/* Create entity and atributes */
|
||||
json_object_set_new(i->context, "updateAction", json_string("APPEND"));
|
||||
|
||||
return ngsi_request(i->curl, i->endpoint, "updateContext", i->context, NULL) == 200 ? 0 : -1;
|
||||
json_t *request = json_pack("{ s: s, s: [ o ] }",
|
||||
"updateAction", "APPEND",
|
||||
"contextElements", json_entity(i, NULL, 0, 0, 0)
|
||||
);
|
||||
|
||||
return ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL);
|
||||
}
|
||||
|
||||
int ngsi_close(struct node *n)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
int ret;
|
||||
|
||||
/* Delete attributes */
|
||||
json_object_set_new(i->context, "updateAction", json_string("DELETE"));
|
||||
int code = ngsi_request(i->curl, i->endpoint, "updateContext", i->context, NULL) == 200 ? 0 : -1;
|
||||
/* Delete complete entity (not just attributes) */
|
||||
json_t *request = json_pack("{ s: s, s: [ { s: s, s: s, s: b } ] }",
|
||||
"updateAction", "DELETE",
|
||||
"contextElements",
|
||||
"type", i->entity_type,
|
||||
"id", i->entity_id,
|
||||
"isPattern", 0
|
||||
);
|
||||
|
||||
ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL);
|
||||
json_decref(request);
|
||||
|
||||
curl_easy_cleanup(i->curl);
|
||||
curl_slist_free_all(i->headers);
|
||||
|
||||
return code == 200 ? 0 : -1;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
/* struct ngsi *i = n->ngsi;
|
||||
struct msg *m = &pool[first % poolsize];
|
||||
struct ngsi *i = n->ngsi;
|
||||
int ret;
|
||||
const char *code;
|
||||
|
||||
timerfd_wait(i->tfd);
|
||||
|
||||
json_t *entity;
|
||||
json_t *response;
|
||||
json_t *entities = json_array();
|
||||
json_t *query = json_pack("{ s: o }", "entities", entities);
|
||||
json_t *request = json_pack("{ s: [ { s: s, s: s, s: b } ] }",
|
||||
"entities",
|
||||
"id", i->entity_id,
|
||||
"type", i->entity_type,
|
||||
"isPattern", 0
|
||||
);
|
||||
|
||||
ret = ngsi_request(i->curl, i->endpoint, "queryContext", NULL, NULL);
|
||||
/* Send query to broker */
|
||||
ret = ngsi_request(i->curl, i->endpoint, "queryContext", request, &response);
|
||||
if (ret < 0) {
|
||||
warn("Failed to query data from context broker");
|
||||
warn("Failed to query data from NGSI node '%s'", n->name);
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
return -1; /** @todo not yet implemented */
|
||||
|
||||
/* Parse response */
|
||||
ret = json_unpack(response, "{ s: [ { s: o, s: { s: s } } ] }",
|
||||
"contextResponses",
|
||||
"contextElement", &entity,
|
||||
"statusCode",
|
||||
"code", &code
|
||||
);
|
||||
if (ret || strcmp(code, "200")) {
|
||||
warn("Failed to parse response from NGSI node '%s'", n->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
ret = json_entity_parse(entity, i, pool, poolsize, first, cnt);
|
||||
if (ret < 0) {
|
||||
warn("Failed to parse entity from context broker response: reason=%d", ret);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
|
||||
/* First message */
|
||||
struct msg *fm = &pool[first % poolsize];
|
||||
|
||||
/* Update context */
|
||||
for (int j = 0; j < MIN(i->mapping.length, fm->length); j++) {
|
||||
json_t *attribute = list_at(&i->mapping, j);
|
||||
json_t *values = json_object_get(attribute, "value");
|
||||
json_t *metadatas = json_object_get(attribute, "metadatas");
|
||||
|
||||
/* Update timestamp */
|
||||
json_t *metadata_ts = json_lookup(metadatas, "name", "timestamp");
|
||||
json_object_set(metadata_ts, "value", json_date(&MSG_TS(fm)));
|
||||
|
||||
/* Update value */
|
||||
json_array_clear(values);
|
||||
for (int k = 0; k < cnt; k++) {
|
||||
struct msg *m = &pool[(first + k) % poolsize];
|
||||
|
||||
double tsms = (double) m->ts.sec * 1e3 + m->ts.nsec / 1e6;
|
||||
|
||||
json_array_append_new(values, json_pack("[ o, o ]",
|
||||
json_real(tsms),
|
||||
json_real(m->data[j].f)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
json_object_set_new(i->context, "updateAction", json_string("UPDATE")); // @todo REPLACE?
|
||||
|
||||
json_t *response;
|
||||
int code = ngsi_request(i->curl, i->endpoint, "updateContext", i->context, &response);
|
||||
if (code != 200)
|
||||
json_t *request = json_pack("{ s: s, s : [ o ] }",
|
||||
"updateAction", "UPDATE",
|
||||
"contextElements", json_entity(i, pool, poolsize, first, cnt)
|
||||
);
|
||||
|
||||
int ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, &response); json_decref(request);
|
||||
if (ret)
|
||||
error("Failed to NGSI update Context request:\n%s", json_dumps(response, JSON_INDENT(4)));
|
||||
|
||||
return 1;
|
||||
|
|
|
@ -119,8 +119,7 @@ void node_destroy(struct node *n)
|
|||
switch (node_type(n)) {
|
||||
#ifdef ENABLE_NGSI
|
||||
case NGSI:
|
||||
json_decref(n->ngsi->context);
|
||||
free(n->ngsi->context_map);
|
||||
list_destroy(&n->ngsi->mapping);
|
||||
break;
|
||||
#endif
|
||||
#ifdef ENABLE_SOCKET
|
||||
|
|
Loading…
Add table
Reference in a new issue