1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

fixed broken nags node-type

This commit is contained in:
Steffen Vogel 2016-06-08 23:42:44 +02:00
parent ad8ee1c3f7
commit 9096de8024
3 changed files with 60 additions and 54 deletions

View file

@ -68,13 +68,13 @@ endif
# LIB_OBJS += gtfpga.o
# PKGS += libpci
#endif
#
## Enable NGSI support
#ifeq ($(shell pkg-config libcurl jansson uuid; echo $$?),0)
# LIB_OBJS += ngsi.o
# PKGS += libcurl jansson uuid
#endif
#
# Enable NGSI support
ifeq ($(shell pkg-config libcurl jansson uuid; echo $$?),0)
LIB_OBJS += ngsi.o
PKGS += libcurl jansson uuid
endif
## Enable WebSocket support
#ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0)
# LIB_OBJS += websocket.o websocket-live.o websocket-http.o

View file

@ -32,14 +32,6 @@
struct node;
struct ngsi_attribute {
char *name;
char *type;
int index;
struct list metadata;
};
struct ngsi {
const char *endpoint; /**< The NGSI context broker endpoint URL. */
const char *entity_id; /**< The context broker entity id related to this node */
@ -56,7 +48,7 @@ struct ngsi {
CURL *curl; /**< libcurl: handle */
struct list mapping; /**< A mapping between indices of the S2SS messages and the attributes in ngsi::context */
struct list mapping; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */
};
/** Initialize global NGSI settings and maps shared memory regions.
@ -84,9 +76,9 @@ int ngsi_open(struct node *n);
int ngsi_close(struct node *n);
/** @see node_vtable::read */
int ngsi_read(struct node *n, struct pool *pool, int cnt);
int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt);
/** @see node_vtable::write */
int ngsi_write(struct node *n, struct pool *pool, int cnt);
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt);
#endif /** _NGSI_H_ @} */

View file

@ -75,12 +75,20 @@ struct ngsi_metadata {
char *value;
};
struct ngsi_attribute {
char *name;
char *type;
int index;
struct list metadata;
};
struct ngsi_response {
char *data;
size_t len;
};
static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int flags)
static json_t* ngsi_build_entity(struct ngsi *i, struct sample *smps[], unsigned cnt, int flags)
{
json_t *entity = json_pack("{ s: s, s: s, s: b }",
"id", i->entity_id,
@ -99,12 +107,10 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int
if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */
json_t *values = json_array();
for (int k = 0; k < cnt; k++) {
struct msg *m = pool_getrel(pool, k);
json_array_append_new(values, json_pack("[ f, f, i ]",
time_to_double(&MSG_TS(m)),
m->data[map->index].f,
m->sequence
time_to_double(&smps[k]->ts.origin),
smps[k]->values[map->index].f,
smps[k]->sequence
));
}
@ -133,7 +139,7 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int
return entity;
}
static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool, int cnt)
static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps[], unsigned cnt)
{
int ret;
const char *id, *name, *type;
@ -152,13 +158,8 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool,
if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type))
return -2;
for (int j = 0; j < cnt; j++) {
struct msg *m = pool_getrel(pool, j);
m->version = MSG_VERSION;
m->values = json_array_size(attributes);
m->endian = MSG_ENDIAN_HOST;
}
for (int k = 0; k < cnt; k++)
smps[k]->length = json_array_size(attributes);
json_array_foreach(attributes, index, attribute) {
struct ngsi_attribute *map;
@ -187,10 +188,8 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool,
if (!json_is_array(values) || json_array_size(values) != cnt)
return -6;
size_t index2;
json_array_foreach(values, index2, tuple) {
struct msg *m = pool_getrel(pool, index2);
size_t k;
json_array_foreach(values, k, tuple) {
/* Check sample format */
if (!json_is_array(tuple) || json_array_size(tuple) != 3)
return -7;
@ -201,16 +200,14 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool,
if (ret)
return -8;
m->sequence = atoi(seq);
smps[k]->sequence = atoi(seq);
struct timespec tss = time_from_double(strtod(ts, &end));
if (ts == end)
return -9;
m->ts.sec = tss.tv_sec;
m->ts.nsec = tss.tv_nsec;
m->data[map->index].f = strtof(value, &end);
smps[k]->ts.origin = tss;
smps[k]->values[map->index].f = strtof(value, &end);
if (value == end)
return -10;
}
@ -224,14 +221,16 @@ static int ngsi_parse_mapping(struct list *mapping, config_setting_t *cfg)
if (!config_setting_is_array(cfg))
return -1;
list_init(mapping, NULL);
list_init(mapping);
for (int j = 0; j < config_setting_length(cfg); j++) { INDENT
const char *token = config_setting_get_string_elem(cfg, j);
if (!token)
return -2;
struct ngsi_attribute map = { .index = j };
struct ngsi_attribute map = {
.index = j
};
/* Parse Attribute: AttributeName(AttributeType) */
int bytes;
@ -241,7 +240,7 @@ static int ngsi_parse_mapping(struct list *mapping, config_setting_t *cfg)
token += bytes;
/* MetadataName(MetadataType)=MetadataValue */
list_init(&map.metadata, NULL);
list_init(&map.metadata);
struct ngsi_metadata meta;
while (sscanf(token, " %m[^(](%m[^)])=%ms%n", &meta.name, &meta.type, &meta.value, &bytes) == 3) { INDENT
list_push(&map.metadata, memdup(&meta, sizeof(struct ngsi_metadata)));
@ -328,7 +327,7 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, strlen(post));
curl_easy_setopt(handle, CURLOPT_POSTFIELDS, post);
debug(18, "Request to context broker: %s\n%s", url, post);
debug(DBG_NGSI | 18, "Request to context broker: %s\n%s", url, post);
/* We don't want to leave the handle in an invalid state */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
@ -342,8 +341,8 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio
curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time);
debug(16, "Request to context broker completed in %.4f seconds", time);
debug(17, "Response from context broker:\n%s", chunk.data);
debug(DBG_NGSI | 16, "Request to context broker completed in %.4f seconds", time);
debug(DBG_NGSI | 17, "Response from context broker:\n%s", chunk.data);
*response = json_loads(chunk.data, 0, &err);
if (!*response)
@ -470,11 +469,26 @@ char * ngsi_print(struct node *n)
i->endpoint, i->timeout, list_length(&i->mapping));
}
static void ngsi_destroy_metadata(struct ngsi_metadata *meta)
{
free(meta->value);
free(meta->name);
free(meta->type);
}
static void ngsi_destroy_attribute(struct ngsi_attribute *attr)
{
free(attr->name);
free(attr->type);
list_destroy(&attr->metadata, (dtor_cb_t) ngsi_destroy_metadata, true);
}
int ngsi_destroy(struct node *n)
{
struct ngsi *i = n->_vd;
list_destroy(&i->mapping);
list_destroy(&i->mapping, (dtor_cb_t) ngsi_destroy_attribute, true);
return 0;
}
@ -501,7 +515,7 @@ int ngsi_open(struct node *n)
if (i->tfd < 0)
serror("Failed to create timer");
i->headers = curl_slist_append(i->headers, "User-Agent: S2SS " VERSION);
i->headers = curl_slist_append(i->headers, "User-Agent: VILLASnode " VERSION);
i->headers = curl_slist_append(i->headers, "Accept: application/json");
i->headers = curl_slist_append(i->headers, "Content-Type: application/json");
@ -539,7 +553,7 @@ int ngsi_close(struct node *n)
return ret;
}
int ngsi_read(struct node *n, struct pool *pool, int cnt)
int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct ngsi *i = n->_vd;
int ret;
@ -548,13 +562,13 @@ int ngsi_read(struct node *n, struct pool *pool, int cnt)
perror("Failed to wait for timer");
json_t *rentity;
json_t *entity = ngsi_build_entity(i, NULL, 0, 0);
json_t *entity = ngsi_build_entity(i, NULL, 0, 0);
ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity);
if (ret)
goto out;
ret = ngsi_parse_entity(rentity, i, pool, cnt);
ret = ngsi_parse_entity(rentity, i, smps, cnt);
if (ret)
goto out2;
@ -564,12 +578,12 @@ out: json_decref(entity);
return ret;
}
int ngsi_write(struct node *n, struct pool *pool, int cnt)
int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct ngsi *i = n->_vd;
int ret;
json_t *entity = ngsi_build_entity(i, pool, cnt, NGSI_ENTITY_VALUES);
json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES);
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);