2015-12-04 01:47:49 +01:00
|
|
|
/** Node type: Websockets (libwebsockets)
|
|
|
|
*
|
|
|
|
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
2020-01-20 17:17:00 +01:00
|
|
|
* @copyright 2014-2020, Institute for Automation of Complex Power Systems, EONERC
|
2017-04-27 12:56:43 +02:00
|
|
|
* @license GNU General Public License (version 3)
|
|
|
|
*
|
|
|
|
* VILLASnode
|
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU General Public License as published by
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
* any later version.
|
2017-05-05 19:24:16 +00:00
|
|
|
*
|
2017-04-27 12:56:43 +02:00
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU General Public License for more details.
|
2017-05-05 19:24:16 +00:00
|
|
|
*
|
2017-04-27 12:56:43 +02:00
|
|
|
* You should have received a copy of the GNU General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2015-12-04 01:47:49 +01:00
|
|
|
*********************************************************************************/
|
|
|
|
|
2019-06-23 16:57:00 +02:00
|
|
|
#include <cstdio>
|
|
|
|
#include <cstdlib>
|
2015-12-02 13:55:58 +01:00
|
|
|
#include <unistd.h>
|
2019-06-23 16:57:00 +02:00
|
|
|
#include <cstring>
|
2015-12-02 13:55:58 +01:00
|
|
|
#include <signal.h>
|
2015-12-13 02:01:57 +01:00
|
|
|
|
2018-08-02 10:28:08 +02:00
|
|
|
#include <libwebsockets.h>
|
|
|
|
|
2017-12-09 02:19:28 +08:00
|
|
|
#include <villas/timing.h>
|
2019-04-23 13:15:00 +02:00
|
|
|
#include <villas/utils.hpp>
|
2021-06-21 16:11:42 -04:00
|
|
|
#include <villas/node.h>
|
2019-04-23 00:12:31 +02:00
|
|
|
#include <villas/nodes/websocket.hpp>
|
2019-04-23 13:14:47 +02:00
|
|
|
#include <villas/super_node.hpp>
|
2017-03-12 17:13:37 -03:00
|
|
|
|
2020-06-08 04:03:07 +02:00
|
|
|
using namespace villas;
|
2021-05-10 00:12:30 +02:00
|
|
|
using namespace villas::node;
|
2019-06-04 16:55:38 +02:00
|
|
|
using namespace villas::utils;
|
|
|
|
|
2018-05-24 09:10:56 +02:00
|
|
|
/** Size argument (1 << 12 = 4096) passed to the constructors of the per-connection send and receive Buffer objects */
#define DEFAULT_WEBSOCKET_BUFFER_SIZE (1 << 12)
|
|
|
|
|
2015-12-13 02:01:57 +01:00
|
|
|
/* Private static storage */
|
2020-06-16 02:35:34 +02:00
|
|
|
static struct vlist connections; /**< List of established libwebsockets connections of all websocket nodes; entries are matched to their node via c->node */
|
2018-10-21 10:28:07 +02:00
|
|
|
|
2019-04-23 13:14:47 +02:00
|
|
|
/** Web server of the super node. Assigned in websocket_type_start(); used to request writable callbacks and to obtain the lws context and vhost for outgoing client connections. */
static villas::node::Web *web;
|
2021-02-16 14:15:14 +01:00
|
|
|
/** Type-level logger, used for messages that cannot be attributed to a node (e.g. before the request URI has been resolved to a node). */
static villas::Logger logger = logging.get("websocket");
|
2016-11-07 22:19:30 -05:00
|
|
|
|
2016-07-11 18:18:20 +02:00
|
|
|
/* Forward declarations */
|
2021-06-21 16:11:42 -04:00
|
|
|
/** Node-type descriptor of the websocket node. Its fields are filled in and it is registered by the constructor function of this file; its instance list is used to look up nodes by name. */
static struct vnode_type p;
|
2015-12-04 01:47:49 +01:00
|
|
|
|
2017-03-06 12:28:06 -04:00
|
|
|
static char * websocket_connection_name(struct websocket_connection *c)
|
2016-11-08 00:24:57 -05:00
|
|
|
{
|
|
|
|
if (!c->_name) {
|
2017-08-27 17:05:34 +02:00
|
|
|
if (c->wsi) {
|
2017-08-27 18:44:03 +02:00
|
|
|
char name[128];
|
|
|
|
char ip[128];
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 18:44:03 +02:00
|
|
|
lws_get_peer_addresses(c->wsi, lws_get_socket_fd(c->wsi), name, sizeof(name), ip, sizeof(ip));
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 18:44:03 +02:00
|
|
|
strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name);
|
2017-08-27 17:05:34 +02:00
|
|
|
}
|
2019-06-23 16:13:23 +02:00
|
|
|
else if (c->mode == websocket_connection::Mode::CLIENT && c->destination != nullptr)
|
2018-06-12 18:36:59 +02:00
|
|
|
strcatf(&c->_name, "dest=%s:%d", c->destination->info.address, c->destination->info.port);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2016-11-08 00:24:57 -05:00
|
|
|
if (c->node)
|
2017-08-27 18:44:03 +02:00
|
|
|
strcatf(&c->_name, ", node=%s", node_name(c->node));
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
strcatf(&c->_name, ", mode=%s", c->mode == websocket_connection::Mode::CLIENT ? "client" : "server");
|
2016-11-08 00:24:57 -05:00
|
|
|
}
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2016-11-08 00:24:57 -05:00
|
|
|
return c->_name;
|
|
|
|
}
|
|
|
|
|
2017-03-29 06:01:50 +02:00
|
|
|
static void websocket_destination_destroy(struct websocket_destination *d)
|
2016-11-08 00:24:57 -05:00
|
|
|
{
|
|
|
|
free(d->uri);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
free((char *) d->info.path);
|
|
|
|
free((char *) d->info.address);
|
2016-11-08 00:24:57 -05:00
|
|
|
}
|
|
|
|
|
2018-06-12 18:36:59 +02:00
|
|
|
static int websocket_connection_init(struct websocket_connection *c)
|
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
2019-04-22 23:45:38 +02:00
|
|
|
c->_name = nullptr;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
2019-10-26 13:34:03 +02:00
|
|
|
ret = queue_init(&c->queue, DEFAULT_QUEUE_LENGTH);
|
2018-06-12 18:36:59 +02:00
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
c->formatter->start(&c->node->in.signals, ~(int) SampleFlags::HAS_OFFSET);
|
2018-08-20 18:27:45 +02:00
|
|
|
|
2020-06-08 04:03:07 +02:00
|
|
|
c->buffers.recv = new Buffer(DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
|
|
|
c->buffers.send = new Buffer(DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
2018-06-12 18:36:59 +02:00
|
|
|
|
2020-07-04 16:22:10 +02:00
|
|
|
if (!c->buffers.recv || !c->buffers.send)
|
|
|
|
throw MemoryAllocationError();
|
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
c->state = websocket_connection::State::INITIALIZED;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
static int websocket_connection_destroy(struct websocket_connection *c)
|
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
assert(c->state != websocket_connection::State::DESTROYED);
|
2018-06-12 18:36:59 +02:00
|
|
|
|
|
|
|
if (c->_name)
|
|
|
|
free(c->_name);
|
|
|
|
|
|
|
|
/* Return all samples to pool */
|
|
|
|
int avail;
|
|
|
|
struct sample *smp;
|
|
|
|
while ((avail = queue_pull(&c->queue, (void **) &smp)))
|
2018-08-07 09:22:26 +02:00
|
|
|
sample_decref(smp);
|
2018-06-12 18:36:59 +02:00
|
|
|
|
|
|
|
ret = queue_destroy(&c->queue);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
delete c->formatter;
|
2020-06-08 04:03:07 +02:00
|
|
|
delete c->buffers.recv;
|
|
|
|
delete c->buffers.send;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
2019-04-22 23:45:38 +02:00
|
|
|
c->wsi = nullptr;
|
|
|
|
c->_name = nullptr;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
c->state = websocket_connection::State::DESTROYED;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
static int websocket_connection_write(struct websocket_connection *c, struct sample * const smps[], unsigned cnt)
|
2016-11-08 00:24:57 -05:00
|
|
|
{
|
2017-08-27 17:05:34 +02:00
|
|
|
int pushed;
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
if (c->state != websocket_connection::State::INITIALIZED)
|
2018-06-12 18:36:59 +02:00
|
|
|
return -1;
|
|
|
|
|
2021-06-29 10:45:12 -04:00
|
|
|
sample_incref_many(smps, cnt);
|
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
|
2021-06-29 10:45:12 -04:00
|
|
|
if (pushed < (int) cnt) {
|
|
|
|
sample_decref_many(smps + pushed, cnt - pushed);
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->warn("Queue overrun in WebSocket connection: {}", websocket_connection_name(c));
|
2021-06-29 10:45:12 -04:00
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->debug("Enqueued {} samples to {}", pushed, websocket_connection_name(c));
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
/* Client connections which are currently conecting don't have an associate c->wsi yet */
|
2019-01-12 19:05:09 +01:00
|
|
|
if (c->wsi)
|
2019-04-23 13:14:47 +02:00
|
|
|
web->callbackOnWritable(c->wsi);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2016-11-08 00:24:57 -05:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
static void websocket_connection_close(struct websocket_connection *c, struct lws *wsi, enum lws_close_status status, const char *reason)
|
|
|
|
{
|
|
|
|
lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason));
|
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->debug("Closing WebSocket connection with {}: status={}, reason={}", websocket_connection_name(c), status, reason);
|
2017-04-24 19:28:45 +02:00
|
|
|
}
|
|
|
|
|
2017-03-06 12:28:06 -04:00
|
|
|
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
2015-12-04 01:47:49 +01:00
|
|
|
{
|
2017-08-27 17:05:34 +02:00
|
|
|
int ret, recvd, pulled, cnt = 128;
|
2019-04-22 23:43:46 +02:00
|
|
|
struct websocket_connection *c = (struct websocket_connection *) user;
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2015-12-02 13:55:58 +01:00
|
|
|
switch (reason) {
|
2016-07-11 18:18:20 +02:00
|
|
|
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
2017-04-03 09:01:14 +02:00
|
|
|
case LWS_CALLBACK_ESTABLISHED:
|
2017-08-27 17:05:34 +02:00
|
|
|
c->wsi = wsi;
|
2019-06-23 16:13:23 +02:00
|
|
|
c->state = websocket_connection::State::ESTABLISHED;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
|
|
|
if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED)
|
2019-06-23 16:13:23 +02:00
|
|
|
c->mode = websocket_connection::Mode::CLIENT;
|
2018-06-12 18:36:59 +02:00
|
|
|
else {
|
2019-06-23 16:13:23 +02:00
|
|
|
c->mode = websocket_connection::Mode::SERVER;
|
2018-06-12 18:36:59 +02:00
|
|
|
/* We use the URI to associate this connection to a node
|
|
|
|
* and choose a protocol.
|
|
|
|
*
|
|
|
|
* Example: ws://example.com/node_1.json
|
|
|
|
* Will select the node with the name 'node_1'
|
|
|
|
* and format 'json'.
|
|
|
|
*/
|
|
|
|
|
|
|
|
/* Get path of incoming request */
|
2019-02-18 01:09:33 +01:00
|
|
|
char *node, *format, *lasts;
|
2018-06-12 18:36:59 +02:00
|
|
|
char uri[64];
|
|
|
|
|
|
|
|
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
|
|
|
|
if (strlen(uri) <= 0) {
|
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
|
2021-02-16 14:15:14 +01:00
|
|
|
|
|
|
|
logger->warn("Failed to get request URI");
|
|
|
|
|
2018-06-12 18:36:59 +02:00
|
|
|
return -1;
|
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2019-02-18 01:09:33 +01:00
|
|
|
node = strtok_r(uri, "/.", &lasts);
|
2018-06-12 18:36:59 +02:00
|
|
|
if (!node) {
|
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
|
2021-02-16 14:15:14 +01:00
|
|
|
logger->warn("Failed to tokenize request URI");
|
2018-06-12 18:36:59 +02:00
|
|
|
return -1;
|
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2019-04-22 23:45:38 +02:00
|
|
|
format = strtok_r(nullptr, "", &lasts);
|
2018-06-12 18:36:59 +02:00
|
|
|
if (!format)
|
2019-04-22 23:43:46 +02:00
|
|
|
format = (char *) "villas.web";
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2018-06-12 18:36:59 +02:00
|
|
|
/* Search for node whose name matches the URI. */
|
2021-06-21 16:11:42 -04:00
|
|
|
c->node = p.instances.lookup(node);
|
2018-06-12 18:36:59 +02:00
|
|
|
if (!c->node) {
|
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
|
2021-02-16 14:15:14 +01:00
|
|
|
logger->warn("Failed to find node: {}", node);
|
2018-06-12 18:36:59 +02:00
|
|
|
return -1;
|
|
|
|
}
|
2018-03-28 14:29:55 +02:00
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
c->formatter = FormatFactory::make(format);
|
|
|
|
if (!c->formatter) {
|
2018-06-12 18:36:59 +02:00
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown format");
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->warn("Failed to find format: format={}", format);
|
2018-06-12 18:36:59 +02:00
|
|
|
return -1;
|
|
|
|
}
|
2017-08-14 14:42:07 +02:00
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2018-06-12 18:36:59 +02:00
|
|
|
ret = websocket_connection_init(c);
|
|
|
|
if (ret) {
|
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Internal error");
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->warn("Failed to intialize WebSocket connection: reason={}", ret);
|
2016-11-07 22:19:30 -05:00
|
|
|
return -1;
|
2018-06-12 18:36:59 +02:00
|
|
|
}
|
2015-12-04 01:47:49 +01:00
|
|
|
|
2019-01-07 10:28:55 +01:00
|
|
|
vlist_push(&connections, c);
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->info("Established WebSocket connection: {}", websocket_connection_name(c));
|
2020-07-04 17:15:27 +02:00
|
|
|
|
2017-08-14 14:42:07 +02:00
|
|
|
break;
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
2019-06-23 16:13:23 +02:00
|
|
|
c->state = websocket_connection::State::ERROR;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
logger->warn("Failed to establish WebSocket connection: reason={}", in ? (char *) in : "unknown");
|
2016-02-04 18:25:13 +01:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
return -1;
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2015-12-04 01:47:49 +01:00
|
|
|
case LWS_CALLBACK_CLOSED:
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->debug("Closed WebSocket connection: {}", websocket_connection_name(c));
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
if (c->state != websocket_connection::State::SHUTDOWN) {
|
2017-08-27 17:05:34 +02:00
|
|
|
/** @todo Attempt reconnect here */
|
2016-02-04 16:30:36 +01:00
|
|
|
}
|
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
if (connections.state == State::INITIALIZED)
|
2019-02-24 09:23:31 +01:00
|
|
|
vlist_remove_all(&connections, c);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
if (c->state == websocket_connection::State::INITIALIZED)
|
2018-06-12 20:02:43 +02:00
|
|
|
websocket_connection_destroy(c);
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
if (c->mode == websocket_connection::Mode::CLIENT)
|
2020-01-21 16:26:51 +01:00
|
|
|
delete c;
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
break;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2016-07-11 18:18:20 +02:00
|
|
|
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
2018-08-20 18:27:45 +02:00
|
|
|
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
2020-09-11 14:57:05 +02:00
|
|
|
struct sample *smps[cnt];
|
2017-07-13 22:39:38 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
|
|
|
|
if (pulled > 0) {
|
2018-05-23 00:17:41 +02:00
|
|
|
size_t wbytes;
|
2021-05-10 00:12:30 +02:00
|
|
|
c->formatter->sprint(c->buffers.send->data() + LWS_PRE, c->buffers.send->size() - LWS_PRE, &wbytes, smps, pulled);
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, c->formatter->isBinaryPayload() ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2018-08-07 09:22:26 +02:00
|
|
|
sample_decref_many(smps, pulled);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-01-14 10:00:09 +01:00
|
|
|
if (ret < 0)
|
|
|
|
return ret;
|
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->debug("Send {} samples to connection: {}, bytes={}", pulled, websocket_connection_name(c), ret);
|
2017-07-13 22:39:38 +02:00
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 17:59:24 +02:00
|
|
|
if (queue_available(&c->queue) > 0)
|
|
|
|
lws_callback_on_writable(wsi);
|
2019-06-23 16:13:23 +02:00
|
|
|
else if (c->state == websocket_connection::State::SHUTDOWN) {
|
2018-08-20 18:27:45 +02:00
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
|
|
|
|
return -1;
|
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
break;
|
2018-08-20 18:27:45 +02:00
|
|
|
}
|
2015-12-02 13:55:58 +01:00
|
|
|
|
2016-07-11 18:18:20 +02:00
|
|
|
case LWS_CALLBACK_CLIENT_RECEIVE:
|
2017-09-04 14:28:55 +02:00
|
|
|
case LWS_CALLBACK_RECEIVE:
|
2017-08-27 17:05:34 +02:00
|
|
|
if (lws_is_first_fragment(wsi))
|
2020-06-08 04:03:07 +02:00
|
|
|
c->buffers.recv->clear();
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2020-10-20 22:17:55 +02:00
|
|
|
c->buffers.recv->append((char *) in, len);
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
|
|
|
|
if (lws_is_final_fragment(wsi)) {
|
|
|
|
struct timespec ts_recv = time_now();
|
2020-08-25 21:00:52 +02:00
|
|
|
struct vnode *n = c->node;
|
2018-02-06 23:30:37 +01:00
|
|
|
|
2018-05-07 18:48:34 +02:00
|
|
|
int avail, enqueued;
|
2018-02-06 23:30:37 +01:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2020-09-11 14:57:05 +02:00
|
|
|
struct sample *smps[cnt];
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2018-05-07 18:48:34 +02:00
|
|
|
avail = sample_alloc_many(&w->pool, smps, cnt);
|
|
|
|
if (avail < cnt)
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->warn("Pool underrun for connection: {}", websocket_connection_name(c));
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
recvd = c->formatter->sscan(c->buffers.recv->data(), c->buffers.recv->size(), nullptr, smps, avail);
|
2017-08-27 17:05:34 +02:00
|
|
|
if (recvd < 0) {
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->warn("Failed to parse sample data received on connection: {}", websocket_connection_name(c));
|
2017-08-27 17:05:34 +02:00
|
|
|
break;
|
2017-04-24 19:28:45 +02:00
|
|
|
}
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->debug("Received {} samples from connection: {}", recvd, websocket_connection_name(c));
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
/* Set receive timestamp */
|
2017-09-04 14:28:55 +02:00
|
|
|
for (int i = 0; i < recvd; i++) {
|
2017-08-27 17:05:34 +02:00
|
|
|
smps[i]->ts.received = ts_recv;
|
2019-06-23 16:13:23 +02:00
|
|
|
smps[i]->flags |= (int) SampleFlags::HAS_TS_RECEIVED;
|
2017-09-04 14:28:55 +02:00
|
|
|
}
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2018-05-07 18:48:34 +02:00
|
|
|
enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
|
|
|
|
if (enqueued < recvd)
|
2021-02-16 14:15:14 +01:00
|
|
|
c->node->logger->warn("Queue overrun in connection: {}", websocket_connection_name(c));
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2018-05-07 18:48:34 +02:00
|
|
|
/* Release unused samples back to pool */
|
|
|
|
if (enqueued < avail)
|
2018-08-07 09:22:26 +02:00
|
|
|
sample_decref_many(&smps[enqueued], avail - enqueued);
|
2018-05-07 18:48:34 +02:00
|
|
|
|
2020-06-08 04:03:07 +02:00
|
|
|
c->buffers.recv->clear();
|
2018-05-26 01:13:22 +02:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
if (c->state == websocket_connection::State::SHUTDOWN) {
|
2017-08-27 17:05:34 +02:00
|
|
|
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
|
|
|
|
return -1;
|
2016-11-07 22:19:30 -05:00
|
|
|
}
|
2016-01-14 23:17:39 +01:00
|
|
|
}
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
break;
|
2016-07-11 18:18:20 +02:00
|
|
|
|
2015-12-02 13:55:58 +01:00
|
|
|
default:
|
2017-08-14 14:42:07 +02:00
|
|
|
break;
|
2015-12-02 13:55:58 +01:00
|
|
|
}
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-14 14:42:07 +02:00
|
|
|
return 0;
|
2015-12-02 13:55:58 +01:00
|
|
|
}
|
|
|
|
|
2019-04-23 13:14:47 +02:00
|
|
|
/** Type-level start hook of the websocket node type.
 *
 * Initializes the global connection list and remembers the web server of
 * the super node.
 *
 * @param sn The super node providing the web server.
 * @return 0 on success, the error code of vlist_init(), or -1 if the web
 *         server is not in state STARTED.
 */
int websocket_type_start(villas::node::SuperNode *sn)
{
	const int err = vlist_init(&connections);
	if (err != 0)
		return err;

	web = sn->getWeb();

	/* The node type depends on a running web server */
	const bool web_running = web->getState() == State::STARTED;

	return web_running ? 0 : -1;
}
|
|
|
|
|
2020-08-25 21:00:52 +02:00
|
|
|
int websocket_start(struct vnode *n)
|
2015-12-02 13:55:58 +01:00
|
|
|
{
|
2016-07-11 18:18:20 +02:00
|
|
|
int ret;
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2021-06-24 06:33:23 -04:00
|
|
|
ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUE_LENGTH, SAMPLE_LENGTH(vlist_length(&n->in.signals)));
|
2016-07-11 18:18:20 +02:00
|
|
|
if (ret)
|
|
|
|
return ret;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-10-26 13:34:03 +02:00
|
|
|
ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUE_LENGTH);
|
2017-04-02 13:02:49 +02:00
|
|
|
if (ret)
|
|
|
|
return ret;
|
2016-11-07 22:19:30 -05:00
|
|
|
|
2019-04-22 23:43:46 +02:00
|
|
|
for (size_t i = 0; i < vlist_length(&w->destinations); i++) {
|
2018-08-20 18:27:45 +02:00
|
|
|
const char *format;
|
2020-01-21 16:26:51 +01:00
|
|
|
auto *d = (struct websocket_destination *) vlist_at(&w->destinations, i);
|
|
|
|
auto *c = new struct websocket_connection;
|
2020-07-04 16:22:10 +02:00
|
|
|
if (!c)
|
|
|
|
throw MemoryAllocationError();
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
c->state = websocket_connection::State::CONNECTING;
|
2018-06-12 18:36:59 +02:00
|
|
|
|
2018-08-20 18:27:45 +02:00
|
|
|
format = strchr(d->info.path, '.');
|
2019-01-12 19:06:23 +01:00
|
|
|
if (format)
|
2019-03-26 07:09:55 +01:00
|
|
|
format = format + 1; /* Removes "." */
|
2019-01-12 19:06:23 +01:00
|
|
|
else
|
2018-08-20 18:27:45 +02:00
|
|
|
format = "villas.web";
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
c->formatter = FormatFactory::make(format);
|
|
|
|
if (!c->formatter)
|
2018-08-20 18:27:45 +02:00
|
|
|
return -1;
|
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
c->node = n;
|
2017-08-14 14:42:07 +02:00
|
|
|
c->destination = d;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-04-23 13:14:47 +02:00
|
|
|
d->info.context = web->getContext();
|
|
|
|
d->info.vhost = web->getVHost();
|
2017-04-24 19:28:45 +02:00
|
|
|
d->info.userdata = c;
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
lws_client_connect_via_info(&d->info);
|
|
|
|
}
|
2017-03-16 22:42:58 -03:00
|
|
|
|
2015-12-02 13:55:58 +01:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2020-08-25 21:00:52 +02:00
|
|
|
int websocket_stop(struct vnode *n)
|
2016-02-04 17:13:28 +01:00
|
|
|
{
|
2019-03-31 12:52:07 +02:00
|
|
|
int ret, open_connections = 0;;
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2019-01-07 10:28:55 +01:00
|
|
|
for (size_t i = 0; i < vlist_length(&connections); i++) {
|
|
|
|
struct websocket_connection *c = (struct websocket_connection *) vlist_at(&connections, i);
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
if (c->node != n)
|
|
|
|
continue;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-06-23 16:13:23 +02:00
|
|
|
c->state = websocket_connection::State::SHUTDOWN;
|
2017-03-29 06:01:50 +02:00
|
|
|
|
2016-11-07 22:19:30 -05:00
|
|
|
lws_callback_on_writable(c->wsi);
|
|
|
|
}
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2019-03-31 12:52:07 +02:00
|
|
|
/* Count open connections belonging to this node */
|
2019-04-22 23:43:46 +02:00
|
|
|
for (size_t i = 0; i < vlist_length(&connections); i++) {
|
2019-03-31 12:52:07 +02:00
|
|
|
struct websocket_connection *c = (struct websocket_connection *) vlist_at(&connections, i);
|
2018-08-20 18:27:45 +02:00
|
|
|
|
2019-03-31 12:52:07 +02:00
|
|
|
if (c->node == n)
|
|
|
|
open_connections++;
|
|
|
|
}
|
2018-08-20 18:27:45 +02:00
|
|
|
|
2019-03-31 12:52:07 +02:00
|
|
|
if (open_connections > 0) {
|
2021-02-16 14:15:14 +01:00
|
|
|
n->logger->info("Waiting for shutdown of {} connections...", open_connections);
|
2018-08-20 18:27:45 +02:00
|
|
|
sleep(1);
|
|
|
|
}
|
|
|
|
|
2019-03-31 12:52:07 +02:00
|
|
|
ret = queue_signalled_close(&w->queue);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
2017-04-07 12:25:17 +02:00
|
|
|
ret = queue_signalled_destroy(&w->queue);
|
2017-04-02 13:02:49 +02:00
|
|
|
if (ret)
|
|
|
|
return ret;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
ret = pool_destroy(&w->pool);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
2016-02-04 17:13:28 +01:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2020-08-25 21:00:52 +02:00
|
|
|
int websocket_destroy(struct vnode *n)
|
2015-12-02 13:55:58 +01:00
|
|
|
{
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2018-05-26 01:15:23 +02:00
|
|
|
int ret;
|
2017-03-16 22:42:58 -03:00
|
|
|
|
2019-01-07 10:28:55 +01:00
|
|
|
ret = vlist_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
|
2018-05-26 01:15:23 +02:00
|
|
|
if (ret)
|
|
|
|
return ret;
|
2015-12-04 01:47:49 +01:00
|
|
|
|
2015-12-02 13:55:58 +01:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
int websocket_read(struct vnode *n, struct sample * const smps[], unsigned cnt)
|
2015-12-02 13:55:58 +01:00
|
|
|
{
|
2017-04-24 19:28:45 +02:00
|
|
|
int avail;
|
2017-03-16 22:42:58 -03:00
|
|
|
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2018-07-11 18:14:29 +02:00
|
|
|
struct sample *cpys[cnt];
|
2017-03-16 22:42:58 -03:00
|
|
|
|
2018-07-11 18:14:29 +02:00
|
|
|
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
|
2017-08-27 17:05:34 +02:00
|
|
|
if (avail < 0)
|
|
|
|
return avail;
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2018-03-28 14:29:55 +02:00
|
|
|
sample_copy_many(smps, cpys, avail);
|
2018-08-07 09:22:26 +02:00
|
|
|
sample_decref_many(cpys, avail);
|
2016-06-08 22:39:17 +02:00
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
return avail;
|
2015-12-02 13:55:58 +01:00
|
|
|
}
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
int websocket_write(struct vnode *n, struct sample * const smps[], unsigned cnt)
|
2015-12-02 13:55:58 +01:00
|
|
|
{
|
2017-04-24 19:28:45 +02:00
|
|
|
int avail;
|
|
|
|
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2018-07-11 18:14:29 +02:00
|
|
|
struct sample *cpys[cnt];
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-04-24 19:28:45 +02:00
|
|
|
/* Make copies of all samples */
|
2018-07-11 18:14:29 +02:00
|
|
|
avail = sample_alloc_many(&w->pool, cpys, cnt);
|
2019-04-22 23:43:46 +02:00
|
|
|
if (avail < (int) cnt)
|
2021-02-16 14:15:14 +01:00
|
|
|
n->logger->warn("Pool underrun: avail={}", avail);
|
2017-04-24 19:28:45 +02:00
|
|
|
|
2018-03-28 14:29:55 +02:00
|
|
|
sample_copy_many(cpys, smps, avail);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-01-07 10:28:55 +01:00
|
|
|
for (size_t i = 0; i < vlist_length(&connections); i++) {
|
|
|
|
struct websocket_connection *c = (struct websocket_connection *) vlist_at(&connections, i);
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2018-03-28 14:29:55 +02:00
|
|
|
if (c->node == n)
|
2018-07-11 18:14:29 +02:00
|
|
|
websocket_connection_write(c, cpys, cnt);
|
2017-03-25 21:23:31 +01:00
|
|
|
}
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2018-08-07 09:22:26 +02:00
|
|
|
sample_decref_many(cpys, avail);
|
2016-06-08 22:39:17 +02:00
|
|
|
|
2018-07-11 18:14:29 +02:00
|
|
|
return cnt;
|
2015-12-02 13:55:58 +01:00
|
|
|
}
|
2016-07-11 18:18:20 +02:00
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
int websocket_parse(struct vnode *n, json_t *json)
|
2016-11-07 22:19:30 -05:00
|
|
|
{
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2016-11-08 00:24:57 -05:00
|
|
|
int ret;
|
2017-03-16 22:42:58 -03:00
|
|
|
|
2018-08-07 18:40:32 +02:00
|
|
|
size_t i;
|
2019-04-22 23:45:38 +02:00
|
|
|
json_t *json_dests = nullptr;
|
2017-10-16 08:08:35 +02:00
|
|
|
json_t *json_dest;
|
2017-08-03 00:19:27 +02:00
|
|
|
json_error_t err;
|
|
|
|
|
2020-09-10 11:11:42 +02:00
|
|
|
ret = vlist_init(&w->destinations);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
2017-03-16 22:42:58 -03:00
|
|
|
|
2021-02-16 14:15:14 +01:00
|
|
|
ret = json_unpack_ex(json, &err, 0, "{ s?: o }", "destinations", &json_dests);
|
2017-08-03 00:19:27 +02:00
|
|
|
if (ret)
|
2021-02-16 14:15:14 +01:00
|
|
|
throw ConfigError(json, err, "node-config-node-websocket");
|
2017-08-03 00:19:27 +02:00
|
|
|
|
2017-10-16 08:08:35 +02:00
|
|
|
if (json_dests) {
|
|
|
|
if (!json_is_array(json_dests))
|
2021-02-16 14:15:14 +01:00
|
|
|
throw ConfigError(json_dests, err, "node-config-node-websocket-destinations", "The 'destinations' setting must be an array of URLs");
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2018-08-07 18:40:32 +02:00
|
|
|
json_array_foreach(json_dests, i, json_dest) {
|
2017-03-11 23:50:46 -03:00
|
|
|
const char *uri, *prot, *ads, *path;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-10-16 08:08:35 +02:00
|
|
|
uri = json_string_value(json_dest);
|
2017-03-11 23:50:46 -03:00
|
|
|
if (!uri)
|
2021-02-16 14:15:14 +01:00
|
|
|
throw ConfigError(json_dest, err, "node-config-node-websocket-destinations", "The 'destinations' setting must be an array of URLs");
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2020-01-21 16:26:51 +01:00
|
|
|
auto *d = new struct websocket_destination;
|
2020-07-04 16:22:10 +02:00
|
|
|
if (!d)
|
|
|
|
throw MemoryAllocationError();
|
|
|
|
|
|
|
|
memset(d, 0, sizeof(struct websocket_destination));
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-07-09 14:36:09 +02:00
|
|
|
d->uri = strdup(uri);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-07-09 14:36:09 +02:00
|
|
|
ret = lws_parse_uri(d->uri, &prot, &ads, &d->info.port, &path);
|
2017-03-11 23:50:46 -03:00
|
|
|
if (ret)
|
2021-02-16 14:15:14 +01:00
|
|
|
throw ConfigError(json_dest, err, "node-config-node-websocket-destinations", "Failed to parse WebSocket URI: '{}'", uri);
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-07-09 14:36:09 +02:00
|
|
|
d->info.ssl_connection = !strcmp(prot, "https");
|
|
|
|
d->info.address = strdup(ads);
|
2020-09-30 16:07:39 +02:00
|
|
|
d->info.path = strf("/%s", path);
|
2017-07-09 14:36:09 +02:00
|
|
|
d->info.host = d->info.address;
|
|
|
|
d->info.origin = d->info.address;
|
|
|
|
d->info.ietf_version_or_minus_one = -1;
|
|
|
|
d->info.protocol = "live";
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-01-07 10:28:55 +01:00
|
|
|
vlist_push(&w->destinations, d);
|
2017-03-11 23:50:46 -03:00
|
|
|
}
|
2016-07-11 18:18:20 +02:00
|
|
|
}
|
2017-03-08 09:53:28 -03:00
|
|
|
|
2016-11-08 00:24:57 -05:00
|
|
|
return 0;
|
2016-11-07 22:19:30 -05:00
|
|
|
}
|
|
|
|
|
2020-08-25 21:00:52 +02:00
|
|
|
char * websocket_print(struct vnode *n)
|
2016-11-07 22:19:30 -05:00
|
|
|
{
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2016-11-08 00:24:57 -05:00
|
|
|
|
2019-04-22 23:45:38 +02:00
|
|
|
char *buf = nullptr;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-04-24 18:11:05 +02:00
|
|
|
buf = strcatf(&buf, "destinations=[ ");
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2019-01-07 10:28:55 +01:00
|
|
|
for (size_t i = 0; i < vlist_length(&w->destinations); i++) {
|
|
|
|
struct websocket_destination *d = (struct websocket_destination *) vlist_at(&w->destinations, i);
|
2017-03-25 21:23:31 +01:00
|
|
|
|
2017-08-27 17:05:34 +02:00
|
|
|
buf = strcatf(&buf, "%s://%s:%d/%s ",
|
2017-04-24 18:11:05 +02:00
|
|
|
d->info.ssl_connection ? "wss" : "ws",
|
|
|
|
d->info.address,
|
|
|
|
d->info.port,
|
|
|
|
d->info.path
|
2016-11-08 00:24:57 -05:00
|
|
|
);
|
2016-07-11 18:18:20 +02:00
|
|
|
}
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2017-04-24 18:11:05 +02:00
|
|
|
buf = strcatf(&buf, "]");
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2016-11-08 00:24:57 -05:00
|
|
|
return buf;
|
2015-12-02 13:55:58 +01:00
|
|
|
}
|
|
|
|
|
2020-08-25 21:00:52 +02:00
|
|
|
int websocket_poll_fds(struct vnode *n, int fds[])
|
2017-08-30 13:30:31 +02:00
|
|
|
{
|
2017-10-18 15:39:53 +02:00
|
|
|
struct websocket *w = (struct websocket *) n->_vd;
|
2017-09-04 14:28:55 +02:00
|
|
|
|
2019-01-21 15:47:34 +01:00
|
|
|
fds[0] = queue_signalled_fd(&w->queue);
|
|
|
|
|
|
|
|
return 1;
|
2017-08-30 13:30:31 +02:00
|
|
|
}
|
|
|
|
|
2019-04-22 23:43:46 +02:00
|
|
|
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
	/* Create the global list of node types if no other type did so before */
	if (!node_types)
		node_types = new NodeTypeList();

	/* Identification and per-instance storage */
	p.name = "websocket";
	p.description = "Send and receive samples of a WebSocket connection (libwebsockets)";
	p.size = sizeof(struct websocket);
	p.vectorize = 0;

	/* Type-level hook */
	p.type.start = websocket_type_start;

	/* Configuration and life cycle of a node instance */
	p.parse = websocket_parse;
	p.print = websocket_print;
	p.start = websocket_start;
	p.stop = websocket_stop;
	p.destroy = websocket_destroy;

	/* Data path */
	p.read = websocket_read;
	p.write = websocket_write;
	p.poll_fds = websocket_poll_fds;

	/* Register the websocket node type */
	node_types->push_back(&p);
}
|