mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'eric-lab' into develop-try-merge
This commit is contained in:
commit
9205e1dbac
5 changed files with 175 additions and 108 deletions
70
etc/eric-lab.conf
Normal file
70
etc/eric-lab.conf
Normal file
|
@ -0,0 +1,70 @@
|
|||
# Example configuration file for VILLASnode
|
||||
#
|
||||
# This example includes all valid configuration options for the server.
|
||||
# Please note, that using all options at the same time does not really
|
||||
# makes sense. The purpose of this example is to serve as a reference.
|
||||
#
|
||||
# The syntax of this file is similar to JSON.
|
||||
# A detailed description of the format can be found here:
|
||||
# http://www.hyperrealm.com/libconfig/libconfig_manual.html#Configuration-Files
|
||||
#
|
||||
# Author: Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
# Copyright: 2016, Institute for Automation of Complex Power Systems, EONERC
|
||||
##
|
||||
|
||||
stats = 3; # The interval in seconds to print path statistics.
|
||||
# A value of 0 disables the statistics.
|
||||
|
||||
name = "villas-acs" # The name of this VILLASnode. Might by used by node-types
|
||||
# to identify themselves (default is the hostname).
|
||||
|
||||
|
||||
log = {
|
||||
level = 5; # The level of verbosity for debug messages
|
||||
# Higher number => increased verbosity
|
||||
|
||||
faciltities = [ "path", "socket" ]; # The list of enabled debug faciltities.
|
||||
# If omitted, all faciltities are enabled
|
||||
# For a full list of available faciltities, check lib/log.c
|
||||
};
|
||||
|
||||
http = {
|
||||
htdocs = "/villas/web/socket/", # Root directory of internal webserver
|
||||
port = 80 # Port for HTTP connections
|
||||
}
|
||||
|
||||
############ Dictionary of nodes ############
|
||||
|
||||
nodes = {
|
||||
ws = {
|
||||
type = "websocket",
|
||||
unit = "MVa",
|
||||
units = [ "V", "A", "Var" ],
|
||||
description = "Demo Channel",
|
||||
series = (
|
||||
{ label = "Random walk" },
|
||||
{ label = "Sine" },
|
||||
{ label = "Rect" },
|
||||
{ label = "Ramp" }
|
||||
)
|
||||
},
|
||||
socket1 = {
|
||||
type = "socket",
|
||||
layper = "udp",
|
||||
local = "*:12000",
|
||||
remote = "127.0.0.1:12001"
|
||||
},
|
||||
socket2 = {
|
||||
type = "socket",
|
||||
layper = "udp",
|
||||
local = "*:12001",
|
||||
remote = "127.0.0.1:12000"
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
############ List of paths ############
|
||||
|
||||
paths = (
|
||||
{ in = "socket2", out = "ws" }
|
||||
);
|
|
@ -36,21 +36,20 @@ struct websocket {
|
|||
int id; /**< The index of this node */
|
||||
};
|
||||
|
||||
/* Internal datastructures */
|
||||
struct websocket_connection {
|
||||
enum {
|
||||
WEBSOCKET_ESTABLISHED,
|
||||
WEBSOCKET_ACTIVE,
|
||||
WEBSOCKET_SHUTDOWN,
|
||||
WEBSOCKET_CLOSED
|
||||
WEBSOCKET_CONNECTION_CLOSED,
|
||||
WEBSOCKET_CONNECTION_ESTABLISHED,
|
||||
WEBSOCKET_CONNECTION_ACTIVE,
|
||||
WEBSOCKET_CONNECTION_SHUTDOWN
|
||||
} state;
|
||||
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct lws *wsi;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct {
|
||||
char name[64];
|
||||
char ip[64];
|
||||
|
@ -59,6 +58,12 @@ struct websocket_connection {
|
|||
char *_name;
|
||||
};
|
||||
|
||||
/* Internal datastructures */
|
||||
struct websocket_destination {
|
||||
char *uri;
|
||||
struct lws_client_connect_info info;
|
||||
};
|
||||
|
||||
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
|
||||
|
||||
/** @see node_vtable::init */
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
/** Hook-releated functions.
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
|
||||
*********************************************************************************/
|
||||
|
||||
#include <string.h>
|
||||
#include <math.h>
|
||||
#include <libconfig.h>
|
||||
|
|
|
@ -21,12 +21,6 @@
|
|||
|
||||
#include "nodes/websocket.h"
|
||||
|
||||
/* Internal datastructures */
|
||||
struct destination {
|
||||
char *uri;
|
||||
struct lws_client_connect_info info;
|
||||
};
|
||||
|
||||
/* Private static storage */
|
||||
static int id = 0; /**< Highest assigned ID to websocket nodes. */
|
||||
|
||||
|
@ -35,18 +29,6 @@ struct list connections; /**< List of active libwebsocket connections which rece
|
|||
/* Forward declarations */
|
||||
static struct plugin p;
|
||||
|
||||
__attribute__((unused)) static int websocket_connection_init(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return -1;
|
||||
}
|
||||
|
||||
__attribute__((unused)) static void websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
}
|
||||
|
||||
static char * websocket_connection_name(struct websocket_connection *c)
|
||||
{
|
||||
if (!c->_name) {
|
||||
|
@ -59,6 +41,58 @@ static char * websocket_connection_name(struct websocket_connection *c)
|
|||
return c->_name;
|
||||
}
|
||||
|
||||
static int websocket_connection_init(struct websocket_connection *c, struct lws *wsi)
|
||||
{
|
||||
int ret;
|
||||
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New connection %s", websocket_connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CONNECTION_ESTABLISHED;
|
||||
c->wsi = wsi;
|
||||
|
||||
if (c->node != NULL)
|
||||
list_push(&w->connections, c);
|
||||
else
|
||||
list_push(&connections, c);
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret) {
|
||||
warn("Failed to create queue for incoming websocket connection. Closing..");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
|
||||
info("LWS: Connection %s closed", websocket_connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CONNECTION_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
||||
if (c->node)
|
||||
list_remove(&w->connections, c);
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
|
||||
queue_destroy(&c->queue);
|
||||
}
|
||||
|
||||
static void websocket_destination_destroy(struct websocket_destination *d)
|
||||
{
|
||||
free(d->uri);
|
||||
}
|
||||
|
||||
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int blocks, enqueued;
|
||||
|
@ -67,22 +101,15 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
|
|||
struct websocket *w = c->node->_vd;
|
||||
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_SHUTDOWN:
|
||||
case WEBSOCKET_CONNECTION_CLOSED:
|
||||
case WEBSOCKET_CONNECTION_SHUTDOWN:
|
||||
return -1;
|
||||
case WEBSOCKET_CLOSED:
|
||||
if (c->node) {
|
||||
struct websocket *w = c->node->_vd;
|
||||
list_remove(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
|
||||
case WEBSOCKET_CONNECTION_ESTABLISHED:
|
||||
c->state = WEBSOCKET_CONNECTION_ACTIVE;
|
||||
/* fall through */
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
case WEBSOCKET_CONNECTION_ACTIVE:
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
@ -113,11 +140,6 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void websocket_destination_destroy(struct destination *d)
|
||||
{
|
||||
free(d->uri);
|
||||
}
|
||||
|
||||
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
int ret;
|
||||
|
@ -154,49 +176,25 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
return -1;
|
||||
}
|
||||
|
||||
c->state = WEBSOCKET_ESTABLISHED;
|
||||
c->wsi = wsi;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret) {
|
||||
warn("Failed to create queue for incoming websocket connection. Closing..");
|
||||
ret = websocket_connection_init(c, wsi);
|
||||
if (ret)
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Lookup peer address for debug output */
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New connection %s", websocket_connection_name(c));
|
||||
|
||||
if (c->node != NULL) {
|
||||
struct websocket *w = c->node->_vd;
|
||||
list_push(&w->connections, c);
|
||||
}
|
||||
else {
|
||||
list_push(&connections, c);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
info("LWS: Connection %s closed", websocket_connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
||||
queue_destroy(&c->queue);
|
||||
|
||||
websocket_connection_destroy(c);
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
w = c->node->_vd;
|
||||
|
||||
if (c->node && c->node->state != STATE_STARTED)
|
||||
return -1;
|
||||
|
||||
if (c->state == WEBSOCKET_SHUTDOWN) {
|
||||
if (c->state == WEBSOCKET_CONNECTION_SHUTDOWN) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4);
|
||||
return -1;
|
||||
}
|
||||
|
@ -220,7 +218,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
lws_callback_on_writable(wsi);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE:
|
||||
case LWS_CALLBACK_RECEIVE: {
|
||||
|
@ -268,9 +265,6 @@ int websocket_start(struct node *n)
|
|||
|
||||
w->id = id++;
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_VALUES);
|
||||
|
||||
ret = pool_init(&w->pool, 64 * DEFAULT_QUEUELEN, blocklen, &memtype_hugepage);
|
||||
|
@ -281,6 +275,8 @@ int websocket_start(struct node *n)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
/** @todo Connection to destinations via WebSocket client */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -291,22 +287,22 @@ int websocket_stop(struct node *n)
|
|||
for (size_t i = 0; i < list_length(&w->connections); i++) {
|
||||
struct websocket_connection *c = list_at(&w->connections, i);
|
||||
|
||||
c->state = WEBSOCKET_SHUTDOWN;
|
||||
c->state = WEBSOCKET_CONNECTION_SHUTDOWN;
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
||||
pool_destroy(&w->pool);
|
||||
queue_destroy(&w->queue);
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_destroy(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
|
||||
|
||||
return 0;
|
||||
|
@ -314,17 +310,13 @@ int websocket_destroy(struct node *n)
|
|||
|
||||
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int got;
|
||||
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
struct webmsg *msgs[cnt];
|
||||
|
||||
int got;
|
||||
|
||||
do {
|
||||
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
|
||||
pthread_yield();
|
||||
} while (got == 0);
|
||||
|
||||
|
||||
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
|
||||
for (int i = 0; i < got; i++) {
|
||||
smps[i]->sequence = msgs[i]->sequence;
|
||||
smps[i]->length = msgs[i]->length;
|
||||
|
@ -342,7 +334,6 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
|
||||
for (size_t i = 0; i < list_length(&w->connections); i++) {
|
||||
struct websocket_connection *c = list_at(&w->connections, i);
|
||||
|
||||
|
@ -363,40 +354,42 @@ int websocket_parse(struct node *n, config_setting_t *cfg)
|
|||
struct websocket *w = n->_vd;
|
||||
config_setting_t *cfg_dests;
|
||||
int ret;
|
||||
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
cfg_dests = config_setting_get_member(cfg, "destinations");
|
||||
if (cfg_dests) {
|
||||
if (!config_setting_is_array(cfg_dests))
|
||||
cerror(cfg_dests, "The 'destinations' setting must be an array of URLs");
|
||||
|
||||
for (int i = 0; i < config_setting_length(cfg_dests); i++) {
|
||||
struct destination *d;
|
||||
const char *uri, *prot, *ads, *path;
|
||||
|
||||
uri = config_setting_get_string_elem(cfg_dests, i);
|
||||
if (!uri)
|
||||
cerror(cfg_dests, "The 'destinations' setting must be an array of URLs");
|
||||
|
||||
d = alloc(sizeof(struct destination));
|
||||
struct websocket_destination d;
|
||||
|
||||
d->uri = strdup(uri);
|
||||
if (!d->uri)
|
||||
d.uri = strdup(uri);
|
||||
if (!d.uri)
|
||||
serror("Failed to allocate memory");
|
||||
|
||||
ret = lws_parse_uri(d->uri, &prot, &ads, &d->info.port, &path);
|
||||
ret = lws_parse_uri(d.uri, &prot, &ads, &d.info.port, &path);
|
||||
if (ret)
|
||||
cerror(cfg_dests, "Failed to parse websocket URI: '%s'", uri);
|
||||
|
||||
d->info.ssl_connection = !strcmp(prot, "https");
|
||||
d->info.address = ads;
|
||||
d->info.path = path;
|
||||
d->info.protocol = prot;
|
||||
d->info.ietf_version_or_minus_one = -1;
|
||||
d.info.ssl_connection = !strcmp(prot, "https");
|
||||
d.info.address = ads;
|
||||
d.info.path = path;
|
||||
d.info.protocol = prot;
|
||||
d.info.ietf_version_or_minus_one = -1;
|
||||
|
||||
list_push(&w->destinations, d);
|
||||
list_push(&w->destinations, memdup(&d, sizeof(d)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
#include <villas/timing.h>
|
||||
#include <villas/pool.h>
|
||||
|
||||
#include "config.h"
|
||||
|
||||
void usage()
|
||||
{
|
||||
printf("Usage: villas-test-cmp FILE1 FILE2 [OPTIONS]\n");
|
||||
|
|
Loading…
Add table
Reference in a new issue