mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
fixed most major bugs of web socket node-type
This commit is contained in:
parent
6d8401d88a
commit
8115a9caa2
6 changed files with 300 additions and 485 deletions
13
Makefile
13
Makefile
|
@ -41,8 +41,7 @@ LDFLAGS += -pthread -L. -Wl,-rpath,'$$ORIGIN'
|
|||
# pkg-config dependencies
|
||||
PKGS = libconfig
|
||||
|
||||
#DOCKEROPTS = -p 80:80 -p 443:443 --ulimit memlock=1073741824 --security-opt seccomp:unconfined
|
||||
DOCKEROPTS = -p 1234 --ulimit memlock=1073741824 --security-opt seccomp:unconfined
|
||||
DOCKEROPTS = -p 80:80 -p 443:443 -p 1234:1234 --ulimit memlock=1073741824 --security-opt seccomp:unconfined
|
||||
|
||||
# Add more compiler flags
|
||||
ifdef DEBUG
|
||||
|
@ -79,11 +78,11 @@ ifeq ($(shell pkg-config libcurl jansson uuid; echo $$?),0)
|
|||
PKGS += libcurl jansson uuid
|
||||
endif
|
||||
|
||||
## Enable WebSocket support
|
||||
#ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0)
|
||||
# LIB_OBJS += websocket.o websocket-live.o websocket-http.o
|
||||
# PKGS += libwebsockets jansson
|
||||
#endif
|
||||
# Enable WebSocket support
|
||||
ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0)
|
||||
LIB_OBJS += websocket.o
|
||||
PKGS += libwebsockets jansson
|
||||
endif
|
||||
|
||||
## Add support for LAPACK / BLAS benchmarks / solvers
|
||||
ifeq ($(shell pkg-config blas lapack; echo $$?),0)
|
||||
|
|
|
@ -20,20 +20,12 @@
|
|||
############ Dictionary of nodes ############
|
||||
|
||||
nodes = {
|
||||
file = {
|
||||
type = "file",
|
||||
#vectorize = 10,
|
||||
in = {
|
||||
path = "/test_fifo"
|
||||
}
|
||||
},
|
||||
|
||||
ws = {
|
||||
type = "websocket",
|
||||
unit = "MVa",
|
||||
units = [ "V", "A", "Var" ],
|
||||
description = "Das ist ein Test",
|
||||
vectorize = 100,
|
||||
#vectorize = 10,
|
||||
source = {
|
||||
simulator = "OP5600",
|
||||
location = "ACS lab"
|
||||
|
@ -52,8 +44,8 @@ nodes = {
|
|||
|
||||
paths = (
|
||||
{
|
||||
in = "file",
|
||||
in = "ws",
|
||||
out = "ws",
|
||||
hook = [ "stats_send:ws_stats" ]
|
||||
# hook = [ "stats_send:ws_stats" ]
|
||||
}
|
||||
);
|
||||
|
|
|
@ -20,19 +20,46 @@
|
|||
#define _WEBSOCKET_H_
|
||||
|
||||
#include "node.h"
|
||||
#include "pool.h"
|
||||
#include "queue.h"
|
||||
|
||||
/* Forward declaration */
|
||||
struct lws;
|
||||
|
||||
/** Internal data per websocket node */
|
||||
struct websocket {
|
||||
struct list connections; /**< List of active libwebsocket connections (struct websocket_connection) */
|
||||
struct list destinations; /**< List of struct lws_client_connect_info to connect to. */
|
||||
struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection) */
|
||||
struct list destinations; /**< List of struct lws_client_connect_info to connect to in client mode. */
|
||||
|
||||
struct websocket_connection *writer;
|
||||
struct pool pool;
|
||||
|
||||
struct queue queue_tx; /**< For samples which are sent to WebSockets */
|
||||
struct queue queue_rx; /**< For samples which are received from WebSockets */
|
||||
|
||||
qptr_t sent;
|
||||
qptr_t received;
|
||||
|
||||
int shutdown;
|
||||
};
|
||||
|
||||
struct websocket_connection {
|
||||
enum {
|
||||
WEBSOCKET_ESTABLISHED,
|
||||
WEBSOCKET_ACTIVE,
|
||||
WEBSOCKET_CLOSED
|
||||
} state;
|
||||
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
|
||||
struct lws *wsi;
|
||||
|
||||
struct {
|
||||
char name[64];
|
||||
char ip[64];
|
||||
} peer;
|
||||
|
||||
qptr_t sent;
|
||||
qptr_t received;
|
||||
};
|
||||
|
||||
|
|
|
@ -1,154 +0,0 @@
|
|||
/** HTTP protocol of the websocket node type
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
*********************************************************************************/
|
||||
|
||||
#include <libwebsockets.h>
|
||||
#include <libconfig.h>
|
||||
|
||||
#ifdef WITH_JANSSON
|
||||
#include <jansson.h>
|
||||
#endif
|
||||
|
||||
/* Choose mime type based on the file extension */
|
||||
static char * get_mimetype(const char *resource_path)
|
||||
{
|
||||
char *extension = strrchr(resource_path, '.');
|
||||
|
||||
if (extension == NULL)
|
||||
return "text/plain";
|
||||
else if (!strcmp(extension, ".png"))
|
||||
return "image/png";
|
||||
else if (!strcmp(extension, ".jpg"))
|
||||
return "image/jpg";
|
||||
else if (!strcmp(extension, ".gif"))
|
||||
return "image/gif";
|
||||
else if (!strcmp(extension, ".html"))
|
||||
return "text/html";
|
||||
else if (!strcmp(extension, ".css"))
|
||||
return "text/css";
|
||||
else if (!strcmp(extension, ".js"))
|
||||
return "application/javascript";
|
||||
else
|
||||
return "text/plain";
|
||||
}
|
||||
|
||||
static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_HTTP:
|
||||
if (!htdocs) {
|
||||
lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
if (len < 1) {
|
||||
lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
char *requested_uri = (char *) in;
|
||||
|
||||
debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri);
|
||||
|
||||
/* Handle default path */
|
||||
if (!strcmp(requested_uri, "/")) {
|
||||
char *response = "HTTP/1.1 302 Found\r\n"
|
||||
"Content-Length: 0\r\n"
|
||||
"Location: /index.html\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
|
||||
|
||||
goto try_to_reuse;
|
||||
}
|
||||
#ifdef WITH_JANSSON
|
||||
/* Return list of websocket nodes */
|
||||
else if (!strcmp(requested_uri, "/nodes.json")) {
|
||||
json_t *json_body = json_array();
|
||||
|
||||
list_foreach(struct node *n, &vt.instances) {
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
|
||||
"name", node_name_short(n),
|
||||
"connections", list_length(&w->connections),
|
||||
"state", n->state,
|
||||
"vectorize", n->vectorize,
|
||||
"affinity", n->affinity
|
||||
);
|
||||
|
||||
/* Add all additional fields of node here.
|
||||
* This can be used for metadata */
|
||||
json_object_update(json_node, config_to_json(n->cfg));
|
||||
|
||||
json_array_append_new(json_body, json_node);
|
||||
}
|
||||
|
||||
char *body = json_dumps(json_body, JSON_INDENT(4));
|
||||
|
||||
char *header = "HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
|
||||
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
|
||||
|
||||
free(body);
|
||||
json_decref(json_body);
|
||||
|
||||
return -1;
|
||||
}
|
||||
else if (!strcmp(requested_uri, "/config.json")) {
|
||||
char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4));
|
||||
|
||||
char *header = "HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
|
||||
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
|
||||
|
||||
free(body);
|
||||
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
char path[4069];
|
||||
snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri);
|
||||
|
||||
/* refuse to serve files we don't understand */
|
||||
char *mimetype = get_mimetype(path);
|
||||
if (!mimetype) {
|
||||
warn("HTTP: Unknown mimetype for %s", path);
|
||||
lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0);
|
||||
if (n < 0)
|
||||
return -1;
|
||||
else if (n == 0)
|
||||
break;
|
||||
else
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
try_to_reuse:
|
||||
if (lws_http_transaction_completed(wsi))
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -1,133 +0,0 @@
|
|||
/** Live protocol of the websocket node type
|
||||
*
|
||||
* This protocol callback function is used to handle the binary websocket protoocol
|
||||
* which is used to send / receive struct msg's.
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
*********************************************************************************/
|
||||
|
||||
static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
struct conn *c = user;
|
||||
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_WSI_CREATE:
|
||||
c->wsi = wsi;
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_WSI_DESTROY:
|
||||
c->wsi = NULL;
|
||||
|
||||
connection_destroy(c); /* release c->peer.ip & c->peer.name */
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
c->state = CONN_STATE_ESTABLISHED;
|
||||
c->role = (reason == LWS_CALLBACK_ESTABLISHED)
|
||||
? CONN_ROLE_SERVER
|
||||
: CONN_ROLE_CLIENT;
|
||||
|
||||
/* Get path of incoming request */
|
||||
char uri[64]
|
||||
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
|
||||
if (strlen(uri) <= 0) {
|
||||
warn("WebSocket: Closing connection with invalid URL: %s")
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Search for node whose name matches the URI. */
|
||||
c->node = list_lookup(&vt.instances, uri + 1);
|
||||
if (c->node == NULL) {
|
||||
warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Check if node is running */
|
||||
if (c->node.state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
/* Alias to ease readability */
|
||||
c->ws = n->_vd;
|
||||
|
||||
/* Lookup peer address for debug output */
|
||||
c->peer.name = alloc(64);
|
||||
c->peer.ip = alloc(64);
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, 64, c->peer.ip, 64);
|
||||
|
||||
info("WebSocket: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
list_push(&c->ws->connections, c);
|
||||
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
c->state = CONN_STATE_CLOSED;
|
||||
|
||||
info("WebSocket: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
list_remove(&c->ws->connections, c);
|
||||
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_WRITABLE:
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
if (c->node.state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
int sent, sz, remain;
|
||||
struct queue *q = &c->tx_queue;
|
||||
|
||||
pthread_mutex_lock(&q->lock);
|
||||
|
||||
sz = q->tail - q->head;
|
||||
if (len == 0)
|
||||
goto out; /* nothing to sent at the moment */
|
||||
|
||||
sent = lws_write(wsi, q->head, sz, LWS_WRITE_BINARY);
|
||||
if (sent < 0)
|
||||
goto out;
|
||||
|
||||
/* Move unsent part to head of queue */
|
||||
remain = sz - sent;
|
||||
if (remain > 0)
|
||||
memmove(q->head, q->head + sent, remain);
|
||||
|
||||
/* Update queue tail */
|
||||
q->tail = q->head + remain;
|
||||
|
||||
out: pthread_mutex_unlock(&q->lock);
|
||||
|
||||
return (sent < 0) ? -1 : 0;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE:
|
||||
case LWS_CALLBACK_RECEIVE:
|
||||
if (c->node.state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (!lws_frame_is_binary(wsi)) {
|
||||
warn("WebSocket: Received non-binary frame for node %s", node_name(c->node));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int ret = 0;
|
||||
struct queue *q = &w->rx_queue;
|
||||
|
||||
pthread_mutex_lock(&q->lock);
|
||||
|
||||
|
||||
|
||||
memcpy(q->tail, in, len);
|
||||
|
||||
q->tail += len;
|
||||
|
||||
pthread_cond_broadcast(&q->cond);
|
||||
out2: pthread_mutex_unlock(&q->lock);
|
||||
|
||||
return ret;
|
||||
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -15,21 +15,12 @@
|
|||
#include <libwebsockets.h>
|
||||
#include <libconfig.h>
|
||||
|
||||
#ifdef WITH_JANSSON
|
||||
#include <jansson.h>
|
||||
#endif
|
||||
|
||||
#include "websocket.h"
|
||||
#include "nodes/websocket.h"
|
||||
#include "timing.h"
|
||||
#include "utils.h"
|
||||
#include "msg.h"
|
||||
#include "cfg.h"
|
||||
|
||||
/* Forward declarations */
|
||||
static struct node_type vt;
|
||||
|
||||
static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
#include "config.h"
|
||||
|
||||
/* Private static storage */
|
||||
static config_setting_t *cfg_root; /**< Root config */
|
||||
|
@ -42,12 +33,71 @@ static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS
|
|||
static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */
|
||||
static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */
|
||||
|
||||
/* Forward declarations */
|
||||
static struct node_type vt;
|
||||
static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
|
||||
static struct lws_protocols protocols[] = {
|
||||
{ "http-only", protocol_cb_http, 0, 0 },
|
||||
{ "live", protocol_cb_live, sizeof(struct node *), 10 },
|
||||
{ 0 } /* terminator */
|
||||
{
|
||||
"http-only",
|
||||
protocol_cb_http,
|
||||
0,
|
||||
0
|
||||
},
|
||||
{
|
||||
"live",
|
||||
protocol_cb_live,
|
||||
sizeof(struct websocket_connection),
|
||||
0
|
||||
},
|
||||
{ 0 /* terminator */ }
|
||||
};
|
||||
|
||||
#if 0
|
||||
static const struct lws_extension exts[] = {
|
||||
{
|
||||
"permessage-deflate",
|
||||
lws_extension_callback_pm_deflate,
|
||||
"permessage-deflate"
|
||||
},
|
||||
{
|
||||
"deflate-frame",
|
||||
lws_extension_callback_pm_deflate,
|
||||
"deflate_frame"
|
||||
},
|
||||
{ NULL, NULL, NULL /* terminator */ }
|
||||
};
|
||||
#endif
|
||||
|
||||
static void logger(int level, const char *msg) {
|
||||
int len = strlen(msg);
|
||||
if (strchr(msg, '\n'))
|
||||
len -= 1;
|
||||
|
||||
/* Decrease severity for some errors. */
|
||||
if (strstr(msg, "Unable to open") == msg)
|
||||
level = LLL_WARN;
|
||||
|
||||
switch (level) {
|
||||
case LLL_ERR: error("LWS: %.*s", len, msg); break;
|
||||
case LLL_WARN: warn("LWS: %.*s", len, msg); break;
|
||||
case LLL_INFO: info("LWS: %.*s", len, msg); break;
|
||||
default: debug(DBG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break;
|
||||
}
|
||||
}
|
||||
|
||||
static void * server_thread(void *ctx)
|
||||
{
|
||||
debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread");
|
||||
|
||||
while (lws_service(context, 10) >= 0);
|
||||
|
||||
debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily");
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Choose mime type based on the file extension */
|
||||
static char * get_mimetype(const char *resource_path)
|
||||
{
|
||||
|
@ -57,6 +107,8 @@ static char * get_mimetype(const char *resource_path)
|
|||
return "text/plain";
|
||||
else if (!strcmp(extension, ".png"))
|
||||
return "image/png";
|
||||
else if (!strcmp(extension, ".svg"))
|
||||
return "image/svg+xml";
|
||||
else if (!strcmp(extension, ".jpg"))
|
||||
return "image/jpg";
|
||||
else if (!strcmp(extension, ".gif"))
|
||||
|
@ -71,10 +123,10 @@ static char * get_mimetype(const char *resource_path)
|
|||
return "text/plain";
|
||||
}
|
||||
|
||||
static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_HTTP:
|
||||
case LWS_CALLBACK_HTTP:
|
||||
if (!htdocs) {
|
||||
lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
|
||||
goto try_to_reuse;
|
||||
|
@ -87,7 +139,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
|
||||
char *requested_uri = (char *) in;
|
||||
|
||||
debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri);
|
||||
debug(DBG_WEBSOCKET | 3, "LWS: New HTTP request: %s", requested_uri);
|
||||
|
||||
/* Handle default path */
|
||||
if (!strcmp(requested_uri, "/")) {
|
||||
|
@ -127,7 +179,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
|
||||
char *header = "HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
|
||||
|
@ -143,7 +195,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
|
||||
char *header = "HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
|
||||
|
@ -188,174 +240,128 @@ try_to_reuse:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
struct node *n;
|
||||
struct websocket_connection *c = user;
|
||||
struct websocket *w;
|
||||
|
||||
char *buf, uri[1024];
|
||||
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
||||
case LWS_CALLBACK_ESTABLISHED: {
|
||||
/* Get path of incoming request */
|
||||
char uri[64];
|
||||
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
|
||||
|
||||
/* Search for node which matches uri */
|
||||
list_foreach(n, &vt.instances) {
|
||||
if (!strcmp(n->name, uri + 1)) /* we skip leading '/' */
|
||||
goto found;
|
||||
if (strlen(uri) <= 0) {
|
||||
warn("LWS: Closing connection with invalid URL: %s", uri);
|
||||
return -1;
|
||||
}
|
||||
|
||||
warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1);
|
||||
|
||||
return -1;
|
||||
|
||||
found: * (void **) user = n;
|
||||
w = n->_vd;
|
||||
|
||||
if (w->shutdown)
|
||||
goto shutdown;
|
||||
/* Search for node whose name matches the URI. */
|
||||
c->node = list_lookup(&vt.instances, uri + 1);
|
||||
if (c->node == NULL) {
|
||||
warn("LWS: Closing Connection for non-existent node: %s", uri + 1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
list_push(&w->connections, wsi);
|
||||
/* Check if node is running */
|
||||
if (c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
/* Get peer information */
|
||||
char client_name[128], client_ip[128];
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi),
|
||||
client_name, sizeof(client_name),
|
||||
client_ip, sizeof(client_ip)
|
||||
);
|
||||
c->state = WEBSOCKET_ESTABLISHED;
|
||||
c->wsi = wsi;
|
||||
|
||||
info("WebSocket: New Connection for node: %s from %s (%s)", node_name(n), client_name, client_ip);
|
||||
/* Lookup peer address for debug output */
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_push(&w->connections, c);
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
n = * (struct node **) user;
|
||||
w = n->_vd;
|
||||
|
||||
list_remove(&w->connections, wsi);
|
||||
info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
|
||||
info("WebSocket: Connection closed for node: %s", node_name(n));
|
||||
c->state = WEBSOCKET_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||
w = (struct websocket *) c->node->_vd;
|
||||
|
||||
if (c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (w->shutdown) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Bye", 4);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int cnt, sent, ret;
|
||||
unsigned char *bufs[DEFAULT_QUEUELEN];
|
||||
|
||||
cnt = queue_get_many(&w->queue_tx, (void **) bufs, DEFAULT_QUEUELEN, c->sent);
|
||||
|
||||
for (sent = 0; sent < cnt; sent++) {
|
||||
struct msg *msg = (struct msg *) (bufs[sent] + LWS_PRE);
|
||||
|
||||
ret = lws_write(wsi, (unsigned char *) msg, MSG_LEN(msg->length), LWS_WRITE_BINARY);
|
||||
if (ret < MSG_LEN(msg->length))
|
||||
error("Failed lws_write()");
|
||||
|
||||
if (lws_send_pipe_choked(wsi))
|
||||
break;
|
||||
}
|
||||
|
||||
queue_pull_many(&w->queue_tx, (void **) bufs, sent, &c->sent);
|
||||
|
||||
pool_put_many(&w->pool, (void **) bufs, sent);
|
||||
|
||||
if (sent < cnt)
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE:
|
||||
case LWS_CALLBACK_RECEIVE: {
|
||||
w = (struct websocket *) c->node->_vd;
|
||||
|
||||
if (c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (!lws_frame_is_binary(wsi) || len < MSG_LEN(0))
|
||||
warn("LWS: Received invalid packet for node: %s", node_name(c->node));
|
||||
|
||||
struct msg *msg = (struct msg *) in;
|
||||
|
||||
while ((char *) msg + MSG_LEN(msg->length) <= (char *) in + len) {
|
||||
struct msg *msg2 = pool_get(&w->pool);
|
||||
if (!msg2) {
|
||||
warn("Pool underrun for node: %s", node_name(c->node));
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(msg2, msg, MSG_LEN(msg->length));
|
||||
|
||||
queue_push(&w->queue_rx, msg2, &c->received);
|
||||
|
||||
/* Next message */
|
||||
msg = (struct msg *) ((char *) msg + MSG_LEN(msg->length));
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
n = * (struct node **) user;
|
||||
if (!n)
|
||||
return -1;
|
||||
|
||||
w = n->_vd;
|
||||
if (!w)
|
||||
return -1;
|
||||
|
||||
if (w->shutdown)
|
||||
goto shutdown;
|
||||
|
||||
pthread_mutex_lock(&w->write.mutex);
|
||||
|
||||
if (w->write.pool == NULL || w->write.cnt == 0)
|
||||
return 0; /* no samples available to send */
|
||||
|
||||
/* Calculate required buffer size */
|
||||
size_t bytes = 0;
|
||||
for (int i = 0; i < w->write.cnt; i++) {
|
||||
struct msg *src = pool_getrel(w->write.pool, i);
|
||||
bytes += MSG_LEN(src->values);
|
||||
}
|
||||
|
||||
/* Allocate buffer */
|
||||
buf = malloc(LWS_SEND_BUFFER_PRE_PADDING + bytes);
|
||||
|
||||
/* Fill buffer */
|
||||
for (int i = 0; i < w->write.cnt; i++) {
|
||||
struct msg *src = pool_getrel(w->write.pool, i);
|
||||
struct msg *dst = (struct msg *) (buf + len + LWS_SEND_BUFFER_PRE_PADDING);
|
||||
|
||||
size_t bytes = MSG_LEN(src->values);
|
||||
len += bytes;
|
||||
|
||||
memcpy(dst, src, bytes);
|
||||
}
|
||||
|
||||
/* We've done our work here. Do not send again.. */
|
||||
w->write.pool = NULL;
|
||||
w->write.pool = 0;
|
||||
|
||||
pthread_mutex_unlock(&w->write.mutex);
|
||||
|
||||
lws_write(wsi, (unsigned char *) (buf + LWS_SEND_BUFFER_PRE_PADDING), len, LWS_WRITE_BINARY);
|
||||
|
||||
/** @todo Implement */
|
||||
return 0;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_RECEIVE:
|
||||
n = * (struct node **) user;
|
||||
if (!n)
|
||||
return -1;
|
||||
|
||||
w = n->_vd;
|
||||
|
||||
if (w->read.pool == NULL || w->read.cnt == 0)
|
||||
return 0;
|
||||
|
||||
pthread_mutex_lock(&w->read.mutex);
|
||||
|
||||
size_t offset = 0;
|
||||
for (int i = 0; i < w->read.cnt; i++) {
|
||||
struct msg *dst = pool_getrel(w->read.pool, i);
|
||||
struct msg *src = (struct msg *) in + offset;
|
||||
|
||||
memcpy(dst, src, MSG_LEN(src->values));
|
||||
|
||||
offset += MSG_LEN(src->values);
|
||||
if (offset >= len)
|
||||
break;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&w->read.mutex);
|
||||
pthread_cond_broadcast(&w->read.cond); /* new data available, wake-up websocket_read() */
|
||||
|
||||
return 0;
|
||||
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
||||
shutdown:
|
||||
warn("Dropping connection: node is currently shutting down");
|
||||
|
||||
#if LWS_LIBRARY_VERSION_NUMBER > 1006002
|
||||
char *bye = "VILLASnode is shutting down. Bye";
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) bye, strlen(bye));
|
||||
#endif
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void logger(int level, const char *msg) {
|
||||
int len = strlen(msg);
|
||||
if (strchr(msg, '\n'))
|
||||
len -= 1;
|
||||
|
||||
/* Decrease severity for some errors. */
|
||||
if (strstr(msg, "Unable to open") == msg)
|
||||
level = LLL_WARN;
|
||||
|
||||
switch (level) {
|
||||
case LLL_ERR: error("WebSocket: %.*s", len, msg); break;
|
||||
case LLL_WARN: warn("WebSocket: %.*s", len, msg); break;
|
||||
case LLL_INFO: info("WebSocket: %.*s", len, msg); break;
|
||||
default: debug(DBG_WEBSOCKET | 1, "WebSocket: %.*s", len, msg); break;
|
||||
}
|
||||
}
|
||||
|
||||
static void * server_thread(void *ctx)
|
||||
{
|
||||
debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread");
|
||||
|
||||
while (lws_service(context, 10) >= 0);
|
||||
|
||||
debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily");
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int websocket_init(int argc, char * argv[], config_setting_t *cfg)
|
||||
|
@ -383,7 +389,7 @@ int websocket_init(int argc, char * argv[], config_setting_t *cfg)
|
|||
struct lws_context_creation_info info = {
|
||||
.port = port,
|
||||
.protocols = protocols,
|
||||
.extensions = lws_get_internal_extensions(),
|
||||
.extensions = NULL, //exts,
|
||||
.ssl_cert_filepath = ssl_cert,
|
||||
.ssl_private_key_filepath = ssl_private_key,
|
||||
.gid = -1,
|
||||
|
@ -417,9 +423,27 @@ int websocket_open(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
int ret;
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
size_t blocklen = LWS_PRE + MSG_LEN(DEFAULT_VALUES);
|
||||
|
||||
ret = pool_init_mmap(&w->pool, blocklen, 2 * DEFAULT_QUEUELEN);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = queue_init(&w->queue_tx, DEFAULT_QUEUELEN);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = queue_init(&w->queue_rx, DEFAULT_QUEUELEN);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
queue_reader_add(&w->queue_rx, 0, 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -432,38 +456,98 @@ int websocket_close(struct node *n)
|
|||
list_foreach(struct lws *wsi, &w->connections)
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
/** @todo Is is safe? */
|
||||
list_destroy(&w->connections);
|
||||
pool_destroy(&w->pool);
|
||||
queue_destroy(&w->queue_tx);
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_destroy(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
// struct websocket *w = n->_vd;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_read(struct node *n, struct pool *pool, unsigned cnt)
|
||||
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
/* Check for new websocket connections and more readers to queue */
|
||||
list_foreach(struct)
|
||||
struct msg *msgs[cnt];
|
||||
|
||||
int got;
|
||||
|
||||
got = queue_pull_many(&w->queue_rx, (void **) msgs, cnt, &w->received);
|
||||
|
||||
if (got)
|
||||
info("got %u", got);
|
||||
|
||||
for (int i = 0; i < got; i++) {
|
||||
smps[i]->sequence = msgs[i]->sequence;
|
||||
smps[i]->length = msgs[i]->length;
|
||||
smps[i]->ts.origin = MSG_TS(msgs[i]);
|
||||
|
||||
memcpy(&smps[i]->data, &msgs[i]->data, MSG_DATA_LEN(msgs[i]->length));
|
||||
}
|
||||
|
||||
pool_put_many(&w->pool, (void **) msgs, got);
|
||||
|
||||
return cnt;
|
||||
return got;
|
||||
}
|
||||
|
||||
int websocket_write(struct node *n, struct pool *pool, unsigned cnt)
|
||||
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
int blocks, enqueued;
|
||||
char *bufs[cnt];
|
||||
|
||||
/* Copy samples to websocket queue */
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket node: %s", node_name(n));
|
||||
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
struct msg *msg = (struct msg *) (bufs[i] + LWS_PRE);
|
||||
|
||||
msg->version = MSG_VERSION;
|
||||
msg->type = MSG_TYPE_DATA;
|
||||
msg->endian = MSG_ENDIAN_HOST;
|
||||
msg->length = smps[i]->length;
|
||||
msg->sequence = smps[i]->sequence;
|
||||
msg->ts.sec = smps[i]->ts.origin.tv_sec;
|
||||
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
|
||||
|
||||
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
|
||||
}
|
||||
|
||||
enqueued = queue_push_many(&w->queue_tx, (void **) bufs, cnt, &w->sent);
|
||||
if (enqueued != blocks)
|
||||
warn("Queue overrun in websocket node: %s", node_name(n));
|
||||
|
||||
/* Notify all active websocket connections to send new data */
|
||||
list_foreach(struct lws *wsi, &w->connections)
|
||||
lws_callback_on_writable(wsi);
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_CLOSED:
|
||||
queue_reader_remove(&w->queue_tx, c->sent, w->sent);
|
||||
list_remove(&w->connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->sent = w->sent;
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
|
||||
queue_reader_add(&w->queue_tx, c->sent, w->sent);
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
return cnt;
|
||||
}
|
||||
|
||||
static struct node_type vt = {
|
||||
|
|
Loading…
Add table
Reference in a new issue