1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-16 00:00:02 +01:00
VILLASnode/lib/nodes/websocket.c

563 lines
No EOL
14 KiB
C

/** Node type: Websockets (libwebsockets)
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <libwebsockets.h>
#include <libconfig.h>
#include "nodes/websocket.h"
#include "timing.h"
#include "utils.h"
#include "msg.h"
#include "cfg.h"
#include "config.h"
/* Private static storage */
static config_setting_t *cfg_root; /**< Root config */
static pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */
static struct lws_context *context; /**< The libwebsockets server context. */
static int port; /**< Port of the build in HTTP / WebSocket server */
static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS */
static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */
static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */
/* Forward declarations */
static struct node_type vt;
static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
static struct lws_protocols protocols[] = {
{
"http-only",
protocol_cb_http,
0,
0
},
{
"live",
protocol_cb_live,
sizeof(struct websocket_connection),
0
},
{ 0 /* terminator */ }
};
#if 0
static const struct lws_extension exts[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
},
{
"deflate-frame",
lws_extension_callback_pm_deflate,
"deflate_frame"
},
{ NULL, NULL, NULL /* terminator */ }
};
#endif
static void logger(int level, const char *msg) {
int len = strlen(msg);
if (strchr(msg, '\n'))
len -= 1;
/* Decrease severity for some errors. */
if (strstr(msg, "Unable to open") == msg)
level = LLL_WARN;
switch (level) {
case LLL_ERR: error("LWS: %.*s", len, msg); break;
case LLL_WARN: warn("LWS: %.*s", len, msg); break;
case LLL_INFO: info("LWS: %.*s", len, msg); break;
default: debug(DBG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break;
}
}
static void * server_thread(void *ctx)
{
debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread");
while (lws_service(context, 10) >= 0);
debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily");
return NULL;
}
/* Choose mime type based on the file extension */
static char * get_mimetype(const char *resource_path)
{
char *extension = strrchr(resource_path, '.');
if (extension == NULL)
return "text/plain";
else if (!strcmp(extension, ".png"))
return "image/png";
else if (!strcmp(extension, ".svg"))
return "image/svg+xml";
else if (!strcmp(extension, ".jpg"))
return "image/jpg";
else if (!strcmp(extension, ".gif"))
return "image/gif";
else if (!strcmp(extension, ".html"))
return "text/html";
else if (!strcmp(extension, ".css"))
return "text/css";
else if (!strcmp(extension, ".js"))
return "application/javascript";
else
return "text/plain";
}
int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
switch (reason) {
case LWS_CALLBACK_HTTP:
if (!htdocs) {
lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
goto try_to_reuse;
}
if (len < 1) {
lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL);
goto try_to_reuse;
}
char *requested_uri = (char *) in;
debug(DBG_WEBSOCKET | 3, "LWS: New HTTP request: %s", requested_uri);
/* Handle default path */
if (!strcmp(requested_uri, "/")) {
char *response = "HTTP/1.1 302 Found\r\n"
"Content-Length: 0\r\n"
"Location: /index.html\r\n"
"\r\n";
lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
goto try_to_reuse;
}
#ifdef WITH_JANSSON
/* Return list of websocket nodes */
else if (!strcmp(requested_uri, "/nodes.json")) {
json_t *json_body = json_array();
list_foreach(struct node *n, &vt.instances) {
struct websocket *w = n->_vd;
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
"name", node_name_short(n),
"connections", list_length(&w->connections),
"state", n->state,
"vectorize", n->vectorize,
"affinity", n->affinity
);
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_node, config_to_json(n->cfg));
json_array_append_new(json_body, json_node);
}
char *body = json_dumps(json_body, JSON_INDENT(4));
char *header = "HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Content-Type: application/json\r\n"
"\r\n";
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
free(body);
json_decref(json_body);
return -1;
}
else if (!strcmp(requested_uri, "/config.json")) {
char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4));
char *header = "HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Content-Type: application/json\r\n"
"\r\n";
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
free(body);
return -1;
}
#endif
else {
char path[4069];
snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri);
/* refuse to serve files we don't understand */
char *mimetype = get_mimetype(path);
if (!mimetype) {
warn("HTTP: Unknown mimetype for %s", path);
lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
return -1;
}
int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0);
if (n < 0)
return -1;
else if (n == 0)
break;
else
goto try_to_reuse;
}
default:
break;
}
return 0;
try_to_reuse:
if (lws_http_transaction_completed(wsi))
return -1;
return 0;
}
int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
struct websocket_connection *c = user;
struct websocket *w;
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
case LWS_CALLBACK_ESTABLISHED: {
/* Get path of incoming request */
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
if (strlen(uri) <= 0) {
warn("LWS: Closing connection with invalid URL: %s", uri);
return -1;
}
/* Search for node whose name matches the URI. */
c->node = list_lookup(&vt.instances, uri + 1);
if (c->node == NULL) {
warn("LWS: Closing Connection for non-existent node: %s", uri + 1);
return -1;
}
/* Check if node is running */
if (c->node->state != NODE_RUNNING)
return -1;
c->state = WEBSOCKET_ESTABLISHED;
c->wsi = wsi;
/* Lookup peer address for debug output */
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
info("LWS: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
struct websocket *w = (struct websocket *) c->node->_vd;
list_push(&w->connections, c);
return 0;
}
case LWS_CALLBACK_CLOSED:
info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
c->state = WEBSOCKET_CLOSED;
c->wsi = NULL;
return 0;
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_SERVER_WRITEABLE: {
w = (struct websocket *) c->node->_vd;
if (c->node->state != NODE_RUNNING)
return -1;
if (w->shutdown) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Bye", 4);
return -1;
}
int cnt, sent, ret;
unsigned char *bufs[DEFAULT_QUEUELEN];
cnt = queue_get_many(&w->queue_tx, (void **) bufs, DEFAULT_QUEUELEN, c->sent);
for (sent = 0; sent < cnt; sent++) {
struct msg *msg = (struct msg *) (bufs[sent] + LWS_PRE);
ret = lws_write(wsi, (unsigned char *) msg, MSG_LEN(msg->length), LWS_WRITE_BINARY);
if (ret < MSG_LEN(msg->length))
error("Failed lws_write()");
if (lws_send_pipe_choked(wsi))
break;
}
queue_pull_many(&w->queue_tx, (void **) bufs, sent, &c->sent);
pool_put_many(&w->pool, (void **) bufs, sent);
if (sent < cnt)
lws_callback_on_writable(wsi);
return 0;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE: {
w = (struct websocket *) c->node->_vd;
if (c->node->state != NODE_RUNNING)
return -1;
if (!lws_frame_is_binary(wsi) || len < MSG_LEN(0))
warn("LWS: Received invalid packet for node: %s", node_name(c->node));
struct msg *msg = (struct msg *) in;
while ((char *) msg + MSG_LEN(msg->length) <= (char *) in + len) {
struct msg *msg2 = pool_get(&w->pool);
if (!msg2) {
warn("Pool underrun for node: %s", node_name(c->node));
return -1;
}
memcpy(msg2, msg, MSG_LEN(msg->length));
queue_push(&w->queue_rx, msg2, &c->received);
/* Next message */
msg = (struct msg *) ((char *) msg + MSG_LEN(msg->length));
}
/** @todo Implement */
return 0;
}
default:
return 0;
}
}
int websocket_init(int argc, char * argv[], config_setting_t *cfg)
{
config_setting_t *cfg_http;
lws_set_log_level((1 << LLL_COUNT) - 1, logger);
/* Parse global config */
cfg_http = config_setting_lookup(cfg, "http");
if (cfg_http) {
config_setting_lookup_string(cfg_http, "ssl_cert", &ssl_cert);
config_setting_lookup_string(cfg_http, "ssl_private_key", &ssl_private_key);
config_setting_lookup_string(cfg_http, "htdocs", &htdocs);
config_setting_lookup_int(cfg_http, "port", &port);
}
/* Default settings */
if (!port)
port = 80;
if (!htdocs)
htdocs = "/villas/contrib/websocket";
/* Start server */
struct lws_context_creation_info info = {
.port = port,
.protocols = protocols,
.extensions = NULL, //exts,
.ssl_cert_filepath = ssl_cert,
.ssl_private_key_filepath = ssl_private_key,
.gid = -1,
.uid = -1
};
context = lws_create_context(&info);
if (context == NULL)
error("WebSocket: failed to initialize server");
/* Save root config for GET /config.json request */
cfg_root = cfg;
pthread_create(&thread, NULL, server_thread, NULL);
return 0;
}
int websocket_deinit()
{
lws_cancel_service(context);
lws_context_destroy(context);
pthread_cancel(thread);
pthread_join(thread, NULL);
return 0;
}
int websocket_open(struct node *n)
{
struct websocket *w = n->_vd;
int ret;
list_init(&w->connections);
list_init(&w->destinations);
size_t blocklen = LWS_PRE + MSG_LEN(DEFAULT_VALUES);
ret = pool_init_mmap(&w->pool, blocklen, 2 * DEFAULT_QUEUELEN);
if (ret)
return ret;
ret = queue_init(&w->queue_tx, DEFAULT_QUEUELEN);
if (ret)
return ret;
ret = queue_init(&w->queue_rx, DEFAULT_QUEUELEN);
if (ret)
return ret;
queue_reader_add(&w->queue_rx, 0, 0);
return 0;
}
int websocket_close(struct node *n)
{
struct websocket *w = n->_vd;
w->shutdown = 1;
list_foreach(struct lws *wsi, &w->connections)
lws_callback_on_writable(wsi);
pool_destroy(&w->pool);
queue_destroy(&w->queue_tx);
list_destroy(&w->connections, NULL, false);
return 0;
}
int websocket_destroy(struct node *n)
{
// struct websocket *w = n->_vd;
return 0;
}
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct websocket *w = n->_vd;
struct msg *msgs[cnt];
int got;
got = queue_pull_many(&w->queue_rx, (void **) msgs, cnt, &w->received);
for (int i = 0; i < got; i++) {
smps[i]->sequence = msgs[i]->sequence;
smps[i]->length = msgs[i]->length;
smps[i]->ts.origin = MSG_TS(msgs[i]);
memcpy(&smps[i]->data, &msgs[i]->data, MSG_DATA_LEN(msgs[i]->length));
}
pool_put_many(&w->pool, (void **) msgs, got);
return got;
}
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct websocket *w = n->_vd;
int blocks, enqueued;
char *bufs[cnt];
/* Copy samples to websocket queue */
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
if (blocks != cnt)
warn("Pool underrun in websocket node: %s", node_name(n));
for (int i = 0; i < blocks; i++) {
struct msg *msg = (struct msg *) (bufs[i] + LWS_PRE);
msg->version = MSG_VERSION;
msg->type = MSG_TYPE_DATA;
msg->endian = MSG_ENDIAN_HOST;
msg->length = smps[i]->length;
msg->sequence = smps[i]->sequence;
msg->ts.sec = smps[i]->ts.origin.tv_sec;
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
}
enqueued = queue_push_many(&w->queue_tx, (void **) bufs, cnt, &w->sent);
if (enqueued != blocks)
warn("Queue overrun in websocket node: %s", node_name(n));
/* Notify all active websocket connections to send new data */
list_foreach(struct websocket_connection *c, &w->connections) {
switch (c->state) {
case WEBSOCKET_CLOSED:
queue_reader_remove(&w->queue_tx, c->sent, w->sent);
list_remove(&w->connections, c);
break;
case WEBSOCKET_ESTABLISHED:
c->sent = w->sent;
c->state = WEBSOCKET_ACTIVE;
queue_reader_add(&w->queue_tx, c->sent, w->sent);
case WEBSOCKET_ACTIVE:
lws_callback_on_writable(c->wsi);
break;
}
}
return cnt;
}
static struct node_type vt = {
.name = "websocket",
.description = "Send and receive samples of a WebSocket connection (libwebsockets)",
.vectorize = 0, /* unlimited */
.size = sizeof(struct websocket),
.open = websocket_open,
.close = websocket_close,
.destroy = websocket_destroy,
.read = websocket_read,
.write = websocket_write,
.init = websocket_init,
.deinit = websocket_deinit
};
REGISTER_NODE_TYPE(&vt)