1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

ngsi: rewrite signal and attribute mapping based on new signal code

This commit is contained in:
Steffen Vogel 2020-08-05 17:27:54 +02:00
parent dea174a242
commit af1bb4af9d
3 changed files with 256 additions and 169 deletions

View file

@ -4,17 +4,34 @@ nodes = {
### The following settings are specific to the ngsi node-type!! ###
endpoint = "http://46.101.131.212:1026",# The HTTP REST API endpoint of the FIRWARE context broker
# The HTTP REST API endpoint of the FIRWARE context broker
endpoint = "http://46.101.131.212:1026",
entity_id = "S3_ElectricalGrid",
entity_type = "ElectricalGridMonitoring",
timeout = 5, # Timeout of HTTP request in seconds (default is 1)
rate = 0.1 # Rate at which we poll the broker for updates
timeout = 1, # Timeout of HTTP request in seconds (default is 1, must be smaller than 1 / rate)
verify_ssl = false, # Verification of SSL server certificates (default is true)
mapping = [ # Format: "AttributeName(AttributeType)"
"PTotalLosses(MW)",
"QTotalLosses(Mvar)"
]
in = {
signals = (
{
name = "attr1",
ngsi_attribute_name = "attr1", # defaults to signal 'name'
ngsi_attribute_type = "Volts", # default to signal 'unit'
ngsi_attribute_metadatas = (
{ name="accuracy", type="percent", value="5" }
)
}
)
}
out = {
signals = (
{ name="PTotalLosses", unit="MW" },
{ name="QTotalLosses", unit="Mvar" }
)
}
}
}

View file

@ -61,7 +61,9 @@ struct ngsi {
CURL *curl; /**< libcurl: handle */
struct vlist mapping; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */
struct {
struct vlist signals; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */
} in, out;
};
/** Initialize global NGSI settings and maps shared memory regions.

View file

@ -66,127 +66,142 @@ struct ngsi_response {
size_t len;
};
static json_t * ngsi_build_attribute(struct ngsi_attribute *attr, struct sample *smps[], unsigned cnt, int flags)
{
json_t *json_attribute = json_pack("{ s: s, s: s }",
"name", attr->name,
"type", attr->type
);
if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */
json_t *json_values = json_array();
for (unsigned k = 0; k < cnt; k++) {
json_array_append_new(json_values, json_pack("[ f, f, i ]",
time_to_double(&smps[k]->ts.origin),
smps[k]->data[attr->index].f,
smps[k]->sequence
));
}
json_object_set(json_attribute, "value", json_values);
}
if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */
json_t *json_metadatas = json_array();
for (size_t i = 0; i < vlist_length(&attr->metadata); i++) {
struct ngsi_metadata *meta = (struct ngsi_metadata *) vlist_at(&attr->metadata, i);
json_array_append_new(json_metadatas, json_pack("{ s: s, s: s, s: s }",
"name", meta->name,
"type", meta->type,
"value", meta->value
));
}
json_object_set(json_attribute, "metadatas", json_metadatas);
}
return json_attribute;
}
static json_t* ngsi_build_entity(struct node *n, struct sample *smps[], unsigned cnt, int flags)
{
struct ngsi *i = (struct ngsi *) n->_vd;
json_t *entity = json_pack("{ s: s, s: s, s: b }",
"id", i->entity_id,
"type", i->entity_type,
"isPattern", 0
json_t *json_entity = json_pack("{ s: s, s: s, s: b }",
"id", i->entity_id,
"type", i->entity_type,
"isPattern", 0
);
if (flags & NGSI_ENTITY_ATTRIBUTES) {
json_t *attributes = json_array();
json_t *json_attrs = json_array();
for (size_t j = 0; j < vlist_length(&i->mapping); j++) {
struct ngsi_attribute *map = (struct ngsi_attribute *) vlist_at(&i->mapping, j);
for (size_t j = 0; j < vlist_length(&i->in.signals); j++) {
struct ngsi_attribute *attr = (struct ngsi_attribute *) vlist_at(&i->in.signals, j);
json_t *attribute = json_pack("{ s: s, s: s }",
"name", map->name,
"type", map->type
);
auto *json_attr = ngsi_build_attribute(attr, smps, cnt, flags);
if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */
json_t *values = json_array();
for (unsigned k = 0; k < cnt; k++) {
json_array_append_new(values, json_pack("[ f, f, i ]",
time_to_double(&smps[k]->ts.origin),
smps[k]->data[map->index].f,
smps[k]->sequence
));
}
json_object_set(attribute, "value", values);
}
if (flags & NGSI_ENTITY_METADATA) { /* Create Metadata for attribute */
json_t *metadatas = json_array();
for (size_t i = 0; i < vlist_length(&map->metadata); i++) {
struct ngsi_metadata *meta = (struct ngsi_metadata *) vlist_at(&map->metadata, i);
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s }",
"name", meta->name,
"type", meta->type,
"value", meta->value
));
}
json_object_set(attribute, "metadatas", metadatas);
}
json_array_append_new(attributes, attribute);
json_array_append_new(json_attrs, json_attr);
}
json_object_set(entity, "attributes", attributes);
for (size_t j = 0; j < vlist_length(&i->out.signals); j++) {
struct ngsi_attribute *attr = (struct ngsi_attribute *) vlist_at(&i->out.signals, j);
auto *json_attr = ngsi_build_attribute(attr, smps, cnt, flags);
json_array_append_new(json_attrs, json_attr);
}
json_object_set(json_entity, "attributes", json_attrs);
}
return entity;
return json_entity;
}
static int ngsi_parse_entity(struct node *n, json_t *entity, struct sample *smps[], unsigned cnt)
static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample *smps[], unsigned cnt)
{
int ret;
int ret, length = 0;
const char *id, *name, *type;
struct ngsi *i = (struct ngsi *) n->_vd;
size_t l;
json_error_t err;
json_t *attribute, *attributes;
json_t *json_attr, *json_attrs;
ret = json_unpack_ex(entity, &err, 0, "{ s: s, s: s, s: o }",
ret = json_unpack_ex(json_entity, &err, 0, "{ s: s, s: s, s: o }",
"id", &id,
"type", &type,
"attributes", &attributes
"attributes", &json_attrs
);
if (ret || !json_is_array(attributes))
if (ret || !json_is_array(json_attrs))
return -1;
if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type))
return -2;
for (unsigned k = 0; k < cnt; k++)
smps[k]->length = json_array_size(attributes);
json_array_foreach(attributes, l, attribute) {
struct ngsi_attribute *map;
json_array_foreach(json_attrs, l, json_attr) {
struct ngsi_attribute *attr;
json_error_t err;
json_t *metadata, *values, *tuple;
json_t *json_metadata, *json_values, *json_tuple;
/* Parse JSON */
ret = json_unpack_ex(attribute, &err, 0, "{ s: s, s: s, s: o, s?: o }",
ret = json_unpack_ex(json_attr, &err, 0, "{ s: s, s: s, s: o, s?: o }",
"name", &name,
"type", &type,
"value", &values,
"metadatas", &metadata
"value", &json_values,
"metadatas", &json_metadata
);
if (ret)
return -3;
/* Check attribute name and type */
map = (struct ngsi_attribute *) vlist_lookup(&i->mapping, name);
if (!map || strcmp(map->type, type))
return -4;
attr = (struct ngsi_attribute *) vlist_lookup(&i->in.signals, name);
if (!attr || strcmp(attr->type, type))
continue; /* skip unknown attributes */
length++;
/* Check metadata */
if (!json_is_array(metadata))
if (!json_is_array(json_metadata))
return -5;
/* Check number of values */
if (!json_is_array(values) || json_array_size(values) != cnt)
if (!json_is_array(json_values) || json_array_size(json_values) != cnt)
return -6;
size_t k;
json_array_foreach(values, k, tuple) {
json_array_foreach(json_values, k, json_tuple) {
/* Check sample format */
if (!json_is_array(tuple) || json_array_size(tuple) != 3)
if (!json_is_array(json_tuple) || json_array_size(json_tuple) != 3)
return -7;
char *end;
const char *value, *ts, *seq;
ret = json_unpack_ex(tuple, &err, 0, "[ s, s, s ]", &ts, &value, &seq);
ret = json_unpack_ex(json_tuple, &err, 0, "[ s, s, s ]", &ts, &value, &seq);
if (ret)
return -8;
@ -197,88 +212,113 @@ static int ngsi_parse_entity(struct node *n, json_t *entity, struct sample *smps
return -9;
smps[k]->ts.origin = tss;
smps[k]->data[map->index].f = strtof(value, &end);
smps[k]->data[attr->index].f = strtof(value, &end);
if (value == end)
return -10;
smps[k]->signals = &n->in.signals;
}
}
for (unsigned k = 0; k < cnt; k++) {
smps[k]->length = length;
smps[k]->signals = &n->in.signals;
}
return cnt;
}
static int ngsi_parse_mapping(struct vlist *mapping, json_t *cfg)
static int ngsi_parse_signals(json_t *json_signals, struct vlist *ngsi_signals, struct vlist *node_signals)
{
if (!json_is_array(cfg))
int ret;
size_t j;
json_error_t err;
json_t *json_signal, *json_metadata;
if (!json_is_array(json_signals))
return -1;
vlist_init(mapping);
size_t j;
json_t *json_token;
json_array_foreach(cfg, j, json_token) {
const char *token;
token = json_string_value(json_token);
if (!token)
return -2;
json_array_foreach(json_signals, j, json_signal) {
auto *s = (struct signal *) vlist_at_safe(node_signals, j);
auto *a = new struct ngsi_attribute;
if (!a)
throw MemoryAllocationError();
memset(a, 0, sizeof(struct ngsi_attribute));
a->index = j;
json_t *json_ngsi_metadata = nullptr;
/* Parse Attribute: AttributeName(AttributeType) */
int bytes;
if (sscanf(token, "%m[^(](%m[^)])%n", &a->name, &a->type, &bytes) != 2)
error("Invalid mapping token: '%s'", token);
ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: o }",
"ngsi_attribute_name", &a->name,
"ngsi_attribute_type", &a->type,
"ngsi_metadatas", &json_ngsi_metadata
);
if (ret)
return ret;
token += bytes;
/* Copy values from node signal, if 'ngsi_attribute' settings not provided */
if (s != nullptr && a->name == nullptr)
a->name = strdup(s->name ? s->name : "");
/* MetadataName(MetadataType)=MetadataValue */
vlist_init(&a->metadata);
if (s != nullptr && a->type == nullptr)
a->type = strdup(s->unit ? s->unit : "");
struct ngsi_metadata m;
while (sscanf(token, " %m[^(](%m[^)])=%ms%n", &m.name, &m.type, &m.value, &bytes) == 3) {
vlist_push(&a->metadata, memdup(&m, sizeof(m)));
token += bytes;
ret = vlist_init(&a->metadata);
if (ret)
return ret;
if (json_ngsi_metadata) {
if (!json_is_array(json_ngsi_metadata))
throw ConfigError(json_ngsi_metadata, "node-config-ngsi-metadata", "ngsi_metadata must be a list of objects");
json_array_foreach(json_ngsi_metadata, j, json_metadata) {
auto *md = new struct ngsi_metadata;
if (!md)
return -1;
memset(md, 0, sizeof(struct ngsi_metadata));
ret = json_unpack_ex(json_metadata, &err, 0, "{ s: s, s: s, s: s }",
"name", &md->name,
"type", &md->type,
"value", &md->value
);
if (ret)
return ret;
vlist_push(&a->metadata, md);
}
}
/* Metadata: source(string)=name */
struct ngsi_metadata s = {
.name = strdup("source"),
.type = strdup("string"),
.value = name
};
/* Metadata: index(integer)=j */
struct ngsi_metadata i = {
.name = strdup("index"),
.type = strdup("integer")
};
assert(asprintf(&i.value, "%zu", j));
auto *md_idx = new struct ngsi_metadata;
if (!md_idx)
return -1;
vlist_push(&a->metadata, memdup(&s, sizeof(s)));
vlist_push(&a->metadata, memdup(&i, sizeof(i)));
md_idx->name = strdup("index");
md_idx->type = strdup("integer");
asprintf(&md_idx->value, "%zu", j);
vlist_push(mapping, a);
assert(md_idx->name && md_idx->type && md_idx->value);
vlist_push(&a->metadata, md_idx);
vlist_push(ngsi_signals, a);
}
return 0;
}
static int ngsi_parse_context_response(json_t *response, int *code, char **reason, json_t **rentity) {
static int ngsi_parse_context_response(json_t *json_response, int *code, char **reason, json_t **json_rentity) {
int ret;
char *codestr;
json_error_t err;
ret = json_unpack_ex(response, &err, 0, "{ s: [ { s: O, s: { s: s, s: s } } ] }",
ret = json_unpack_ex(json_response, &err, 0, "{ s: [ { s: O, s: { s: s, s: s } } ] }",
"contextResponses",
"contextElement", rentity,
"contextElement", json_rentity,
"statusCode",
"code", &codestr,
"reasonPhrase", reason
@ -312,10 +352,10 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi
return realsize;
}
static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *request, json_t **response)
static int ngsi_request(CURL *handle, const char *endpoint, const char *operation, json_t *json_request, json_t **json_response)
{
struct ngsi_response chunk = { 0 };
char *post = json_dumps(request, JSON_INDENT(4));
char *post = json_dumps(json_request, JSON_INDENT(4));
int old;
double time;
char url[128];
@ -346,8 +386,8 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
debug(LOG_NGSI | 16, "Request to context broker completed in %.4f seconds", time);
debug(LOG_NGSI | 17, "Response from context broker:\n%s", chunk.data);
*response = json_loads(chunk.data, 0, &err);
if (!*response)
*json_response = json_loads(chunk.data, 0, &err);
if (!*json_response)
warning("Received invalid JSON: %s in %s:%u:%u\n%s", err.text, err.source, err.line, err.column, chunk.data);
out: free(post);
@ -356,51 +396,51 @@ out: free(post);
return ret;
}
static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t *entity, json_t **rentity)
static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t *json_entity, json_t **json_rentity)
{
int ret, code;
char *reason;
json_t *response;
json_t *request = json_pack("{ s: [ o ] }", "entities", entity);
json_t *json_response;
json_t *json_request = json_pack("{ s: [ o ] }", "entities", json_entity);
ret = ngsi_request(handle, endpoint, "queryContext", request, &response);
ret = ngsi_request(handle, endpoint, "queryContext", json_request, &json_response);
if (ret)
goto out;
ret = ngsi_parse_context_response(response, &code, &reason, rentity);
ret = ngsi_parse_context_response(json_response, &code, &reason, json_rentity);
if (ret)
goto out2;
out2: json_decref(response);
out: json_decref(request);
out2: json_decref(json_response);
out: json_decref(json_request);
return ret;
}
static int ngsi_request_context_update(CURL *handle, const char *endpoint, const char *action, json_t *entity)
static int ngsi_request_context_update(CURL *handle, const char *endpoint, const char *action, json_t *json_entity)
{
int ret, code;
char *reason;
json_t *response;
json_t *request = json_pack("{ s: s, s: [ o ] }",
json_t *json_response;
json_t *json_request = json_pack("{ s: s, s: [ o ] }",
"updateAction", action,
"contextElements", entity
"contextElements", json_entity
);
ret = ngsi_request(handle, endpoint, "updateContext", request, &response);
ret = ngsi_request(handle, endpoint, "updateContext", json_request, &json_response);
if (ret)
goto out;
json_t *rentity;
ret = ngsi_parse_context_response(response, &code, &reason, &rentity);
json_t *json_rentity;
ret = ngsi_parse_context_response(json_response, &code, &reason, &json_rentity);
if (ret)
goto out2;
json_decref(rentity);
out2: json_decref(response);
out: json_decref(request);
json_decref(json_rentity);
out2: json_decref(json_response);
out: json_decref(json_request);
return ret;
}
@ -425,15 +465,10 @@ int ngsi_parse(struct node *n, json_t *cfg)
int ret;
json_error_t err;
json_t *json_mapping;
json_t *json_signals_in = nullptr;
json_t *json_signals_out = nullptr;
/* Default values */
i->access_token = nullptr; /* disabled by default */
i->ssl_verify = 1; /* verify by default */
i->timeout = 1; /* default value */
i->rate = 5; /* default value */
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: b, s?: F, s?: F, s: o }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: b, s?: F, s?: F, s?: { s?: o }, s?: { s?: o } }",
"access_token", &i->access_token,
"endpoint", &i->endpoint,
"entity_id", &i->entity_id,
@ -441,14 +476,25 @@ int ngsi_parse(struct node *n, json_t *cfg)
"ssl_verify", &i->ssl_verify,
"timeout", &i->timeout,
"rate", &i->rate,
"mapping", &json_mapping
"in",
"signals", &json_signals_in,
"out",
"signals", &json_signals_out
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
ret = ngsi_parse_mapping(&i->mapping, json_mapping);
if (ret)
error("Invalid setting 'mapping' of node %s", node_name(n));
if (json_signals_in) {
ret = ngsi_parse_signals(json_signals_in, &i->in.signals, &n->in.signals);
if (ret)
error("Invalid setting 'in.signals' of node %s", node_name(n));
}
if (json_signals_out) {
ret = ngsi_parse_signals(json_signals_out, &i->out.signals, &n->out.signals);
if (ret)
error("Invalid setting 'out.signals' of node %s", node_name(n));
}
return 0;
}
@ -457,8 +503,8 @@ char * ngsi_print(struct node *n)
{
struct ngsi *i = (struct ngsi *) n->_vd;
return strf("endpoint=%s, timeout=%.3f secs, #mappings=%zu",
i->endpoint, i->timeout, vlist_length(&i->mapping));
return strf("endpoint=%s, timeout=%.3f secs",
i->endpoint, i->timeout);
}
static int ngsi_metadata_destroy(struct ngsi_metadata *meta)
@ -509,13 +555,13 @@ int ngsi_start(struct node *n)
curl_easy_setopt(i->curl, CURLOPT_USERAGENT, USER_AGENT);
/* Create entity and atributes */
json_t *entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_METADATA);
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_METADATA);
ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", entity);
ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", json_entity);
if (ret)
error("Failed to create NGSI context for node %s", node_name(n));
json_decref(entity);
json_decref(json_entity);
return ret;
}
@ -528,11 +574,11 @@ int ngsi_stop(struct node *n)
i->task.stop();
/* Delete complete entity (not just attributes) */
json_t *entity = ngsi_build_entity(n, nullptr, 0, 0);
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);
ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", entity);
ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", json_entity);
json_decref(entity);
json_decref(json_entity);
curl_easy_cleanup(i->curl);
curl_slist_free_all(i->headers);
@ -548,19 +594,19 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
if (i->task.wait() == 0)
perror("Failed to wait for task");
json_t *rentity;
json_t *entity = ngsi_build_entity(n, nullptr, 0, 0);
json_t *json_rentity;
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);
ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity);
ret = ngsi_request_context_query(i->curl, i->endpoint, json_entity, &json_rentity);
if (ret)
goto out;
ret = ngsi_parse_entity(n, entity, smps, cnt);
ret = ngsi_parse_entity(n, json_rentity, smps, cnt);
if (ret)
goto out2;
out2: json_decref(rentity);
out: json_decref(entity);
out2: json_decref(json_rentity);
out: json_decref(json_entity);
return ret;
}
@ -570,11 +616,11 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
struct ngsi *i = (struct ngsi *) n->_vd;
int ret;
json_t *entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_VALUES);
json_t *json_entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_VALUES);
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", json_entity);
json_decref(entity);
json_decref(json_entity);
return ret ? 0 : cnt;
}
@ -590,18 +636,40 @@ int ngsi_poll_fds(struct node *n, int fds[])
int ngsi_init(struct node *n)
{
int ret;
struct ngsi *i = (struct ngsi *) n->_vd;
new (&i->task) Task(CLOCK_REALTIME);
ret = vlist_init(&i->in.signals);
if (ret)
return ret;
ret = vlist_init(&i->out.signals);
if (ret)
return ret;
/* Default values */
i->access_token = nullptr; /* disabled by default */
i->ssl_verify = 1; /* verify by default */
i->timeout = 1; /* default value */
i->rate = 1; /* default value */
return 0;
}
int ngsi_destroy(struct node *n)
{
int ret;
struct ngsi *i = (struct ngsi *) n->_vd;
vlist_destroy(&i->mapping, (dtor_cb_t) ngsi_attribute_destroy, true);
ret = vlist_destroy(&i->in.signals, (dtor_cb_t) ngsi_attribute_destroy, true);
if (ret)
return ret;
ret = vlist_destroy(&i->out.signals, (dtor_cb_t) ngsi_attribute_destroy, true);
if (ret)
return ret;
i->task.~Task();