mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'armo'
This commit is contained in:
commit
bbb886fc39
7 changed files with 157 additions and 47 deletions
47
server/etc/armo-demo.conf
Normal file
47
server/etc/armo-demo.conf
Normal file
|
@ -0,0 +1,47 @@
|
|||
# Configuration file for Mohsen and Arthur
|
||||
#
|
||||
# Date: 25.09.15
|
||||
# Author: Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
# Copyright: 2015, Institute for Automation of Complex Power Systems, EONERC
|
||||
##
|
||||
|
||||
stats = 0.5;
|
||||
debug = 3;
|
||||
|
||||
nodes = {
|
||||
rtds = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
local = "10.10.11.1:12000", # Local ip:port, use '*' for random port
|
||||
remote = "127.0.0.1:12001",
|
||||
},
|
||||
broker = {
|
||||
type = "ngsi",
|
||||
endpoint = "https://130.206.112.223:1026",
|
||||
|
||||
ssl_verify = false,
|
||||
timeout = 1,
|
||||
structure = "children",
|
||||
mapping = [
|
||||
"6a224113-62c4-11e5-b7be-ecf4bb16fe0c(GridSectionDataValue).value(float)" // Node: name='Linea 0 Tr'
|
||||
"fca3bf58-62c4-11e5-867e-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 0 WP4_C0.1', from='Linea 0 Tr', to='Linea 0 Meter'
|
||||
"fd4e0912-62c4-11e5-ba9b-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 1 WP4_C_1', from='Linea 1 N0000', to='Linea 1 N0001'
|
||||
"fd4e0924-62c4-11e5-80a5-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 4 WP4_C_4', from='Linea 4 N0000', to='Linea 4 N0001'
|
||||
"fd4e3036-62c4-11e5-95e9-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 7 WP4_C_9', from='Linea 7 N0000', to='Linea 7 N0002'
|
||||
"fd4e5741-62c4-11e5-90ec-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 9 WP4_C_12', from='Linea 9 N0000', to='Linea 9 N0002'
|
||||
"fd4e7e41-62c4-11e5-8716-ecf4bb16fe0c(GridSectionDataValue).value(float)", // Conn: name='Linea 10 WP4_C_15', from='Linea 10 N0000', to='Linea 10 N0001'
|
||||
"6a232b75-62c4-11e5-8d0f-ecf4bb16fe0c(GridSectionDataValue).value(float)" // Node: name='Linea 9 N0001'
|
||||
// "rtds_sub1(type_one).value(float)", // structure = flat: entityId(entityType).attributeName(attributeType)
|
||||
// "rtds_sub3(type_one).v2(float)", // structure = children parentId(entityType).attributeName(attributeType)
|
||||
// "rtds_sub2(type_two).i1(float)" // Example: fa846ed3-5871-11e5-b0cd-ecf4bb16fe0c(GridSectionDataValue).value(float)
|
||||
]
|
||||
}
|
||||
};
|
||||
|
||||
paths = (
|
||||
{
|
||||
in = "rtds", # Name of the node we listen to (see above)
|
||||
out = "broker", # And we loop back to the origin
|
||||
rate = 10 # in Requests / sec
|
||||
}
|
||||
);
|
|
@ -52,11 +52,18 @@
|
|||
{ "/etc/machine-id", "0d8399d0216314f083b9ed2053a354a8" }, \
|
||||
{ "/dev/sda2", "\x53\xf6\xb5\xeb\x8b\x16\x46\xdc\x8d\x8f\x5b\x70\xb8\xc9\x1a\x2a", 0x468 } }
|
||||
|
||||
/* Hook function configuration */
|
||||
#define HOOK_FIR_INDEX 1 /**< The first value of message should be filtered. */
|
||||
#define HOOK_TS_INDEX -1 /**< The last value of message should be overwritten by a timestamp. */
|
||||
#define HOOK_DECIMATE_RATIO 30 /**< Only forward every 30th message to the destination nodes. */
|
||||
/* Hard coded configuration of hook functions */
|
||||
#define HOOK_FIR_INDEX 0 /**< Which value inside a message should be filtered? */
|
||||
#define HOOK_FIR_COEFFS { -0.003658148158728, -0.008882653268281, 0.008001024183003, \
|
||||
0.08090485991761, 0.2035239551043, 0.3040703593515, \
|
||||
0.3040703593515, 0.2035239551043, 0.08090485991761, \
|
||||
0.008001024183003, -0.008882653268281,-0.003658148158728 }
|
||||
|
||||
#define HOOK_TS_INDEX -1 /**< The last value of message should be overwritten by a timestamp. */
|
||||
#define HOOK_DECIMATE_RATIO 30 /**< Only forward every 30th message to the destination nodes. */
|
||||
|
||||
#define HOOK_DEDUP_TYPE HOOK_ASYNC
|
||||
#define HOOK_DEDUP_TRESH 1e-3 /**< Do not send messages when difference of values to last message is smaller than this threshold */
|
||||
/** Global configuration */
|
||||
struct settings {
|
||||
/** Process priority (lower is better) */
|
||||
|
|
|
@ -76,6 +76,9 @@ struct hook {
|
|||
* @{
|
||||
*/
|
||||
|
||||
/** Example hook: Drop messages whose values are similiar to the previous ones */
|
||||
int hook_deduplicate(struct path *);
|
||||
|
||||
/** Example hook: Print the message. */
|
||||
int hook_print(struct path *p);
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
}
|
||||
|
||||
/** Initialize a message */
|
||||
#define MSG_INIT(i) (struct msg) { \
|
||||
#define MSG_INIT(i) { \
|
||||
.version = MSG_VERSION, \
|
||||
.type = MSG_TYPE_DATA, \
|
||||
.endian = MSG_ENDIAN_HOST, \
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
*********************************************************************************/
|
||||
|
||||
#include <string.h>
|
||||
#include <math.h>
|
||||
|
||||
#include "timing.h"
|
||||
#include "config.h"
|
||||
|
@ -25,6 +26,33 @@
|
|||
|
||||
struct list hooks;
|
||||
|
||||
REGISTER_HOOK("deduplicate", 99, hook_deduplicate, HOOK_DEDUP_TYPE)
|
||||
int hook_deduplicate(struct path *p)
|
||||
{
|
||||
int ret = 0;
|
||||
#if HOOK_DEDUP_TYPE == HOOK_ASYNC
|
||||
/** Thread local storage (TLS) is used to maintain a copy of the last run of the hook */
|
||||
static __thread struct msg previous = MSG_INIT(0);
|
||||
struct msg *prev = &previous;
|
||||
#else
|
||||
struct msg *prev = p->previous;
|
||||
#endif
|
||||
struct msg *cur = p->current;
|
||||
|
||||
for (int i = 0; i < MIN(cur->length, prev->length); i++) {
|
||||
if (fabs(cur->data[i].f - prev->data[i].f) > HOOK_DEDUP_TRESH)
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = -1; /* no appreciable change in values, we will drop the packet */
|
||||
|
||||
out:
|
||||
#if HOOK_DEDUP_TYPE == HOOK_ASYNC
|
||||
memcpy(prev, cur, sizeof(struct msg)); /* save current message for next run */
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("print", 99, hook_print, HOOK_MSG)
|
||||
int hook_print(struct path *p)
|
||||
{
|
||||
|
@ -59,31 +87,39 @@ int hook_ts(struct path *p)
|
|||
return 0;
|
||||
}
|
||||
|
||||
REGISTER_HOOK("fir", 99, hook_fir, HOOK_POST)
|
||||
REGISTER_HOOK("fir", 99, hook_fir, HOOK_MSG)
|
||||
int hook_fir(struct path *p)
|
||||
{
|
||||
/** Simple FIR-LP: F_s = 1kHz, F_pass = 100 Hz, F_block = 300
|
||||
/** Coefficients for simple FIR-LowPass:
|
||||
* F_s = 1kHz, F_pass = 100 Hz, F_block = 300
|
||||
*
|
||||
* Tip: Use MATLAB's filter design tool and export coefficients
|
||||
* with the integrated C-Header export */
|
||||
static const double coeffs[] = {
|
||||
-0.003658148158728, -0.008882653268281, 0.008001024183003,
|
||||
0.08090485991761, 0.2035239551043, 0.3040703593515,
|
||||
0.3040703593515, 0.2035239551043, 0.08090485991761,
|
||||
0.008001024183003, -0.008882653268281,-0.003658148158728 };
|
||||
* with the integrated C-Header export
|
||||
*/
|
||||
static const double coeffs[] = HOOK_FIR_COEFFS;
|
||||
|
||||
/** Per path thread local storage for unfiltered sample values.
|
||||
* The message ringbuffer (p->pool & p->current) will contain filtered data!
|
||||
*/
|
||||
static __thread double *past = NULL;
|
||||
|
||||
/** @todo Avoid dynamic allocation at runtime */
|
||||
if (!past)
|
||||
alloc(p->poolsize * sizeof(double));
|
||||
|
||||
|
||||
/* Current value of interest */
|
||||
float *cur = &p->current->data[HOOK_FIR_INDEX].f;
|
||||
|
||||
/* Save last sample, unfiltered */
|
||||
past[p->received % p->poolsize] = *cur;
|
||||
|
||||
/* Accumulator */
|
||||
double sum = 0;
|
||||
|
||||
/** Trim FIR length to length of history buffer */
|
||||
int len = MIN(ARRAY_LEN(coeffs), p->poolsize);
|
||||
|
||||
for (int i = 0; i < len; i++) {
|
||||
struct msg *old = &p->pool[(p->poolsize+p->received-i) % p->poolsize];
|
||||
|
||||
sum += coeffs[i] * old->data[HOOK_FIR_INDEX].f;
|
||||
}
|
||||
|
||||
p->current->data[HOOK_FIR_INDEX].f = sum;
|
||||
/* Reset accumulator */
|
||||
*cur = 0;
|
||||
|
||||
/* FIR loop */
|
||||
for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++)
|
||||
*cur += coeffs[i] * past[p->received+p->poolsize-i];
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
/** Node type: OMA Next Generation Services Interface 10 (NGSI) (FIWARE context broker)
|
||||
*
|
||||
* This file implements the NGSI context interface. NGSI is RESTful HTTP is specified by
|
||||
|
@ -88,7 +89,7 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi
|
|||
static int ngsi_request(CURL *handle, json_t *content, json_t **response)
|
||||
{
|
||||
struct ngsi_response chunk = { 0 };
|
||||
long code;
|
||||
|
||||
char *post = json_dumps(content, JSON_INDENT(4));
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, ngsi_request_writer);
|
||||
|
@ -102,8 +103,12 @@ static int ngsi_request(CURL *handle, json_t *content, json_t **response)
|
|||
if (ret)
|
||||
error("HTTP request failed: %s", curl_easy_strerror(ret));
|
||||
|
||||
long code;
|
||||
double time;
|
||||
curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time);
|
||||
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &code);
|
||||
|
||||
debug(20, "Request to context broker completed in %.4f seconds", time);
|
||||
debug(20, "Response from context broker (code=%ld):\n%s", code, chunk.data);
|
||||
|
||||
json_error_t err;
|
||||
|
@ -189,6 +194,11 @@ void ngsi_prepare_context(struct node *n, config_setting_t *mapping)
|
|||
"type", "integer",
|
||||
"value", j
|
||||
));
|
||||
json_array_append(metadatas, json_pack("{ s: s, s: s, s: o }",
|
||||
"name", "timestamp",
|
||||
"type", "date",
|
||||
"value", json_date(NULL)
|
||||
));
|
||||
|
||||
if (i->structure == NGSI_CHILDREN) {
|
||||
json_array_append(attributes, json_pack("{ s: s, s: s, s: s }",
|
||||
|
@ -326,23 +336,25 @@ int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
|
|||
int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
struct ngsi *i = n->ngsi;
|
||||
struct msg *m = &pool[first];
|
||||
|
||||
struct msg *m = &pool[first % poolsize];
|
||||
|
||||
if (cnt > 1)
|
||||
error("NGSI nodes only can send a single message at once");
|
||||
|
||||
/* Update context */
|
||||
for (int j = 0; j < i->context_len; j++) {
|
||||
for (int j = 0; j < MIN(i->context_len, m->length); j++) {
|
||||
json_t *attribute = i->context_map[j];
|
||||
json_t *value = json_object_get(attribute, "value");
|
||||
json_t *metadatas = json_object_get(attribute, "metadatas");
|
||||
|
||||
/* Update timestamp */
|
||||
json_t *metadata_ts = json_lookup(metadatas, "name", "timestamp");
|
||||
json_object_set(metadata_ts, "value", json_date(&MSG_TS(m)));
|
||||
|
||||
json_t *timestamp = json_lookup(metadatas, "name", "timestamp");
|
||||
json_object_update(timestamp, json_pack("{ s: s, s: s, s: o }",
|
||||
"name", "timestamp",
|
||||
"type", "date",
|
||||
"value", json_date(&MSG_TS(m))
|
||||
));
|
||||
|
||||
/* Update value */
|
||||
char new[64];
|
||||
snprintf(new, sizeof(new), "%f", m->data[j].f); /** @todo for now we only support floating point values */
|
||||
|
||||
json_t *value = json_object_get(attribute, "value");
|
||||
json_string_set(value, new);
|
||||
}
|
||||
|
||||
|
@ -366,4 +378,4 @@ int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cn
|
|||
return 1;
|
||||
}
|
||||
|
||||
REGISTER_NODE_TYPE(NGSI, "ngsi", ngsi)
|
||||
REGISTER_NODE_TYPE(NGSI, "ngsi", ngsi)
|
||||
|
|
|
@ -178,6 +178,7 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c
|
|||
|
||||
/* Wait until next packet received */
|
||||
poll(&(struct pollfd) { .fd = s->sd, .events = POLLIN }, 1, -1);
|
||||
|
||||
/* Get size of received packet in bytes */
|
||||
ioctl(s->sd, FIONREAD, &bytes);
|
||||
|
||||
|
@ -203,17 +204,21 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c
|
|||
debug(10, "Received packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4);
|
||||
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
struct msg *n = &pool[(first+poolsize+i) % poolsize];
|
||||
|
||||
/* Check integrity of packet */
|
||||
bytes -= MSG_LEN(n);
|
||||
struct msg *m = &pool[(first+poolsize+i) % poolsize];
|
||||
|
||||
/* Convert headers to host byte order */
|
||||
n->sequence = ntohs(n->sequence);
|
||||
m->sequence = ntohl(m->sequence);
|
||||
m->length = ntohs(m->length);
|
||||
|
||||
/* Check integrity of packet */
|
||||
if (bytes / cnt != MSG_LEN(m))
|
||||
error("Invalid message len: %u for node '%s'", MSG_LEN(m), n->name);
|
||||
|
||||
bytes -= MSG_LEN(m);
|
||||
|
||||
/* Convert message to host endianess */
|
||||
if (n->endian != MSG_ENDIAN_HOST)
|
||||
msg_swap(n);
|
||||
if (m->endian != MSG_ENDIAN_HOST)
|
||||
msg_swap(m);
|
||||
}
|
||||
|
||||
/* Check packet integrity */
|
||||
|
@ -456,4 +461,4 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye
|
|||
return ret;
|
||||
}
|
||||
|
||||
REGISTER_NODE_TYPE(BSD_SOCKET, "socket", socket)
|
||||
REGISTER_NODE_TYPE(BSD_SOCKET, "socket", socket)
|
||||
|
|
Loading…
Add table
Reference in a new issue