diff --git a/Makefile b/Makefile index 38d4f8447..c917c7e08 100644 --- a/Makefile +++ b/Makefile @@ -68,13 +68,13 @@ endif # LIB_OBJS += gtfpga.o # PKGS += libpci #endif -# -## Enable NGSI support -#ifeq ($(shell pkg-config libcurl jansson uuid; echo $$?),0) -# LIB_OBJS += ngsi.o -# PKGS += libcurl jansson uuid -#endif -# + +# Enable NGSI support +ifeq ($(shell pkg-config libcurl jansson uuid; echo $$?),0) + LIB_OBJS += ngsi.o + PKGS += libcurl jansson uuid +endif + ## Enable WebSocket support #ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0) # LIB_OBJS += websocket.o websocket-live.o websocket-http.o diff --git a/include/ngsi.h b/include/ngsi.h index 8b9c20fae..807904a87 100644 --- a/include/ngsi.h +++ b/include/ngsi.h @@ -32,14 +32,6 @@ struct node; -struct ngsi_attribute { - char *name; - char *type; - - int index; - struct list metadata; -}; - struct ngsi { const char *endpoint; /**< The NGSI context broker endpoint URL. */ const char *entity_id; /**< The context broker entity id related to this node */ @@ -56,7 +48,7 @@ struct ngsi { CURL *curl; /**< libcurl: handle */ - struct list mapping; /**< A mapping between indices of the S2SS messages and the attributes in ngsi::context */ + struct list mapping; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */ }; /** Initialize global NGSI settings and maps shared memory regions. @@ -84,9 +76,9 @@ int ngsi_open(struct node *n); int ngsi_close(struct node *n); /** @see node_vtable::read */ -int ngsi_read(struct node *n, struct pool *pool, int cnt); +int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_vtable::write */ -int ngsi_write(struct node *n, struct pool *pool, int cnt); +int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt); #endif /** _NGSI_H_ @} */ \ No newline at end of file diff --git a/lib/ngsi.c b/lib/ngsi.c index 2302e6e7b..6b6d1ed20 100644 --- a/lib/ngsi.c +++ b/lib/ngsi.c @@ -75,12 +75,20 @@ struct ngsi_metadata { char *value; }; +struct ngsi_attribute { + char *name; + char *type; + + int index; + struct list metadata; +}; + struct ngsi_response { char *data; size_t len; }; -static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int flags) +static json_t* ngsi_build_entity(struct ngsi *i, struct sample *smps[], unsigned cnt, int flags) { json_t *entity = json_pack("{ s: s, s: s, s: b }", "id", i->entity_id, @@ -99,12 +107,10 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */ json_t *values = json_array(); for (int k = 0; k < cnt; k++) { - struct msg *m = pool_getrel(pool, k); - json_array_append_new(values, json_pack("[ f, f, i ]", - time_to_double(&MSG_TS(m)), - m->data[map->index].f, - m->sequence + time_to_double(&smps[k]->ts.origin), + smps[k]->values[map->index].f, + smps[k]->sequence )); } @@ -133,7 +139,7 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int return entity; } -static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool, int cnt) +static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps[], unsigned cnt) { int ret; const char *id, *name, *type; @@ -152,13 +158,8 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool, if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type)) return -2; - for (int j = 0; j < cnt; j++) { - struct msg *m = pool_getrel(pool, j); - - m->version = MSG_VERSION; - m->values = json_array_size(attributes); - m->endian = MSG_ENDIAN_HOST; - } + for (int k = 0; k < cnt; k++) + smps[k]->length = json_array_size(attributes); json_array_foreach(attributes, index, attribute) { struct ngsi_attribute *map; @@ -187,10 +188,8 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool, if (!json_is_array(values) || json_array_size(values) != cnt) return -6; - size_t index2; - json_array_foreach(values, index2, tuple) { - struct msg *m = pool_getrel(pool, index2); - + size_t k; + json_array_foreach(values, k, tuple) { /* Check sample format */ if (!json_is_array(tuple) || json_array_size(tuple) != 3) return -7; @@ -201,16 +200,14 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool, if (ret) return -8; - m->sequence = atoi(seq); + smps[k]->sequence = atoi(seq); struct timespec tss = time_from_double(strtod(ts, &end)); if (ts == end) return -9; - m->ts.sec = tss.tv_sec; - m->ts.nsec = tss.tv_nsec; - - m->data[map->index].f = strtof(value, &end); + smps[k]->ts.origin = tss; + smps[k]->values[map->index].f = strtof(value, &end); if (value == end) return -10; } @@ -224,14 +221,16 @@ static int ngsi_parse_mapping(struct list *mapping, config_setting_t *cfg) if (!config_setting_is_array(cfg)) return -1; - list_init(mapping, NULL); + list_init(mapping); for (int j = 0; j < config_setting_length(cfg); j++) { INDENT const char *token = config_setting_get_string_elem(cfg, j); if (!token) return -2; - struct ngsi_attribute map = { .index = j }; + struct ngsi_attribute map = { + .index = j + }; /* Parse Attribute: AttributeName(AttributeType) */ int bytes; @@ -241,7 +240,7 @@ static int ngsi_parse_mapping(struct list *mapping, config_setting_t *cfg) token += bytes; /* MetadataName(MetadataType)=MetadataValue */ - list_init(&map.metadata, NULL); + list_init(&map.metadata); struct ngsi_metadata meta; while (sscanf(token, " %m[^(](%m[^)])=%ms%n", &meta.name, &meta.type, &meta.value, &bytes) == 3) { INDENT list_push(&map.metadata, memdup(&meta, sizeof(struct ngsi_metadata))); @@ -328,7 +327,7 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, strlen(post)); curl_easy_setopt(handle, CURLOPT_POSTFIELDS, post); - debug(18, "Request to context broker: %s\n%s", url, post); + debug(DBG_NGSI | 18, "Request to context broker: %s\n%s", url, post); /* We don't want to leave the handle in an invalid state */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old); @@ -342,8 +341,8 @@ static int ngsi_request(CURL *handle, const char *endpoint, const char *operatio curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time); - debug(16, "Request to context broker completed in %.4f seconds", time); - debug(17, "Response from context broker:\n%s", chunk.data); + debug(DBG_NGSI | 16, "Request to context broker completed in %.4f seconds", time); + debug(DBG_NGSI | 17, "Response from context broker:\n%s", chunk.data); *response = json_loads(chunk.data, 0, &err); if (!*response) @@ -470,11 +469,26 @@ char * ngsi_print(struct node *n) i->endpoint, i->timeout, list_length(&i->mapping)); } +static void ngsi_destroy_metadata(struct ngsi_metadata *meta) +{ + free(meta->value); + free(meta->name); + free(meta->type); +} + +static void ngsi_destroy_attribute(struct ngsi_attribute *attr) +{ + free(attr->name); + free(attr->type); + + list_destroy(&attr->metadata, (dtor_cb_t) ngsi_destroy_metadata, true); +} + int ngsi_destroy(struct node *n) { struct ngsi *i = n->_vd; - list_destroy(&i->mapping); + list_destroy(&i->mapping, (dtor_cb_t) ngsi_destroy_attribute, true); return 0; } @@ -501,7 +515,7 @@ int ngsi_open(struct node *n) if (i->tfd < 0) serror("Failed to create timer"); - i->headers = curl_slist_append(i->headers, "User-Agent: S2SS " VERSION); + i->headers = curl_slist_append(i->headers, "User-Agent: VILLASnode " VERSION); i->headers = curl_slist_append(i->headers, "Accept: application/json"); i->headers = curl_slist_append(i->headers, "Content-Type: application/json"); @@ -539,7 +553,7 @@ int ngsi_close(struct node *n) return ret; } -int ngsi_read(struct node *n, struct pool *pool, int cnt) +int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt) { struct ngsi *i = n->_vd; int ret; @@ -548,13 +562,13 @@ int ngsi_read(struct node *n, struct pool *pool, int cnt) perror("Failed to wait for timer"); json_t *rentity; - json_t *entity = ngsi_build_entity(i, NULL, 0, 0); + json_t *entity = ngsi_build_entity(i, NULL, 0, 0); ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity); if (ret) goto out; - ret = ngsi_parse_entity(rentity, i, pool, cnt); + ret = ngsi_parse_entity(rentity, i, smps, cnt); if (ret) goto out2; @@ -564,12 +578,12 @@ out: json_decref(entity); return ret; } -int ngsi_write(struct node *n, struct pool *pool, int cnt) +int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt) { struct ngsi *i = n->_vd; int ret; - json_t *entity = ngsi_build_entity(i, pool, cnt, NGSI_ENTITY_VALUES); + json_t *entity = ngsi_build_entity(i, smps, cnt, NGSI_ENTITY_VALUES); ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);