mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
added 'combine' aka vector support to NGSI node
This commit is contained in:
parent
d0dcc8c499
commit
19fbd94e6f
4 changed files with 108 additions and 134 deletions
|
@ -8,28 +8,44 @@ It's using `libcurl` and `libjansson` to communicate with the context broker ove
|
|||
|
||||
## Configuration
|
||||
|
||||
You can use the `combine` setting to send multiple samples in a vector.
|
||||
|
||||
Every `ngsi` node supports the following special settings:
|
||||
|
||||
#### `endpoint` *(string: URL)*
|
||||
|
||||
#### `entity_id` *(string)*
|
||||
|
||||
#### `entity_type` *(string)*
|
||||
|
||||
#### `ssl_verify` *(boolean)*
|
||||
|
||||
#### `timeout` *(float: seconds)*
|
||||
|
||||
#### `structure` *("flat" | "children")*
|
||||
|
||||
- `flat`:
|
||||
- `children`:
|
||||
|
||||
#### `mapping` *(array of strings)*
|
||||
|
||||
Format for `structure = flat`: `"entityId(entityType).attributeName(attributeType)"`
|
||||
|
||||
Format for `structure = children`: `"parentId(entityType).value(attributeType)"`
|
||||
Format `AttributeName(AttributeType)`
|
||||
|
||||
### Example
|
||||
|
||||
@todo add example from example.conf
|
||||
ngsi_node = {
|
||||
type = "ngsi",
|
||||
|
||||
### The following settings are specific to the ngsi node-type!! ###
|
||||
|
||||
endpoint = "http://46.101.131.212:1026",# The HTTP REST API endpoint of the FIRWARE context broker
|
||||
|
||||
entity_id = "S3_ElectricalGrid",
|
||||
entity_type = "ElectricalGridMonitoring",
|
||||
|
||||
timeout = 5, # Timeout of HTTP request in seconds (default is 1)
|
||||
verify_ssl = false, # Verification of SSL server certificates (default is true)
|
||||
|
||||
mapping = [ # Format: "AttributeName(AttributeType)"
|
||||
PTotalLosses(MW)",
|
||||
"QTotalLosses(Mvar)"
|
||||
]
|
||||
}
|
||||
|
||||
## Further reading
|
||||
|
||||
|
|
|
@ -118,20 +118,17 @@ nodes = {
|
|||
### The following settings are specific to the ngsi node-type!! ###
|
||||
|
||||
endpoint = "http://46.101.131.212:1026",# The HTTP REST API endpoint of the FIRWARE context broker
|
||||
|
||||
entity_id = "S3_ElectricalGrid",
|
||||
entity_type = "ElectricalGridMonitoring",
|
||||
|
||||
timeout = 5, # Timeout of HTTP request in seconds (default is 1)
|
||||
structure = "children", # Structure of published context (default is "flat")
|
||||
verify_ssl = false, # Verification of SSL server certificates (default is true)
|
||||
|
||||
mapping = [ # Example mapping for structure = flat
|
||||
"rtds_sub1(V2).v2(float)", # Format: "entityId(entityType).AttributeName(AttributeType)"
|
||||
"rtds_sub2(power).p1(float)"
|
||||
mapping = [ # Format: "AttributeName(AttributeType)"
|
||||
PTotalLosses(MW)",
|
||||
"QTotalLosses(Mvar)"
|
||||
]
|
||||
# mapping = [ # Alternative mapping for structure = children
|
||||
# "fa846ed3-5871-11e5-b0cd-ecf4bb16fe0c(GridSectionDataValue).value(float)", # Index #0
|
||||
# "1d2c63e4-6130-11e5-9b0d-001c42f23160(GridSectionDataValue).value(float)" # Index #1
|
||||
# .... # Index #n
|
||||
# (every line correspondents to one value)
|
||||
#]
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -34,18 +34,13 @@ struct node;
|
|||
|
||||
struct ngsi {
|
||||
const char *endpoint; /**< The NGSI context broker endpoint URL. */
|
||||
const char *token; /**< An optional authentication token which will be sent as HTTP header. */
|
||||
const char *entity_id; /**< The context broker entity id related to this node */
|
||||
const char *entity_type; /**< The type of the entity */
|
||||
const char *access_token; /**< An optional authentication token which will be sent as HTTP header. */
|
||||
|
||||
double timeout; /**< HTTP timeout in seconds */
|
||||
|
||||
int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */
|
||||
|
||||
|
||||
enum ngsi_structure {
|
||||
NGSI_FLAT,
|
||||
NGSI_CHILDREN
|
||||
} structure; /**< Structure of published entitites */
|
||||
|
||||
int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */
|
||||
|
||||
struct curl_slist *headers; /**< List of HTTP request headers for libcurl */
|
||||
|
||||
|
|
|
@ -107,7 +107,7 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
|
|||
curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time);
|
||||
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code);
|
||||
|
||||
debug(20, "Request to context broker completed in %.4f seconds", time);
|
||||
debug(10, "Request to context broker completed in %.4f seconds", time);
|
||||
debug(20, "Response from context broker (code=%ld):\n%s", code, chunk.data);
|
||||
|
||||
json_error_t err;
|
||||
|
@ -129,53 +129,44 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
|
|||
void ngsi_prepare_context(struct node *n, config_setting_t *mapping)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
|
||||
i->context = json_object();
|
||||
|
||||
|
||||
list_init(&i->mapping, NULL);
|
||||
|
||||
i->context = json_object();
|
||||
|
||||
json_t *elements = json_array();
|
||||
json_object_set_new(i->context, "contextElements", elements);
|
||||
|
||||
json_t *entity = json_pack("{ s: s, s: s, s: b }",
|
||||
"id", i->entity_id,
|
||||
"type", i->entity_type,
|
||||
"isPattern", 0
|
||||
);
|
||||
json_array_append_new(elements, entity);
|
||||
|
||||
json_t *attributes = json_array();
|
||||
json_object_set_new(entity, "attributes", attributes);
|
||||
|
||||
for (int j = 0; j < config_setting_length(mapping); j++) {
|
||||
/* Get token */
|
||||
config_setting_t *ctoken = config_setting_get_elem(mapping, j);
|
||||
const char *stoken = config_setting_get_string(ctoken);
|
||||
if (!stoken)
|
||||
cerror(mapping, "Invalid token");
|
||||
const char *token = config_setting_get_string_elem(mapping, j);
|
||||
if (!token)
|
||||
cerror(mapping, "Invalid mapping token");
|
||||
|
||||
/* Parse token */
|
||||
char eid[64], etype[64], aname[64], atype[64];
|
||||
if (sscanf(stoken, "%63[^().](%63[^().]).%63[^().](%63[^().])", eid, etype, aname, atype) != 4)
|
||||
cerror(ctoken, "Invalid token: '%s'", stoken);
|
||||
|
||||
/* Create entity */
|
||||
json_t *attributes; /* borrowed reference */
|
||||
|
||||
json_t *entity = json_lookup(elements, "id", eid);
|
||||
if (!entity) {
|
||||
entity = json_pack("{ s: s, s: s, s: b }",
|
||||
"id", eid,
|
||||
"type", etype,
|
||||
"isPattern", 0
|
||||
);
|
||||
json_array_append_new(elements, entity);
|
||||
|
||||
attributes = json_array();
|
||||
json_object_set_new(entity, "attributes", attributes);
|
||||
}
|
||||
else {
|
||||
if (i->structure == NGSI_CHILDREN)
|
||||
cerror(ctoken, "Duplicate mapping for index %u", j);
|
||||
|
||||
attributes = json_object_get(entity, "attributes");
|
||||
}
|
||||
|
||||
/* Create attribute for entity */
|
||||
if (json_lookup(attributes, "name", aname))
|
||||
cerror(ctoken, "Duplicated attribute '%s' in NGSI mapping of node '%s'", aname, n->name);
|
||||
char aname[64], atype[64];
|
||||
if (sscanf(token, "%[^(](%[^)])", aname, atype) != 2)
|
||||
cerror(mapping, "Invalid mapping token: '%s'", token);
|
||||
|
||||
json_t *attribute = json_pack("{ s: s, s: s, s: [ ] }",
|
||||
"name", aname,
|
||||
"type", atype,
|
||||
"value"
|
||||
);
|
||||
json_array_append_new(attributes, attribute);
|
||||
|
||||
/* Create Metadata for attribute */
|
||||
json_t *metadatas = json_array();
|
||||
json_object_set_new(attribute, "metadatas", metadatas);
|
||||
|
||||
json_array_append_new(metadatas, json_pack("{ s: s, s: s, s: s+ }",
|
||||
"name", "source",
|
||||
|
@ -191,41 +182,10 @@ void ngsi_prepare_context(struct node *n, config_setting_t *mapping)
|
|||
"name", "timestamp",
|
||||
"type", "date",
|
||||
"value", ""
|
||||
));
|
||||
|
||||
if (i->structure == NGSI_CHILDREN) {
|
||||
json_array_append_new(attributes, json_pack("{ s: s, s: s, s: s }",
|
||||
"name", "parentId",
|
||||
"type", "uuid",
|
||||
"value", eid
|
||||
));
|
||||
|
||||
json_array_append_new(attributes, json_pack("{ s: s, s: s, s: s }",
|
||||
"name", "source",
|
||||
"type", "string",
|
||||
"value", "measurement"
|
||||
));
|
||||
|
||||
json_array_append_new(attributes, json_pack("{ s: s, s: s, s: o }",
|
||||
"name", "timestamp",
|
||||
"type", "date",
|
||||
"value", json_date(NULL)
|
||||
));
|
||||
}
|
||||
|
||||
json_t *attribute = json_pack("{ s: s, s: s, s: [ ] }",
|
||||
"name", aname,
|
||||
"type", atype,
|
||||
"value"
|
||||
);
|
||||
));
|
||||
|
||||
list_push(&i->mapping, attribute);
|
||||
|
||||
json_object_set_new(attribute, "metadatas", metadatas);
|
||||
json_array_append_new(attributes, attribute);
|
||||
}
|
||||
|
||||
json_object_set_new(i->context, "contextElements", elements);
|
||||
}
|
||||
|
||||
int ngsi_init(int argc, char *argv[], struct settings *set)
|
||||
|
@ -244,11 +204,17 @@ int ngsi_parse(config_setting_t *cfg, struct node *n)
|
|||
{
|
||||
struct ngsi *i = alloc(sizeof(struct ngsi));
|
||||
|
||||
if (!config_setting_lookup_string(cfg, "token", &i->token))
|
||||
i->token = NULL; /* disabled by default */
|
||||
if (!config_setting_lookup_string(cfg, "access_token", &i->access_token))
|
||||
i->access_token = NULL; /* disabled by default */
|
||||
|
||||
if (!config_setting_lookup_string(cfg, "endpoint", &i->endpoint))
|
||||
cerror(cfg, "Missing NGSI endpoint for node '%s'", n->name);
|
||||
|
||||
if (!config_setting_lookup_string(cfg, "entity_id", &i->entity_id))
|
||||
cerror(cfg, "Missing NGSI entity ID for node '%s'", n->name);
|
||||
|
||||
if (!config_setting_lookup_string(cfg, "entity_type", &i->entity_type))
|
||||
cerror(cfg, "Missing NGSI entity type for node '%s'", n->name);
|
||||
|
||||
if (!config_setting_lookup_bool(cfg, "ssl_verify", &i->ssl_verify))
|
||||
i->ssl_verify = 1; /* verify by default */
|
||||
|
@ -256,18 +222,6 @@ int ngsi_parse(config_setting_t *cfg, struct node *n)
|
|||
if (!config_setting_lookup_float(cfg, "timeout", &i->timeout))
|
||||
i->timeout = 1; /* default value */
|
||||
|
||||
const char *structure;
|
||||
if (!config_setting_lookup_string(cfg, "structure", &structure))
|
||||
i->structure = NGSI_FLAT;
|
||||
else {
|
||||
if (!strcmp(structure, "flat"))
|
||||
i->structure = NGSI_FLAT;
|
||||
else if (!strcmp(structure, "children"))
|
||||
i->structure = NGSI_CHILDREN;
|
||||
else
|
||||
cerror(cfg, "Invalid NGSI entity structure '%s' for node '%s'", structure, n->name);
|
||||
}
|
||||
|
||||
n->ngsi = i;
|
||||
|
||||
config_setting_t *mapping = config_setting_get_member(cfg, "mapping");
|
||||
|
@ -296,8 +250,8 @@ int ngsi_open(struct node *n)
|
|||
i->curl = curl_easy_init();
|
||||
i->headers = NULL;
|
||||
|
||||
if (i->token) {
|
||||
snprintf(buf, sizeof(buf), "Auth-Token: %s", i->token);
|
||||
if (i->access_token) {
|
||||
snprintf(buf, sizeof(buf), "Auth-Token: %s", i->access_token);
|
||||
i->headers = curl_slist_append(i->headers, buf);
|
||||
}
|
||||
|
||||
|
@ -330,44 +284,56 @@ int ngsi_close(struct node *n)
|
|||
}
|
||||
|
||||
int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
{
|
||||
/* struct ngsi *i = n->ngsi;
|
||||
struct msg *m = &pool[first % poolsize];
|
||||
int ret;
|
||||
|
||||
json_t *response;
|
||||
json_t *entities = json_array();
|
||||
json_t *query = json_pack("{ s: o }", "entities", entities);
|
||||
|
||||
ret = ngsi_request(i->curl, i->endpoint, "queryContext", NULL, NULL);
|
||||
if (ret < 0) {
|
||||
warn("Failed to query data from context broker");
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
return -1; /** @todo not yet implemented */
|
||||
}
|
||||
|
||||
int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
struct msg *m = &pool[first % poolsize];
|
||||
|
||||
if (cnt > 1)
|
||||
error("NGSI nodes only can send a single message at once");
|
||||
|
||||
/* First message */
|
||||
struct msg *fm = &pool[first % poolsize];
|
||||
|
||||
/* Update context */
|
||||
for (int j = 0; j < MIN(i->mapping.length, m->length); j++) {
|
||||
for (int j = 0; j < MIN(i->mapping.length, fm->length); j++) {
|
||||
json_t *attribute = list_at(&i->mapping, j);
|
||||
json_t *values = json_object_get(attribute, "value");
|
||||
json_t *metadatas = json_object_get(attribute, "metadatas");
|
||||
|
||||
/* Update timestamp */
|
||||
json_t *metadata_ts = json_lookup(metadatas, "name", "timestamp");
|
||||
json_object_set(metadata_ts, "value", json_date(&MSG_TS(m)));
|
||||
json_object_set(metadata_ts, "value", json_date(&MSG_TS(fm)));
|
||||
|
||||
/* Update value */
|
||||
json_array_clear(values);
|
||||
json_array_append_new(values, json_real(m->data[j].f));
|
||||
for (int k = 0; k < cnt; k++) {
|
||||
struct msg *m = &pool[(first + k) % poolsize];
|
||||
|
||||
double tsms = (double) m->ts.sec * 1e3 + m->ts.nsec / 1e6;
|
||||
|
||||
json_array_append_new(values, json_pack("[ o, o ]",
|
||||
json_real(tsms),
|
||||
json_real(m->data[j].f)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/* Update UUIDs for children structure */
|
||||
if (i->structure == NGSI_CHILDREN) {
|
||||
json_t *entity, *elements = json_object_get(i->context, "contextElements");
|
||||
size_t ind;
|
||||
json_array_foreach(elements, ind, entity)
|
||||
json_object_set_new(entity, "id", json_uuid());
|
||||
|
||||
json_object_set_new(i->context, "updateAction", json_string("APPEND"));
|
||||
}
|
||||
else
|
||||
json_object_set_new(i->context, "updateAction", json_string("UPDATE"));
|
||||
|
||||
json_object_set_new(i->context, "updateAction", json_string("UPDATE")); // @todo REPLACE?
|
||||
|
||||
json_t *response;
|
||||
int code = ngsi_request(i->curl, i->endpoint, "updateContext", i->context, &response);
|
||||
|
|
Loading…
Add table
Reference in a new issue