mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
reenabled websocket node type
This commit is contained in:
parent
d7e8731d98
commit
57d19bfff9
3 changed files with 27 additions and 222 deletions
|
@ -48,12 +48,12 @@ endif
|
|||
endif
|
||||
|
||||
# Enable WebSocket support
|
||||
#ifndef WITHOUT_WEBSOCKETS
|
||||
ifndef WITHOUT_WEBSOCKETS
|
||||
ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0)
|
||||
# LIB_SRCS += lib/nodes/websocket.c
|
||||
LIB_SRCS += lib/nodes/websocket.c
|
||||
LIB_PKGS += libwebsockets jansson
|
||||
endif
|
||||
#endif
|
||||
endif
|
||||
|
||||
# Enable OPAL-RT Asynchronous Process support (will result in 32bit binary!!!)
|
||||
ifdef WITH_OPAL
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "msg.h"
|
||||
#include "cfg.h"
|
||||
#include "config.h"
|
||||
#include "plugin.h"
|
||||
|
||||
/* Internal datastructures */
|
||||
struct destination {
|
||||
|
@ -27,44 +28,26 @@ struct destination {
|
|||
};
|
||||
|
||||
/* Private static storage */
|
||||
static config_setting_t *cfg_root; /**< Root config */
|
||||
static pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */
|
||||
static struct lws_context *context; /**< The libwebsockets server context. */
|
||||
static int id = 0; /**< Highest assigned ID to websocket nodes. */
|
||||
|
||||
static int port; /**< Port of the build in HTTP / WebSocket server */
|
||||
|
||||
static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS */
|
||||
static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */
|
||||
static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */
|
||||
|
||||
static int id = 0;
|
||||
|
||||
struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
|
||||
struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
|
||||
|
||||
/* Forward declarations */
|
||||
static struct node_type vt;
|
||||
static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
|
||||
static struct lws_protocols protocols[] = {
|
||||
{ "http-only", protocol_cb_http, 0, 0 },
|
||||
{ "live", protocol_cb_live, sizeof(struct connection), 0 },
|
||||
{ NULL }
|
||||
};
|
||||
|
||||
__attribute__((unused)) static int connection_init(struct connection *c)
|
||||
__attribute__((unused)) static int websocket_connection_init(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return -1;
|
||||
}
|
||||
|
||||
__attribute__((unused)) static void connection_destroy(struct connection *c)
|
||||
__attribute__((unused)) static void websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
}
|
||||
|
||||
static char * connection_name(struct connection *c)
|
||||
static char * websocket_connection_name(struct websocket_connection *c)
|
||||
{
|
||||
if (!c->_name) {
|
||||
if (c->node)
|
||||
|
@ -76,12 +59,7 @@ static char * connection_name(struct connection *c)
|
|||
return c->_name;
|
||||
}
|
||||
|
||||
static void destination_destroy(struct destination *d)
|
||||
{
|
||||
free(d->uri);
|
||||
}
|
||||
|
||||
static int connection_write(struct connection *c, struct sample *smps[], unsigned cnt)
|
||||
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int blocks, enqueued;
|
||||
char *bufs[cnt];
|
||||
|
@ -107,7 +85,7 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne
|
|||
case WEBSOCKET_ACTIVE:
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket connection: %s", connection_name(c));
|
||||
warn("Pool underrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE);
|
||||
|
@ -126,7 +104,7 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne
|
|||
|
||||
enqueued = queue_push_many(&c->queue, (void **) bufs, cnt);
|
||||
if (enqueued != blocks)
|
||||
warn("Queue overrun in websocket connection: %s", connection_name(c));
|
||||
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
|
@ -135,182 +113,15 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void logger(int level, const char *msg) {
|
||||
int len = strlen(msg);
|
||||
if (strchr(msg, '\n'))
|
||||
len -= 1;
|
||||
|
||||
/* Decrease severity for some errors. */
|
||||
if (strstr(msg, "Unable to open") == msg)
|
||||
level = LLL_WARN;
|
||||
|
||||
switch (level) {
|
||||
case LLL_ERR: warn("LWS: %.*s", len, msg); break;
|
||||
case LLL_WARN: warn("LWS: %.*s", len, msg); break;
|
||||
case LLL_INFO: info("LWS: %.*s", len, msg); break;
|
||||
default: debug(LOG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break;
|
||||
}
|
||||
}
|
||||
|
||||
static void * server_thread(void *ctx)
|
||||
static void websocket_destination_destroy(struct destination *d)
|
||||
{
|
||||
debug(LOG_WEBSOCKET | 3, "WebSocket: Started server thread");
|
||||
|
||||
while (lws_service(context, 10) >= 0);
|
||||
|
||||
debug(LOG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily");
|
||||
|
||||
return NULL;
|
||||
free(d->uri);
|
||||
}
|
||||
|
||||
/* Choose mime type based on the file extension */
|
||||
static char * get_mimetype(const char *resource_path)
|
||||
{
|
||||
char *extension = strrchr(resource_path, '.');
|
||||
|
||||
if (extension == NULL)
|
||||
return "text/plain";
|
||||
else if (!strcmp(extension, ".png"))
|
||||
return "image/png";
|
||||
else if (!strcmp(extension, ".svg"))
|
||||
return "image/svg+xml";
|
||||
else if (!strcmp(extension, ".jpg"))
|
||||
return "image/jpg";
|
||||
else if (!strcmp(extension, ".gif"))
|
||||
return "image/gif";
|
||||
else if (!strcmp(extension, ".html"))
|
||||
return "text/html";
|
||||
else if (!strcmp(extension, ".css"))
|
||||
return "text/css";
|
||||
else if (!strcmp(extension, ".js"))
|
||||
return "application/javascript";
|
||||
else
|
||||
return "text/plain";
|
||||
}
|
||||
|
||||
static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_HTTP:
|
||||
if (!htdocs) {
|
||||
lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
if (len < 1) {
|
||||
lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
char *requested_uri = (char *) in;
|
||||
|
||||
debug(LOG_WEBSOCKET | 3, "LWS: New HTTP request: %s", requested_uri);
|
||||
|
||||
/* Handle default path */
|
||||
if (!strcmp(requested_uri, "/")) {
|
||||
char *response = "HTTP/1.1 302 Found\r\n"
|
||||
"Content-Length: 0\r\n"
|
||||
"Location: /index.html\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
|
||||
|
||||
goto try_to_reuse;
|
||||
}
|
||||
#ifdef WITH_JANSSON
|
||||
/* Return list of websocket nodes */
|
||||
else if (!strcmp(requested_uri, "/nodes.json")) {
|
||||
json_t *json_body = json_array();
|
||||
|
||||
list_foreach(struct node *n, &vt.instances) {
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i, s: i }",
|
||||
"name", node_name_short(n),
|
||||
"id", w->id,
|
||||
"connections", list_length(&w->connections),
|
||||
"state", n->state,
|
||||
"vectorize", n->vectorize,
|
||||
"affinity", n->affinity
|
||||
);
|
||||
|
||||
/* Add all additional fields of node here.
|
||||
* This can be used for metadata */
|
||||
json_object_update(json_node, config_to_json(n->cfg));
|
||||
|
||||
json_array_append_new(json_body, json_node);
|
||||
}
|
||||
|
||||
char *body = json_dumps(json_body, JSON_INDENT(4));
|
||||
|
||||
char *header = "HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
|
||||
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
|
||||
|
||||
free(body);
|
||||
json_decref(json_body);
|
||||
|
||||
return -1;
|
||||
}
|
||||
else if (!strcmp(requested_uri, "/config.json")) {
|
||||
char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4));
|
||||
|
||||
char *header = "HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
"\r\n";
|
||||
|
||||
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
|
||||
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
|
||||
|
||||
free(body);
|
||||
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
char path[4069];
|
||||
snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri);
|
||||
|
||||
/* refuse to serve files we don't understand */
|
||||
char *mimetype = get_mimetype(path);
|
||||
if (!mimetype) {
|
||||
warn("HTTP: Unknown mimetype for %s", path);
|
||||
lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0);
|
||||
if (n < 0)
|
||||
return -1;
|
||||
else if (n == 0)
|
||||
break;
|
||||
else
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
try_to_reuse:
|
||||
if (lws_http_transaction_completed(wsi))
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
>>>>>>> feature-mpmc-queue
|
||||
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
int ret;
|
||||
struct connection *c = user;
|
||||
struct websocket_connection *c = user;
|
||||
struct websocket *w;
|
||||
|
||||
switch (reason) {
|
||||
|
@ -355,7 +166,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
/* Lookup peer address for debug output */
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New connection %s", connection_name(c));
|
||||
info("LWS: New connection %s", websocket_connection_name(c));
|
||||
|
||||
if (c->node != NULL) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
|
@ -369,7 +180,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
}
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
info("LWS: Connection %s closed", connection_name(c));
|
||||
info("LWS: Connection %s closed", websocket_connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
@ -419,14 +230,14 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
return -1;
|
||||
|
||||
if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0))
|
||||
warn("LWS: Received invalid packet for connection %s", connection_name(c));
|
||||
warn("LWS: Received invalid packet for connection %s", websocket_connection_name(c));
|
||||
|
||||
struct webmsg *msg = (struct webmsg *) in;
|
||||
|
||||
while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) {
|
||||
struct webmsg *msg2 = pool_get(&w->pool);
|
||||
if (!msg2) {
|
||||
warn("Pool underrun for connection %s", connection_name(c));
|
||||
warn("Pool underrun for connection %s", websocket_connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -434,7 +245,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
|
||||
ret = queue_push(&w->queue, msg2);
|
||||
if (ret != 1) {
|
||||
warn("Queue overrun for connection %s", connection_name(c));
|
||||
warn("Queue overrun for connection %s", websocket_connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -477,7 +288,7 @@ int websocket_close(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_foreach(struct connection *c, &w->connections) {
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
c->state = WEBSOCKET_SHUTDOWN;
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
@ -494,7 +305,7 @@ int websocket_destroy(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_destroy(&w->destinations, (dtor_cb_t) destination_destroy, true);
|
||||
list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -529,13 +340,11 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_foreach(struct connection *c, &w->connections) {
|
||||
connection_write(c, smps, cnt);
|
||||
}
|
||||
list_foreach(struct websocket_connection *c, &w->connections)
|
||||
websocket_connection_write(c, smps, cnt);
|
||||
|
||||
list_foreach(struct connection *c, &connections) {
|
||||
connection_write(c, smps, cnt);
|
||||
}
|
||||
list_foreach(struct websocket_connection *c, &connections)
|
||||
websocket_connection_write(c, smps, cnt);
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
@ -611,8 +420,6 @@ static struct plugin p = {
|
|||
.destroy = websocket_destroy,
|
||||
.read = websocket_read,
|
||||
.write = websocket_write,
|
||||
.init = websocket_init,
|
||||
.deinit = websocket_deinit,
|
||||
.print = websocket_print,
|
||||
.parse = websocket_parse
|
||||
}
|
||||
|
|
|
@ -31,14 +31,12 @@ static struct lws_protocols protocols[] = {
|
|||
.per_session_data_size = sizeof(struct api_session),
|
||||
.rx_buffer_size = 0
|
||||
},
|
||||
#if 0
|
||||
{
|
||||
.name = "live",
|
||||
.callback = websocket_protocol_cb,
|
||||
.per_session_data_size = sizeof(struct websocket_connection),
|
||||
.rx_buffer_size = 0
|
||||
},
|
||||
#endif
|
||||
{
|
||||
.name = "api",
|
||||
.callback = api_protocol_cb,
|
||||
|
|
Loading…
Add table
Reference in a new issue