diff --git a/server/etc/armo-demo.conf b/server/etc/armo-demo.conf new file mode 100644 index 000000000..7f2e79476 --- /dev/null +++ b/server/etc/armo-demo.conf @@ -0,0 +1,47 @@ +# Configuration file for Mohsen and Arthur +# +# Date: 25.09.15 +# Author: Steffen Vogel +# Copyright: 2015, Institute for Automation of Complex Power Systems, EONERC +## + +stats = 0.5; +debug = 3; + +nodes = { + rtds = { + type = "socket", + layer = "udp", + local = "10.10.11.1:12000", # Local ip:port, use '*' for random port + remote = "127.0.0.1:12001", + }, + broker = { + type = "ngsi", + endpoint = "https://130.206.112.223:1026", + + ssl_verify = false, + timeout = 1, + structure = "children", + mapping = [ + "6a224113-62c4-11e5-b7be-ecf4bb16fe0c(GridSectionDataValue).value(float)" // Node: name='Linea 0 Tr' + "fca3bf58-62c4-11e5-867e-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 0 WP4_C0.1', from='Linea 0 Tr', to='Linea 0 Meter' + "fd4e0912-62c4-11e5-ba9b-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 1 WP4_C_1', from='Linea 1 N0000', to='Linea 1 N0001' + "fd4e0924-62c4-11e5-80a5-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 4 WP4_C_4', from='Linea 4 N0000', to='Linea 4 N0001' + "fd4e3036-62c4-11e5-95e9-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 7 WP4_C_9', from='Linea 7 N0000', to='Linea 7 N0002' + "fd4e5741-62c4-11e5-90ec-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 9 WP4_C_12', from='Linea 9 N0000', to='Linea 9 N0002' + "fd4e7e41-62c4-11e5-8716-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 10 WP4_C_15', from='Linea 10 N0000', to='Linea 10 N0001' + "6a232b75-62c4-11e5-8d0f-ecf4bb16fe0c(GridSectionDataValue).value(float)" // Node: name='Linea 9 N0001' + // "rtds_sub1(type_one).value(float)", // structure = flat: entityId(entityType).attributeName(attributeType) + // "rtds_sub3(type_one).v2(float)", // structure = children parentId(entityType).attributeName(attributeType) + // "rtds_sub2(type_two).i1(float)" // Example: fa846ed3-5871-11e5-b0cd-ecf4bb16fe0c(GridSectionDataValue).value(float) + ] + } +}; + +paths = ( + { + in = "rtds", # Name of the node we listen to (see above) + out = "broker", # And we loop back to the origin + rate = 10 # in Requests / sec + } +); diff --git a/server/include/config.h b/server/include/config.h index 5dec0bcd8..36fe004a4 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -52,11 +52,18 @@ { "/etc/machine-id", "0d8399d0216314f083b9ed2053a354a8" }, \ { "/dev/sda2", "\x53\xf6\xb5\xeb\x8b\x16\x46\xdc\x8d\x8f\x5b\x70\xb8\xc9\x1a\x2a", 0x468 } } -/* Hook function configuration */ -#define HOOK_FIR_INDEX 1 /**< The first value of message should be filtered. */ -#define HOOK_TS_INDEX -1 /**< The last value of message should be overwritten by a timestamp. */ -#define HOOK_DECIMATE_RATIO 30 /**< Only forward every 30th message to the destination nodes. */ +/* Hard coded configuration of hook functions */ +#define HOOK_FIR_INDEX 0 /**< Which value inside a message should be filtered? */ +#define HOOK_FIR_COEFFS { -0.003658148158728, -0.008882653268281, 0.008001024183003, \ + 0.08090485991761, 0.2035239551043, 0.3040703593515, \ + 0.3040703593515, 0.2035239551043, 0.08090485991761, \ + 0.008001024183003, -0.008882653268281,-0.003658148158728 } + +#define HOOK_TS_INDEX -1 /**< The last value of message should be overwritten by a timestamp. */ +#define HOOK_DECIMATE_RATIO 30 /**< Only forward every 30th message to the destination nodes. */ +#define HOOK_DEDUP_TYPE HOOK_ASYNC +#define HOOK_DEDUP_TRESH 1e-3 /**< Do not send messages when difference of values to last message is smaller than this threshold */ /** Global configuration */ struct settings { /** Process priority (lower is better) */ diff --git a/server/include/hooks.h b/server/include/hooks.h index 8cc8094ae..b54e11f82 100644 --- a/server/include/hooks.h +++ b/server/include/hooks.h @@ -76,6 +76,9 @@ struct hook { * @{ */ +/** Example hook: Drop messages whose values are similiar to the previous ones */ +int hook_deduplicate(struct path *); + /** Example hook: Print the message. */ int hook_print(struct path *p); diff --git a/server/include/msg_format.h b/server/include/msg_format.h index e0cc9dbeb..eb680a57e 100644 --- a/server/include/msg_format.h +++ b/server/include/msg_format.h @@ -53,7 +53,7 @@ } /** Initialize a message */ -#define MSG_INIT(i) (struct msg) { \ +#define MSG_INIT(i) { \ .version = MSG_VERSION, \ .type = MSG_TYPE_DATA, \ .endian = MSG_ENDIAN_HOST, \ diff --git a/server/src/hooks.c b/server/src/hooks.c index 733d2ba9b..9811b9bd1 100644 --- a/server/src/hooks.c +++ b/server/src/hooks.c @@ -13,6 +13,7 @@ *********************************************************************************/ #include +#include #include "timing.h" #include "config.h" @@ -25,6 +26,33 @@ struct list hooks; +REGISTER_HOOK("deduplicate", 99, hook_deduplicate, HOOK_DEDUP_TYPE) +int hook_deduplicate(struct path *p) +{ + int ret = 0; +#if HOOK_DEDUP_TYPE == HOOK_ASYNC + /** Thread local storage (TLS) is used to maintain a copy of the last run of the hook */ + static __thread struct msg previous = MSG_INIT(0); + struct msg *prev = &previous; +#else + struct msg *prev = p->previous; +#endif + struct msg *cur = p->current; + + for (int i = 0; i < MIN(cur->length, prev->length); i++) { + if (fabs(cur->data[i].f - prev->data[i].f) > HOOK_DEDUP_TRESH) + goto out; + } + + ret = -1; /* no appreciable change in values, we will drop the packet */ + +out: +#if HOOK_DEDUP_TYPE == HOOK_ASYNC + memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */ +#endif + return ret; +} + REGISTER_HOOK("print", 99, hook_print, HOOK_MSG) int hook_print(struct path *p) { @@ -59,31 +87,39 @@ int hook_ts(struct path *p) return 0; } -REGISTER_HOOK("fir", 99, hook_fir, HOOK_POST) +REGISTER_HOOK("fir", 99, hook_fir, HOOK_MSG) int hook_fir(struct path *p) { - /** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300 + /** Coefficients for simple FIR-LowPass: + * F_s = 1kHz, F_pass = 100 Hz, F_block = 300 + * * Tip: Use MATLAB's filter design tool and export coefficients - * with the integrated C-Header export */ - static const double coeffs[] = { - -0.003658148158728, -0.008882653268281, 0.008001024183003, - 0.08090485991761, 0.2035239551043, 0.3040703593515, - 0.3040703593515, 0.2035239551043, 0.08090485991761, - 0.008001024183003, -0.008882653268281,-0.003658148158728 }; + * with the integrated C-Header export + */ + static const double coeffs[] = HOOK_FIR_COEFFS; + + /** Per path thread local storage for unfiltered sample values. + * The message ringbuffer (p->pool & p->current) will contain filtered data! + */ + static __thread double *past = NULL; + + /** @todo Avoid dynamic allocation at runtime */ + if (!past) + alloc(p->poolsize * sizeof(double)); + + + /* Current value of interest */ + float *cur = &p->current->data[HOOK_FIR_INDEX].f; + + /* Save last sample, unfiltered */ + past[p->received % p->poolsize] = *cur; - /* Accumulator */ - double sum = 0; - - /** Trim FIR length to length of history buffer */ - int len = MIN(ARRAY_LEN(coeffs), p->poolsize); - - for (int i = 0; i < len; i++) { - struct msg *old = &p->pool[(p->poolsize+p->received-i) % p->poolsize]; - - sum += coeffs[i] * old->data[HOOK_FIR_INDEX].f; - } - - p->current->data[HOOK_FIR_INDEX].f = sum; + /* Reset accumulator */ + *cur = 0; + + /* FIR loop */ + for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++) + *cur += coeffs[i] * past[p->received+p->poolsize-i]; return 0; } diff --git a/server/src/ngsi.c b/server/src/ngsi.c index 06eb187c1..3512d480b 100644 --- a/server/src/ngsi.c +++ b/server/src/ngsi.c @@ -1,3 +1,4 @@ + /** Node type: OMA Next Generation Services Interface 10 (NGSI) (FIWARE context broker) * * This file implements the NGSI context interface. NGSI is RESTful HTTP is specified by @@ -88,7 +89,7 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi static int ngsi_request(CURL *handle, json_t *content, json_t **response) { struct ngsi_response chunk = { 0 }; - long code; + char *post = json_dumps(content, JSON_INDENT(4)); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, ngsi_request_writer); @@ -102,8 +103,12 @@ static int ngsi_request(CURL *handle, json_t *content, json_t **response) if (ret) error("HTTP request failed: %s", curl_easy_strerror(ret)); + long code; + double time; + curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time); curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code); + debug(20, "Request to context broker completed in %.4f seconds", time); debug(20, "Response from context broker (code=%ld):\n%s", code, chunk.data); json_error_t err; @@ -189,6 +194,11 @@ void ngsi_prepare_context(struct node *n, config_setting_t *mapping) "type", "integer", "value", j )); + json_array_append(metadatas, json_pack("{ s: s, s: s, s: o }", + "name", "timestamp", + "type", "date", + "value", json_date(NULL) + )); if (i->structure == NGSI_CHILDREN) { json_array_append(attributes, json_pack("{ s: s, s: s, s: s }", @@ -326,23 +336,25 @@ int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { struct ngsi *i = n->ngsi; - struct msg *m = &pool[first]; - + struct msg *m = &pool[first % poolsize]; + + if (cnt > 1) + error("NGSI nodes only can send a single message at once"); + /* Update context */ - for (int j = 0; j < i->context_len; j++) { + for (int j = 0; j < MIN(i->context_len, m->length); j++) { json_t *attribute = i->context_map[j]; - json_t *value = json_object_get(attribute, "value"); json_t *metadatas = json_object_get(attribute, "metadatas"); + + /* Update timestamp */ + json_t *metadata_ts = json_lookup(metadatas, "name", "timestamp"); + json_object_set(metadata_ts, "value", json_date(&MSG_TS(m))); - json_t *timestamp = json_lookup(metadatas, "name", "timestamp"); - json_object_update(timestamp, json_pack("{ s: s, s: s, s: o }", - "name", "timestamp", - "type", "date", - "value", json_date(&MSG_TS(m)) - )); - + /* Update value */ char new[64]; snprintf(new, sizeof(new), "%f", m->data[j].f); /** @todo for now we only support floating point values */ + + json_t *value = json_object_get(attribute, "value"); json_string_set(value, new); } @@ -366,4 +378,4 @@ int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cn return 1; } -REGISTER_NODE_TYPE(NGSI, "ngsi", ngsi) \ No newline at end of file +REGISTER_NODE_TYPE(NGSI, "ngsi", ngsi) diff --git a/server/src/socket.c b/server/src/socket.c index 1f39afa88..5e1a5946b 100644 --- a/server/src/socket.c +++ b/server/src/socket.c @@ -178,6 +178,7 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c /* Wait until next packet received */ poll(&(struct pollfd) { .fd = s->sd, .events = POLLIN }, 1, -1); + /* Get size of received packet in bytes */ ioctl(s->sd, FIONREAD, &bytes); @@ -203,17 +204,21 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c debug(10, "Received packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4); for (int i = 0; i < cnt; i++) { - struct msg *n = &pool[(first+poolsize+i) % poolsize]; - - /* Check integrity of packet */ - bytes -= MSG_LEN(n); + struct msg *m = &pool[(first+poolsize+i) % poolsize]; /* Convert headers to host byte order */ - n->sequence = ntohs(n->sequence); + m->sequence = ntohl(m->sequence); + m->length = ntohs(m->length); + + /* Check integrity of packet */ + if (bytes / cnt != MSG_LEN(m)) + error("Invalid message len: %u for node '%s'", MSG_LEN(m), n->name); + + bytes -= MSG_LEN(m); /* Convert message to host endianess */ - if (n->endian != MSG_ENDIAN_HOST) - msg_swap(n); + if (m->endian != MSG_ENDIAN_HOST) + msg_swap(m); } /* Check packet integrity */ @@ -456,4 +461,4 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye return ret; } -REGISTER_NODE_TYPE(BSD_SOCKET, "socket", socket) \ No newline at end of file +REGISTER_NODE_TYPE(BSD_SOCKET, "socket", socket)