1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'ws-relay' into develop

# Conflicts:
#	include/villas/node/config.h.in
#	include/villas/utils.h
#	lib/CMakeLists.txt
#	lib/nodes/websocket.c
#	lib/utils.c
#	lib/web.c
#	src/CMakeLists.txt
#	src/villas-pipe.cpp
This commit is contained in:
Steffen Vogel 2019-01-12 13:48:29 +01:00
commit c2f83b01fd
14 changed files with 450 additions and 27 deletions

View file

@ -5,11 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [0.6.5] - Unrelease
## [0.6.5] - Unreleased
### Changed
### Added
- The configuration of many node-types is now splitted into seperate `in` and `out` sections. Please update your configuration files accordingly.
- A new sub-command `villas-relay` implements a client-to-client WebSocket relay.
It can be used as a proxy for nodes which sit behind a NAT firewall.
## [0.6.4] - 2018-07-18
@ -52,6 +53,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Changed
- The IO format names have changed. They now use dots (`raw.flt32`) instead of hyphens (`raw-flt32`) in their name. Please update your configuration files accordingly.
- The configuration of many node-types is now splitted into seperate `in` and `out` sections. Please update your configuration files accordingly.
## [0.6.1] - 2018-02-17

View file

@ -84,6 +84,7 @@ find_package(Mosquitto)
find_package(Opal)
find_package(IBVerbs)
find_package(RDMACM)
find_package(spdlog)
# Check programs
find_program(PROTOBUFC_COMPILER NAMES protoc-c)

43
etc/websocket-client.conf Normal file
View file

@ -0,0 +1,43 @@
/** Example configuration file for VILLASnode.
*
* The syntax of this file is similar to JSON.
* A detailed description of the format can be found here:
* http://www.hyperrealm.com/libconfig/libconfig_manual.html#Configuration-Files
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
http = {
enabled = false
}
nodes = {
ws = {
type = "websocket",
hooks = (
{ type = "stats" }
)
destinations = [
"http://localhost:8088/test_session"
]
}
};

View file

@ -55,9 +55,10 @@ set(LIB_SRC
shmem.c
config_helper.c
signal.c
copyright.c
pool.c
queue.c
queue_signalled.c
queue_signalled.c
)
if(IBVERBS_FOUND AND RDMACM_FOUND)

View file

@ -20,11 +20,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###################################################################################
# All executables link against libvillas
link_libraries(villas)
add_executable(villas-node villas-node.cpp)
target_link_libraries(villas-node PUBLIC villas)
add_executable(villas-test-rtt villas-test-rtt.cpp)
target_link_libraries(villas-test-rtt PUBLIC villas)
install(
TARGETS villas-node villas-test-rtt
@ -32,13 +32,30 @@ install(
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
if(WITH_WEB)
add_executable(villas-relay villas-relay.cpp)
target_include_directories(villas-relay PRIVATE ${LIBWEBSOCKETS_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ${PROJECT_SOURCE_DIR}/common/include)
target_link_libraries(villas-relay PRIVATE PkgConfig::LIBWEBSOCKETS villas-common spdlog)
install(
TARGETS villas-relay
COMPONENT bin
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
endif()
if(WITH_IO)
add_executable(villas-test-cmp villas-test-cmp.cpp)
add_executable(villas-convert villas-convert.cpp)
add_executable(villas-pipe villas-pipe.cpp)
add_executable(villas-signal villas-signal.cpp)
target_link_libraries(villas-test-cmp PUBLIC villas)
target_link_libraries(villas-pipe PUBLIC Threads::Threads)
add_executable(villas-convert villas-convert.cpp)
target_link_libraries(villas-convert PUBLIC villas)
add_executable(villas-pipe villas-pipe.cpp)
target_link_libraries(villas-pipe PUBLIC villas Threads::Threads)
add_executable(villas-signal villas-signal.cpp)
target_link_libraries(villas-signal PUBLIC villas)
install(
TARGETS villas-convert villas-pipe villas-signal villas-test-cmp
@ -49,6 +66,7 @@ endif()
if(WITH_IO AND WITH_HOOKS)
add_executable(villas-hook villas-hook.cpp)
target_link_libraries(villas-hook PUBLIC villas)
install(
TARGETS villas-hook

View file

@ -32,6 +32,7 @@
#include <villas/sample.h>
#include <villas/plugin.h>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
using namespace villas;
@ -45,7 +46,7 @@ static void usage()
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
utils::print_copyright();
print_copyright();
}
int main(int argc, char *argv[])
@ -59,7 +60,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc, argv, "Vhd:i:o:")) != -1) {
switch (c) {
case 'V':
utils::print_version();
print_version();
exit(EXIT_SUCCESS);
case 'i':

View file

@ -38,6 +38,7 @@
#include <villas/pool.h>
#include <villas/log.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/plugin.h>
#include <villas/config_helper.h>
#include <villas/kernel/rt.hpp>
@ -76,7 +77,7 @@ static void usage()
<< " villas-signal random | villas-hook skip_first seconds=10" << std::endl
<< std::endl;
utils::print_copyright();
print_copyright();
}
int main(int argc, char *argv[])
@ -105,7 +106,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc, argv, "Vhv:d:f:o:")) != -1) {
switch (c) {
case 'V':
utils::print_version();
print_version();
exit(EXIT_SUCCESS);
case 'f':

View file

@ -40,6 +40,7 @@
#include <villas/web.hpp>
#include <villas/log.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/plugin.h>
#include <villas/kernel/kernel.hpp>
#include <villas/kernel/rt.hpp>
@ -98,7 +99,7 @@ static void usage()
plugin_dump(PLUGIN_TYPE_FORMAT);
std::cout << std::endl;
utils::print_copyright();
print_copyright();
exit(EXIT_FAILURE);
}
@ -127,7 +128,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc, argv, "hVd:")) != -1) {
switch (c) {
case 'V':
utils::print_version();
print_version();
exit(EXIT_SUCCESS);
case 'd':

View file

@ -36,6 +36,7 @@
#include <villas/node/config.h>
#include <villas/config_helper.h>
#include <villas/super_node.hpp>
#include <villas/copyright.hpp>
#include <villas/utils.hpp>
#include <villas/utils.h>
#include <villas/log.hpp>
@ -129,7 +130,7 @@ static void usage()
<< " -d set logging level" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
utils::print_copyright();
print_copyright();
}
static void * send_loop(void *ctx)
@ -260,7 +261,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc, argv, "Vhxrsd:l:L:t:f:o:")) != -1) {
switch (c) {
case 'V':
utils::print_version();
print_version();
exit(EXIT_SUCCESS);
case 'f':
@ -374,9 +375,9 @@ check: if (optarg == endptr)
if (reverse)
node_reverse(node);
ret = node_type_start(node->_vt, reinterpret_cast<super_node *>(&sn));
ret = node_type_start(node_type(node), reinterpret_cast<super_node *>(&sn));
if (ret)
throw RuntimeError("Failed to intialize node type {}: reason={}", node_type_name(node->_vt), ret);
throw RuntimeError("Failed to intialize node type {}: reason={}", node_type_name(node_type(node)), ret);
ret = node_check(node);
if (ret)

350
src/villas-relay.cpp Normal file
View file

@ -0,0 +1,350 @@
/** Simple WebSocket relay facilitating client-to-client connections.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <iostream>
#include <vector>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <memory>
#include <string.h>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <villas/copyright.hpp>
#include <libwebsockets.h>
auto console = spdlog::stdout_color_mt("console");
/** The libwebsockets server context. */
static lws_context *context;
/** The libwebsockets vhost. */
static lws_vhost *vhost;
/* Forward declarations */
lws_callback_function protocol_cb;
class Session;
class Connection;
static std::map<std::string, std::shared_ptr<Session>> sessions;
class InvalidUrlException { };
struct Options {
bool loopback;
} opts;
class Frame : public std::vector<uint8_t> {
public:
Frame() {
// lws_write() requires LWS_PRE bytes in front of the payload
insert(end(), LWS_PRE, 0);
}
uint8_t * data() {
return std::vector<uint8_t>::data() + LWS_PRE;
}
size_type size() {
return std::vector<uint8_t>::size() - LWS_PRE;
}
};
class Session {
public:
typedef std::string Identifier;
static std::shared_ptr<Session> get(lws *wsi)
{
char uri[64];
/* We use the URI to associate this connection to a session
* Example: ws://example.com/node_1
* Will select the session with the name 'node_1'
*/
/* Get path of incoming request */
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI);
if (strlen(uri) <= 0)
throw InvalidUrlException();
Identifier sid = uri;//&uri[1];
auto it = sessions.find(sid);
if (it == sessions.end()) {
auto s = std::make_shared<Session>(sid);
sessions[sid] = s;
console->info("Creating new session: {}", sid);
return s;
}
else {
console->info("Reusing existing session: ", sid);
return it->second;
}
}
Session(Identifier sid) :
identifier(sid)
{ }
~Session()
{ }
Identifier identifier;
std::map<lws *, std::shared_ptr<Connection>> connections;
};
class Connection {
protected:
lws *wsi;
std::shared_ptr<Frame> currentFrame;
std::queue<std::shared_ptr<Frame>> outgoingFrames;
std::shared_ptr<Session> session;
public:
Connection(lws *w) :
wsi(w),
currentFrame(std::make_shared<Frame>()),
outgoingFrames()
{
session = Session::get(wsi);
session->connections[wsi] = std::shared_ptr<Connection>(this);
console->info("New connection established to session: {}", session->identifier);
}
~Connection() {
console->info("Connection closed");
session->connections.erase(wsi);
}
void write() {
while (!outgoingFrames.empty()) {
std::shared_ptr<Frame> fr = outgoingFrames.front();
lws_write(wsi, fr->data(), fr->size(), LWS_WRITE_BINARY);
outgoingFrames.pop();
}
}
void read(void *in, size_t len) {
currentFrame->insert(currentFrame->end(), (uint8_t *) in, (uint8_t *) in + len);
if (lws_is_final_fragment(wsi)) {
console->debug("Received frame, relaying to {} connections", session->connections.size() - (opts.loopback ? 0 : 1));
for (auto p : session->connections) {
auto c = p.second;
/* We skip the current connection in order
* to avoid receiving our own data */
if (opts.loopback == false && c.get() == this)
continue;
c->outgoingFrames.push(currentFrame);
lws_callback_on_writable(c->wsi);
}
currentFrame = std::make_shared<Frame>();
}
}
};
/** List of libwebsockets protocols. */
lws_protocols protocols[] = {
{
.name = "live",
.callback = protocol_cb,
.per_session_data_size = sizeof(Connection),
.rx_buffer_size = 0
},
{ NULL /* terminator */ }
};
/** List of libwebsockets extensions. */
static const lws_extension extensions[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
},
{
"deflate-frame",
lws_extension_callback_pm_deflate,
"deflate_frame"
},
{ NULL /* terminator */ }
};
static void logger(int level, const char *msg) {
auto log = spdlog::get("lws");
int len = strlen(msg);
if (strchr(msg, '\n'))
len -= 1;
/* Decrease severity for some errors. */
if (strstr(msg, "Unable to open") == msg)
level = LLL_WARN;
switch (level) {
case LLL_ERR: log->error("{}", msg); break;
case LLL_WARN: log->warn( "{}", msg); break;
case LLL_INFO: log->info( "{}", msg); break;
default: log->debug("{}", msg); break;
}
}
int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
Connection *c = reinterpret_cast<Connection *>(user);
switch (reason) {
case LWS_CALLBACK_ESTABLISHED: {
auto s = Session::get(wsi);
try {
new (c) Connection(wsi);
}
catch (InvalidUrlException e) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "Invalid URL", strlen("Invalid URL"));
return -1;
}
break;
}
case LWS_CALLBACK_CLOSED:
c->~Connection();
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
c->write();
break;
case LWS_CALLBACK_RECEIVE:
c->read(in, len);
break;
default:
break;
}
return 0;
}
void usage()
{
std::cout << "Usage: villas-relay [OPTIONS]" << std::endl;
std::cout << " OPTIONS is one or more of the following options:" << std::endl;
std::cout << " -d LVL set debug level" << std::endl;
std::cout << " -p PORT the port number to listen on" << std::endl;
std::cout << " -P PROT the websocket protocol" << std::endl;
std::cout << " -V show version and exit" << std::endl;
std::cout << " -h show usage and exit" << std::endl;
std::cout << std::endl;
villas::print_copyright();
}
int main(int argc, char *argv[])
{
spdlog::stdout_color_mt("lws");
lws_set_log_level((1 << LLL_COUNT) - 1, logger);
/* Start server */
lws_context_creation_info ctx_info = { 0 };
ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
ctx_info.gid = -1;
ctx_info.uid = -1;
ctx_info.protocols = protocols;
ctx_info.extensions = extensions;
ctx_info.port = 8088;
char c, *endptr;
while ((c = getopt (argc, argv, "hVp:P:l")) != -1) {
switch (c) {
case 'p':
ctx_info.port = strtoul(optarg, &endptr, 10);
goto check;
case 'P':
protocols[0].name = optarg;
break;
case 'l':
opts.loopback = true;
break;
case 'V':
villas::print_version();
exit(EXIT_SUCCESS);
case 'h':
case '?':
usage();
exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS);
}
continue;
check: if (optarg == endptr) {
console->error("Failed to parse parse option argument '-{} {}'", c, optarg);
exit(EXIT_FAILURE);
}
}
if (argc - optind < 0) {
usage();
exit(EXIT_FAILURE);
}
context = lws_create_context(&ctx_info);
if (context == NULL) {
console->error("WebSocket: failed to initialize server context");
exit(EXIT_FAILURE);
}
vhost = lws_create_vhost(context, &ctx_info);
if (vhost == NULL) {
console->error("WebSocket: failed to initialize virtual host");
exit(EXIT_FAILURE);
}
for (;;)
lws_service(context, 100);
return 0;
}

View file

@ -33,6 +33,7 @@
#include <villas/io.h>
#include <villas/utils.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/log.hpp>
#include <villas/sample.h>
#include <villas/timing.h>
@ -154,7 +155,7 @@ void usage()
<< " -o OFF the DC bias" << std::endl
<< " -l NUM only send LIMIT messages and stop" << std::endl << std::endl;
utils::print_copyright();
print_copyright();
}
static void quit(int signal, siginfo_t *sinfo, void *ctx)

View file

@ -31,6 +31,7 @@
#include <villas/format_type.h>
#include <villas/utils.hpp>
#include <villas/log.hpp>
#include <villas/copyright.hpp>
#include <villas/pool.h>
#include <villas/exceptions.hpp>
#include <villas/node/config.h>
@ -108,7 +109,7 @@ void usage()
<< " 4 number of values is not equal" << std::endl
<< " 5 data is not equal" << std::endl << std::endl;
utils::print_copyright();
print_copyright();
}
int main(int argc, char *argv[])
@ -148,7 +149,7 @@ int main(int argc, char *argv[])
break;
case 'V':
utils::print_version();
print_version();
exit(EXIT_SUCCESS);
case 'd':

View file

@ -33,6 +33,7 @@
#include <villas/node/config.h>
#include <villas/super_node.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/log.hpp>
#include <villas/node.h>
#include <villas/utils.hpp>
@ -66,7 +67,7 @@ void usage()
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
utils::print_copyright();
print_copyright();
}
int main(int argc, char *argv[])
@ -119,7 +120,7 @@ int main(int argc, char *argv[])
goto check;
case 'V':
utils::print_version();
print_version();
exit(EXIT_SUCCESS);
case 'd':

View file

@ -28,12 +28,13 @@
#include <villas/config.h>
#include <villas/config_helper.h>
#include <villas/utils.hpp>
#include <villas/copyright.hpp>
void usage()
{
std::cout << "Usage: conf2json input.conf > output.json" << std::endl << std::endl;
villas::utils::print_copyright();
villas::print_copyright();
}
int main(int argc, char *argv[])