mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
rewrite of NGSI node
This commit is contained in:
parent
48fbe1d784
commit
44bddd8b01
2 changed files with 248 additions and 146 deletions
|
@ -32,11 +32,12 @@
|
|||
|
||||
struct node;
|
||||
|
||||
struct ngsi_mapping {
|
||||
struct ngsi_attribute {
|
||||
char *name;
|
||||
char *type;
|
||||
|
||||
int index;
|
||||
struct list metadata;
|
||||
};
|
||||
|
||||
struct ngsi {
|
||||
|
|
|
@ -67,58 +67,77 @@ static json_t * json_lookup(json_t *array, char *key, char *needle)
|
|||
}
|
||||
#endif
|
||||
|
||||
static json_t* json_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt)
|
||||
enum ngsi_flags {
|
||||
NGSI_ENTITY_ATTRIBUTES = (1 << 0),
|
||||
NGSI_ENTITY_VALUES = (1 << 1) | NGSI_ENTITY_ATTRIBUTES,
|
||||
NGSI_ENTITY_METADATA = (1 << 2) | NGSI_ENTITY_ATTRIBUTES,
|
||||
};
|
||||
|
||||
struct ngsi_metadata {
|
||||
char *name;
|
||||
char *type;
|
||||
char *value;
|
||||
};
|
||||
|
||||
struct ngsi_response {
|
||||
char *data;
|
||||
size_t len;
|
||||
};
|
||||
|
||||
static json_t* ngsi_build_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt, int flags)
|
||||
{
|
||||
json_t *attributes = json_array();
|
||||
list_foreach(struct ngsi_mapping *map, &i->mapping) {
|
||||
/* Build value vector */
|
||||
json_t *values;
|
||||
if (cnt) {
|
||||
values = json_array();
|
||||
for (int k = 0; k < cnt; k++) {
|
||||
struct msg *m = &pool[(first + k) % poolsize];
|
||||
|
||||
json_array_append_new(values, json_pack("[ f, f, i ]",
|
||||
time_to_double(&MSG_TS(m)),
|
||||
m->data[map->index].f,
|
||||
m->sequence
|
||||
));
|
||||
}
|
||||
}
|
||||
else
|
||||
values = json_string("");
|
||||
|
||||
/* Create Metadata for attribute */
|
||||
json_t *metadatas = json_array();
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }",
|
||||
"name", "source",
|
||||
"type", "string",
|
||||
"value", "s2ss:", settings.name
|
||||
));
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: i }",
|
||||
"name", "index",
|
||||
"type", "integer",
|
||||
"value", map->index
|
||||
));
|
||||
|
||||
json_t *attribute = json_pack("{ s: s, s: s, s: o, s: o }",
|
||||
"name", map->name,
|
||||
"type", map->type,
|
||||
"value", values,
|
||||
"metadatas", metadatas
|
||||
);
|
||||
json_array_append_new(attributes, attribute);
|
||||
}
|
||||
|
||||
return json_pack("{ s: s, s: s, s: b, s: o }",
|
||||
json_t *entity = json_pack("{ s: s, s: s, s: b }",
|
||||
"id", i->entity_id,
|
||||
"type", i->entity_type,
|
||||
"isPattern", 0,
|
||||
"attributes", attributes
|
||||
"isPattern", 0
|
||||
);
|
||||
|
||||
if (flags & NGSI_ENTITY_ATTRIBUTES) {
|
||||
json_t *attributes = json_array();
|
||||
list_foreach(struct ngsi_attribute *map, &i->mapping) {
|
||||
json_t *attribute = json_pack("{ s: s, s: s }",
|
||||
"name", map->name,
|
||||
"type", map->type
|
||||
);
|
||||
|
||||
if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */
|
||||
json_t *values = json_array();
|
||||
for (int k = 0; k < cnt; k++) {
|
||||
struct msg *m = &pool[(first + k) % poolsize];
|
||||
|
||||
json_array_append_new(values, json_pack("[ f, f, i ]",
|
||||
time_to_double(&MSG_TS(m)),
|
||||
m->data[map->index].f,
|
||||
m->sequence
|
||||
));
|
||||
}
|
||||
|
||||
json_object_set(attribute, "value", values);
|
||||
}
|
||||
|
||||
if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */
|
||||
json_t *metadatas = json_array();
|
||||
list_foreach(struct ngsi_metadata *meta, &map->metadata) {
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s }",
|
||||
"name", meta->name,
|
||||
"type", meta->type,
|
||||
"value", meta->value
|
||||
));
|
||||
}
|
||||
|
||||
json_object_set(attribute, "metadatas", metadatas);
|
||||
}
|
||||
|
||||
json_array_append_new(attributes, attribute);
|
||||
}
|
||||
|
||||
json_object_set(entity, "attributes", attributes);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt)
|
||||
static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
int ret;
|
||||
const char *id, *name, *type;
|
||||
|
@ -146,11 +165,11 @@ static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, i
|
|||
}
|
||||
|
||||
json_array_foreach(attributes, index, attribute) {
|
||||
struct ngsi_mapping *map;
|
||||
struct ngsi_attribute *map;
|
||||
json_t *metadata, *values, *tuple;
|
||||
|
||||
/* Parse JSON */
|
||||
ret = json_unpack(attribute, "{ s: s, s: s, s: o, s: o }",
|
||||
ret = json_unpack(attribute, "{ s: s, s: s, s: o, s?: o }",
|
||||
"name", &name,
|
||||
"type", &type,
|
||||
"value", &values,
|
||||
|
@ -204,10 +223,84 @@ static int json_entity_parse(json_t *entity, struct ngsi *i, struct msg *pool, i
|
|||
return cnt;
|
||||
}
|
||||
|
||||
struct ngsi_response {
|
||||
char *data;
|
||||
size_t len;
|
||||
};
|
||||
static int ngsi_parse_mapping(config_setting_t *cfg, struct list *mapping)
|
||||
{
|
||||
if (!config_setting_is_array(cfg))
|
||||
return -1;
|
||||
|
||||
list_init(mapping, NULL);
|
||||
|
||||
for (int j = 0; j < config_setting_length(cfg); j++) { INDENT
|
||||
const char *token = config_setting_get_string_elem(cfg, j);
|
||||
if (!token)
|
||||
return -2;
|
||||
|
||||
struct ngsi_attribute map = { .index = j };
|
||||
|
||||
/* Parse Attribute: AttributeName(AttributeType) */
|
||||
int bytes;
|
||||
if (sscanf(token, "%m[^(](%m[^)])%n", &map.name, &map.type, &bytes) != 2)
|
||||
cerror(cfg, "Invalid mapping token: '%s'", token);
|
||||
|
||||
token += bytes;
|
||||
debug(13, "Attribute: %u: %s(%s)", map.index, map.name, map.type);
|
||||
|
||||
/* MetadataName(MetadataType)=MetadataValue */
|
||||
list_init(&map.metadata, NULL);
|
||||
struct ngsi_metadata meta;
|
||||
while (sscanf(token, " %m[^(](%m[^)])=%ms%n", &meta.name, &meta.type, &meta.value, &bytes) == 3) { INDENT
|
||||
list_push(&map.metadata, memdup(&meta, sizeof(struct ngsi_metadata)));
|
||||
|
||||
token += bytes;
|
||||
debug(13, "Metadata: %s(%s)=%s", meta.name, meta.type, meta.value);
|
||||
}
|
||||
|
||||
/* Static metadata */
|
||||
struct ngsi_metadata source = {
|
||||
.name = "source",
|
||||
.type = "string",
|
||||
.value = (char *) settings.name,
|
||||
};
|
||||
|
||||
struct ngsi_metadata index = {
|
||||
.name = "index",
|
||||
.type = "integer",
|
||||
.value = alloc(8)
|
||||
};
|
||||
snprintf(index.value, 8, "%u", j);
|
||||
|
||||
list_push(&map.metadata, memdup(&index, sizeof(struct ngsi_metadata)));
|
||||
list_push(&map.metadata, memdup(&source, sizeof(struct ngsi_metadata)));
|
||||
|
||||
list_push(mapping, memdup(&map, sizeof(struct ngsi_attribute)));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int ngsi_parse_context_response(json_t *response, int *code, char **reason, json_t **rentity) {
|
||||
int ret;
|
||||
char *codestr;
|
||||
|
||||
ret = json_unpack(response, "{ s: [ { s: o, s: { s: s, s: s } } ] }",
|
||||
"contextResponses",
|
||||
"contextElement", rentity,
|
||||
"statusCode",
|
||||
"code", &codestr,
|
||||
"reasonPhrase", reason
|
||||
);
|
||||
if (ret) {
|
||||
warn("Failed to find NGSI response code");
|
||||
return ret;
|
||||
}
|
||||
|
||||
*code = atoi(codestr);
|
||||
|
||||
if (*code != 200)
|
||||
warn("NGSI response: %s %s", codestr, *reason);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, void *userp)
|
||||
{
|
||||
|
@ -229,8 +322,11 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
|
|||
{
|
||||
struct ngsi_response chunk = { 0 };
|
||||
char *post = json_dumps(request, JSON_INDENT(4));
|
||||
|
||||
int old;
|
||||
double time;
|
||||
char url[128];
|
||||
json_error_t err;
|
||||
|
||||
snprintf(url, sizeof(url), "%s/v1/%s", endpoint, operation);
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_URL, url);
|
||||
|
@ -241,36 +337,80 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
|
|||
|
||||
debug(18, "Request to context broker: %s\n%s", url, post);
|
||||
|
||||
int old; /* We don't want to leave the CUrl handle in an invalid state */
|
||||
/* We don't want to leave the handle in an invalid state */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
|
||||
CURLcode ret = curl_easy_perform(handle);
|
||||
pthread_setcancelstate(old, NULL);
|
||||
|
||||
if (ret)
|
||||
error("HTTP request failed: %s", curl_easy_strerror(ret));
|
||||
if (ret) {
|
||||
warn("HTTP request failed: %s", curl_easy_strerror(ret));
|
||||
goto out;
|
||||
}
|
||||
|
||||
double time;
|
||||
curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time);
|
||||
|
||||
debug(16, "Request to context broker completed in %.4f seconds", time);
|
||||
debug(17, "Response from context broker:\n%s", chunk.data);
|
||||
|
||||
json_error_t err;
|
||||
json_t *resp = json_loads(chunk.data, 0, &err);
|
||||
if (!resp)
|
||||
error("Received invalid JSON: %s in %s:%u:%u\n%s", err.text, err.source, err.line, err.column, chunk.data);
|
||||
*response = json_loads(chunk.data, 0, &err);
|
||||
if (!*response)
|
||||
warn("Received invalid JSON: %s in %s:%u:%u\n%s", err.text, err.source, err.line, err.column, chunk.data);
|
||||
|
||||
if (response)
|
||||
*response = resp;
|
||||
else
|
||||
json_decref(resp);
|
||||
|
||||
free(post);
|
||||
out: free(post);
|
||||
free(chunk.data);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t *entity, json_t **rentity)
|
||||
{
|
||||
int ret, code;
|
||||
char *reason;
|
||||
|
||||
json_t *response;
|
||||
json_t *request = json_pack("{ s: [ o ] }", "entities", entity);
|
||||
|
||||
ret = ngsi_request(handle, endpoint, "queryContext", request, &response);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
ret = ngsi_parse_context_response(response, &code, &reason, rentity);
|
||||
if (ret)
|
||||
goto out2;
|
||||
|
||||
out2: json_decref(response);
|
||||
out: json_decref(request);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int ngsi_request_context_update(CURL *handle, const char *endpoint, const char *action, json_t *entity)
|
||||
{
|
||||
int ret, code;
|
||||
char *reason;
|
||||
|
||||
json_t *response;
|
||||
json_t *request = json_pack("{ s: s, s: [ o ] }",
|
||||
"updateAction", action,
|
||||
"contextElements", entity
|
||||
);
|
||||
|
||||
ret = ngsi_request(handle, endpoint, "updateContext", request, &response);
|
||||
if (ret)
|
||||
goto out;
|
||||
|
||||
json_t *rentity;
|
||||
ret = ngsi_parse_context_response(response, &code, &reason, &rentity);
|
||||
if (ret)
|
||||
goto out2;
|
||||
|
||||
json_decref(rentity);
|
||||
out2: json_decref(response);
|
||||
out: json_decref(request);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ngsi_init(int argc, char *argv[], struct settings *set)
|
||||
{
|
||||
return curl_global_init(CURL_GLOBAL_ALL);
|
||||
|
@ -308,27 +448,13 @@ int ngsi_parse(config_setting_t *cfg, struct node *n)
|
|||
if (!config_setting_lookup_float(cfg, "rate", &i->rate))
|
||||
i->rate = 5; /* default value */
|
||||
|
||||
config_setting_t *mapping = config_setting_get_member(cfg, "mapping");
|
||||
if (!mapping || !config_setting_is_array(mapping))
|
||||
config_setting_t *cfg_mapping = config_setting_get_member(cfg, "mapping");
|
||||
if (!cfg_mapping)
|
||||
cerror(cfg, "Missing mapping for node '%s", n->name);
|
||||
|
||||
list_init(&i->mapping, NULL);
|
||||
for (int j = 0; j < config_setting_length(mapping); j++) {
|
||||
const char *token = config_setting_get_string_elem(mapping, j);
|
||||
if (!token)
|
||||
cerror(mapping, "Invalid token in mapping for NGSI node '%s'", n->name);
|
||||
|
||||
struct ngsi_mapping map = {
|
||||
.index = j
|
||||
};
|
||||
|
||||
/* Parse token */
|
||||
if (sscanf(token, "%m[^(](%m[^)])", &map.name, &map.type) != 2)
|
||||
cerror(mapping, "Invalid mapping token: '%s'", token);
|
||||
|
||||
list_push(&i->mapping, memdup(&map, sizeof(struct ngsi_mapping)));
|
||||
}
|
||||
|
||||
if (ngsi_parse_mapping(cfg_mapping, &i->mapping))
|
||||
cerror(cfg_mapping, "Invalid mapping for NGSI node '%s'", n->name);
|
||||
|
||||
n->ngsi = i;
|
||||
|
||||
return 0;
|
||||
|
@ -346,6 +472,7 @@ char * ngsi_print(struct node *n)
|
|||
int ngsi_open(struct node *n)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
int ret;
|
||||
|
||||
i->curl = curl_easy_init();
|
||||
i->headers = NULL;
|
||||
|
@ -370,7 +497,7 @@ int ngsi_open(struct node *n)
|
|||
if (i->timeout > 1 / i->rate)
|
||||
warn("Timeout is to large for given rate: %f", i->rate);
|
||||
|
||||
int ret = timerfd_settime(i->tfd, 0, &its, NULL);
|
||||
ret = timerfd_settime(i->tfd, 0, &its, NULL);
|
||||
if (ret)
|
||||
serror("Failed to start timer");
|
||||
|
||||
|
@ -383,12 +510,15 @@ int ngsi_open(struct node *n)
|
|||
curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers);
|
||||
|
||||
/* Create entity and atributes */
|
||||
json_t *request = json_pack("{ s: s, s: [ o ] }",
|
||||
"updateAction", "APPEND",
|
||||
"contextElements", json_entity(i, NULL, 0, 0, 0)
|
||||
);
|
||||
json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, NGSI_ENTITY_METADATA);
|
||||
|
||||
return ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL);
|
||||
ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", entity);
|
||||
if (ret)
|
||||
error("Failed to create NGSI context for node '%s'", n->name);
|
||||
|
||||
json_decref(entity);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ngsi_close(struct node *n)
|
||||
|
@ -397,16 +527,11 @@ int ngsi_close(struct node *n)
|
|||
int ret;
|
||||
|
||||
/* Delete complete entity (not just attributes) */
|
||||
json_t *request = json_pack("{ s: s, s: [ { s: s, s: s, s: b } ] }",
|
||||
"updateAction", "DELETE",
|
||||
"contextElements",
|
||||
"type", i->entity_type,
|
||||
"id", i->entity_id,
|
||||
"isPattern", 0
|
||||
);
|
||||
json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, 0);
|
||||
|
||||
ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, NULL);
|
||||
json_decref(request);
|
||||
ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", entity);
|
||||
|
||||
json_decref(entity);
|
||||
|
||||
curl_easy_cleanup(i->curl);
|
||||
curl_slist_free_all(i->headers);
|
||||
|
@ -418,62 +543,38 @@ int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
|
|||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
int ret;
|
||||
const char *code;
|
||||
|
||||
timerfd_wait(i->tfd);
|
||||
|
||||
json_t *entity;
|
||||
json_t *response;
|
||||
json_t *request = json_pack("{ s: [ { s: s, s: s, s: b } ] }",
|
||||
"entities",
|
||||
"id", i->entity_id,
|
||||
"type", i->entity_type,
|
||||
"isPattern", 0
|
||||
);
|
||||
|
||||
/* Send query to broker */
|
||||
ret = ngsi_request(i->curl, i->endpoint, "queryContext", request, &response);
|
||||
if (ret < 0) {
|
||||
warn("Failed to query data from NGSI node '%s'", n->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Parse response */
|
||||
ret = json_unpack(response, "{ s: [ { s: o, s: { s: s } } ] }",
|
||||
"contextResponses",
|
||||
"contextElement", &entity,
|
||||
"statusCode",
|
||||
"code", &code
|
||||
);
|
||||
if (ret || strcmp(code, "200")) {
|
||||
warn("Failed to parse response from NGSI node '%s'", n->name);
|
||||
return 0;
|
||||
}
|
||||
json_t *rentity;
|
||||
json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, 0);
|
||||
|
||||
ret = json_entity_parse(entity, i, pool, poolsize, first, cnt);
|
||||
if (ret < 0) {
|
||||
warn("Failed to parse entity from context broker response: reason=%d", ret);
|
||||
return 0;
|
||||
}
|
||||
ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity);
|
||||
if (ret)
|
||||
goto out;
|
||||
|
||||
ret = ngsi_parse_entity(rentity, i, pool, poolsize, first, cnt);
|
||||
if (ret)
|
||||
goto out2;
|
||||
|
||||
out2: json_decref(entity);
|
||||
out: json_decref(rentity);
|
||||
|
||||
return ret;
|
||||
return ret ? 0 : cnt;
|
||||
}
|
||||
|
||||
int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
int ret;
|
||||
|
||||
json_t *response;
|
||||
json_t *request = json_pack("{ s: s, s : [ o ] }",
|
||||
"updateAction", "UPDATE",
|
||||
"contextElements", json_entity(i, pool, poolsize, first, cnt)
|
||||
);
|
||||
json_t *entity = ngsi_build_entity(i, pool, poolsize, first, cnt, NGSI_ENTITY_VALUES);
|
||||
|
||||
int ret = ngsi_request(i->curl, i->endpoint, "updateContext", request, &response); json_decref(request);
|
||||
if (ret)
|
||||
error("Failed to NGSI update Context request:\n%s", json_dumps(response, JSON_INDENT(4)));
|
||||
|
||||
return 1;
|
||||
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);
|
||||
|
||||
json_decref(entity);
|
||||
|
||||
return ret ? 0 : cnt;
|
||||
}
|
||||
|
||||
REGISTER_NODE_TYPE(NGSI, "ngsi", ngsi)
|
||||
|
|
Loading…
Add table
Reference in a new issue