mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
new web socket node type mostly completed
This commit is contained in:
parent
b310a83fbc
commit
0f29816608
2 changed files with 260 additions and 155 deletions
|
@ -19,20 +19,20 @@
|
|||
#ifndef _WEBSOCKET_H_
|
||||
#define _WEBSOCKET_H_
|
||||
|
||||
/* Forward declarations */
|
||||
struct node;
|
||||
struct msg;
|
||||
struct settings;
|
||||
#include "node.h"
|
||||
|
||||
struct websocket {
|
||||
uint16_t port;
|
||||
char *ssl_cert;
|
||||
char *ssl_private_key;
|
||||
char *htdocs;
|
||||
/* Forward declarations */
|
||||
struct msg;
|
||||
struct libwebsocket_context;
|
||||
|
||||
struct websocket {
|
||||
struct {
|
||||
pthread_cond_t cond;
|
||||
pthread_mutex_t mutex;
|
||||
struct msg *m;
|
||||
} read, write;
|
||||
|
||||
struct libwebsocket_context *context;
|
||||
|
||||
pthread_t thread;
|
||||
struct list connections; /**< List of struct libwebsockets sockets */
|
||||
};
|
||||
|
||||
/** @see node_vtable::init */
|
||||
|
|
|
@ -1,3 +1,13 @@
|
|||
/** Node type: Websockets (libwebsockets)
|
||||
*
|
||||
* This file implements the weboscket subtype for nodes.
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
*********************************************************************************/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
|
@ -6,22 +16,42 @@
|
|||
#include <signal.h>
|
||||
|
||||
#include <libwebsockets.h>
|
||||
#include <config.h>
|
||||
|
||||
#include "websocket.h"
|
||||
#include "timing.h"
|
||||
#include "utils.h"
|
||||
#include "msg.h"
|
||||
|
||||
struct msg m;
|
||||
/* Forward declarations */
|
||||
static int protocol_cb_http(struct libwebsocket_context *, struct libwebsocket *, enum libwebsocket_callback_reasons, void *, void *, size_t);
|
||||
static int protocol_cb_live(struct libwebsocket_context *, struct libwebsocket *, enum libwebsocket_callback_reasons, void *, void *, size_t);
|
||||
|
||||
int newdata = 0;
|
||||
char *resource_path = NULL;
|
||||
/* Static storage */
|
||||
static struct websocket_global {
|
||||
pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */
|
||||
struct libwebsocket_context *context; /**< The libwebsockets server context. */
|
||||
|
||||
struct list nodes; /**< List of websocket node instances. */
|
||||
|
||||
int port; /**< Port of the build in HTTP / WebSocket server */
|
||||
|
||||
const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS */
|
||||
const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */
|
||||
const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */
|
||||
} global;
|
||||
|
||||
static struct libwebsocket_protocols protocols[] = {
|
||||
{ "http-only", protocol_cb_http, 0, 0 },
|
||||
{ "live", protocol_cb_live, sizeof(struct node *), 10 },
|
||||
{ 0 } /* terminator */
|
||||
};
|
||||
|
||||
/* Choose mime type based on the file extension */
|
||||
static char * get_mimetype(const char *resource_path)
|
||||
{
|
||||
char *extension = strrchr(resource_path, '.');
|
||||
|
||||
// choose mime type based on the file extension
|
||||
|
||||
if (extension == NULL)
|
||||
return "text/plain";
|
||||
else if (!strcmp(extension, ".png"))
|
||||
|
@ -40,56 +70,79 @@ static char * get_mimetype(const char *resource_path)
|
|||
return "text/plain";
|
||||
}
|
||||
|
||||
static int protocol_callback_http(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
|
||||
static int protocol_cb_http(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
int n;
|
||||
struct websocket_global *global = libwebsocket_context_user(context);
|
||||
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_HTTP:
|
||||
if (!resource_path) {
|
||||
libwebsockets_return_http_status(context, wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
case LWS_CALLBACK_HTTP:
|
||||
if (!global->htdocs) {
|
||||
libwebsockets_return_http_status(context, wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
if (len < 1) {
|
||||
libwebsockets_return_http_status(context, wsi, HTTP_STATUS_BAD_REQUEST, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
if (len < 1) {
|
||||
libwebsockets_return_http_status(context, wsi, HTTP_STATUS_BAD_REQUEST, NULL);
|
||||
goto try_to_reuse;
|
||||
}
|
||||
|
||||
char *requested_uri = (char *) in;
|
||||
|
||||
/* Handle default path */
|
||||
if (!strcmp(requested_uri, "/")) {
|
||||
void *universal_response = "HTTP/1.1 302 Found\r\nContent-Length: 0\r\nLocation: /index.html\r\n\r\ntest\r\n";
|
||||
libwebsocket_write(wsi, universal_response, strlen(universal_response), LWS_WRITE_HTTP);
|
||||
char *requested_uri = (char *) in;
|
||||
|
||||
goto try_to_reuse;
|
||||
}
|
||||
else {
|
||||
char buf[4069];
|
||||
snprintf(buf, sizeof(buf), "%s%s", resource_path, requested_uri);
|
||||
|
||||
/* refuse to serve files we don't understand */
|
||||
char *mimetype = get_mimetype(buf);
|
||||
if (!mimetype) {
|
||||
lwsl_err("Unknown mimetype for %s\n", buf);
|
||||
libwebsockets_return_http_status(context, wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
|
||||
debug(3, "Got new lws HTTP request: %s", requested_uri);
|
||||
|
||||
/* Handle default path */
|
||||
if (!strcmp(requested_uri, "/")) {
|
||||
char *response = "HTTP/1.1 302 Found\r\n"
|
||||
"Content-Length: 0\r\n"
|
||||
"Location: /index.html\r\n"
|
||||
"\r\n";
|
||||
|
||||
libwebsocket_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
|
||||
|
||||
goto try_to_reuse;
|
||||
}
|
||||
/* Return list of websocket nodes */
|
||||
else if (!strcmp(requested_uri, "/nodes.json")) {
|
||||
char response[4096];
|
||||
char *body = NULL;
|
||||
|
||||
list_foreach(struct node *n, &global->nodes)
|
||||
strcatf(&body, "'%s', ", n->name);
|
||||
|
||||
snprintf(response, sizeof(response),
|
||||
"HTTP/1.1 200 OK\r\n"
|
||||
"Connection: close\r\n"
|
||||
"\r\n"
|
||||
"[ %.*s ]", (int) strlen(body) - 2, body);
|
||||
|
||||
libwebsocket_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
|
||||
free(body);
|
||||
|
||||
return -1;
|
||||
}
|
||||
else {
|
||||
char path[4069];
|
||||
snprintf(path, sizeof(path), "%s%s", global->htdocs, requested_uri);
|
||||
|
||||
n = libwebsockets_serve_http_file(context, wsi, buf, mimetype, NULL, 0);
|
||||
if (n < 0) {
|
||||
lwsl_warn("Failed to serve: %s", resource_path);
|
||||
return -1;
|
||||
/* refuse to serve files we don't understand */
|
||||
char *mimetype = get_mimetype(path);
|
||||
if (!mimetype) {
|
||||
warn("HTTP: Unknown mimetype for %s", path);
|
||||
libwebsockets_return_http_status(context, wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int n = libwebsockets_serve_http_file(context, wsi, path, mimetype, NULL, 0);
|
||||
if (n < 0)
|
||||
return -1;
|
||||
else if (n == 0)
|
||||
break;
|
||||
else
|
||||
goto try_to_reuse;
|
||||
}
|
||||
if (n == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
goto try_to_reuse;
|
||||
|
||||
default:
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -101,151 +154,183 @@ try_to_reuse:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int protocol_callback_live(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
static int protocol_cb_live(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
struct node *n;
|
||||
struct websocket *w;
|
||||
struct websocket_global *global = libwebsocket_context_user(context);
|
||||
|
||||
char *buf, uri[1024];
|
||||
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_ESTABLISHED: {
|
||||
char name[1024];
|
||||
char rip[1024];
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
|
||||
|
||||
/* Search for node which matches uri */
|
||||
list_foreach(n, &global->nodes) {
|
||||
if (!strcmp(n->name, uri + 1)) /* we skip leading '/' */
|
||||
goto found;
|
||||
}
|
||||
|
||||
int fd = libwebsocket_get_socket_fd(wsi);
|
||||
libwebsockets_get_peer_addresses(context, wsi, fd, name, sizeof(name), rip, sizeof(rip));
|
||||
warn("WebSocket: New Connection for non-exsitent node: %s", uri + 1);
|
||||
|
||||
return -1;
|
||||
|
||||
found: * (void **) user = n;
|
||||
w = n->websocket;
|
||||
|
||||
list_push(&w->connections, wsi);
|
||||
|
||||
info("WebSocket: New Connection for node: %s", n->name);
|
||||
|
||||
return 0;
|
||||
|
||||
lwsl_notice("New Connection from: %s %s", name, rip);
|
||||
break;
|
||||
}
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
n = * (struct node **) user;
|
||||
w = n->websocket;
|
||||
|
||||
list_remove(&w->connections, wsi);
|
||||
|
||||
info("WebSocket: Connection closed for node: %s", n->name);
|
||||
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||
char *data = NULL, *buf;
|
||||
int len;
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
n = * (struct node **) user;
|
||||
w = n->websocket;
|
||||
|
||||
strcatf(&data, "%f", time_to_double(&MSG_TS(&m)));
|
||||
for (int i = 0; i < m.length; i++)
|
||||
strcatf(&data, " %f", m.data[i].f);
|
||||
buf = malloc(LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING + 4096);
|
||||
|
||||
len = strlen(data);
|
||||
buf = malloc(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING);
|
||||
|
||||
memcpy(buf + LWS_SEND_BUFFER_PRE_PADDING, data, len);
|
||||
free(data);
|
||||
|
||||
libwebsocket_write(wsi, (unsigned char *) &buf[LWS_SEND_BUFFER_PRE_PADDING], len, LWS_WRITE_TEXT);
|
||||
break;
|
||||
}
|
||||
pthread_mutex_lock(&w->write.mutex);
|
||||
|
||||
case LWS_CALLBACK_RECEIVE: {
|
||||
lwsl_notice("Received data: %s", (char *) in);
|
||||
len = msg_print(buf + LWS_SEND_BUFFER_PRE_PADDING, 4096, w->write.m, MSG_PRINT_NANOSECONDS | MSG_PRINT_VALUES, 0);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: {
|
||||
char buf[1024];
|
||||
pthread_mutex_unlock(&w->write.mutex);
|
||||
|
||||
lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI);
|
||||
lwsl_notice("WSI_TOKEN_GET_URI = %s", buf);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
libwebsocket_write(wsi, (unsigned char *) (buf + LWS_SEND_BUFFER_PRE_PADDING), len, LWS_WRITE_TEXT);
|
||||
|
||||
return 0;
|
||||
|
||||
case LWS_CALLBACK_RECEIVE:
|
||||
n = * (struct node **) user;
|
||||
w = n->websocket;
|
||||
|
||||
pthread_mutex_lock(&w->read.mutex);
|
||||
|
||||
msg_scan(in, w->read.m, NULL, NULL);
|
||||
|
||||
pthread_mutex_unlock(&w->read.mutex);
|
||||
pthread_cond_broadcast(&w->read.cond);
|
||||
|
||||
return 0;
|
||||
|
||||
default:
|
||||
break;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct libwebsocket_protocols protocols[] = {
|
||||
{ "http-only", protocol_callback_http, 0, 0 },
|
||||
{ "live", protocol_callback_live, 30, 10 },
|
||||
{ NULL, NULL, 0, 0 } /* terminator */
|
||||
};
|
||||
|
||||
|
||||
static void lws_debug(int level, const char *msg) {
|
||||
static void logger(int level, const char *msg) {
|
||||
int len = strlen(msg);
|
||||
if (strchr(msg, '\n'))
|
||||
len -= 1;
|
||||
|
||||
switch (level) {
|
||||
case LLL_ERR: error("lws: %.*s", len, msg); break;
|
||||
case LLL_WARN: warn("lws: %.*s", len, msg); break;
|
||||
case LLL_INFO: info("lws: %.*s", len, msg); break;
|
||||
default: debug(1, "lws: %.*s", len, msg);
|
||||
case LLL_ERR: error("WebSocket: %.*s", len, msg); break;
|
||||
case LLL_WARN: warn("WebSocket: %.*s", len, msg); break;
|
||||
case LLL_INFO: info("WebSocket: %.*s", len, msg); break;
|
||||
default: debug(1, "WebSocket: %.*s", len, msg); break;
|
||||
}
|
||||
}
|
||||
|
||||
static void * lws_thread(void *ctx)
|
||||
static void * server_thread(void *ctx)
|
||||
{
|
||||
struct websocket *w = ctx;
|
||||
debug(3, "WebSocket: Started server thread");
|
||||
|
||||
int n;
|
||||
do {
|
||||
n = libwebsocket_service(w->context, 10);
|
||||
} while (n >= 0);
|
||||
while (libwebsocket_service(global.context, 10) >= 0);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int websocket_init(int argc, char * argv[], struct settings *set)
|
||||
{
|
||||
lws_set_log_level((1 << LLL_COUNT) - 1, lws_debug);
|
||||
lws_set_log_level((1 << LLL_COUNT) - 1, logger);
|
||||
|
||||
/* Parse global config */
|
||||
/*
|
||||
config_setting_lookup_string(cfg, "ssl_cert", &global.ssl_cert);
|
||||
config_setting_lookup_string(cfg, "ssl_private_key", &global.ssl_private_key);
|
||||
config_setting_lookup_string(cfg, "htdocs", &global.htdocs);
|
||||
|
||||
if (!config_setting_lookup_int(cfg, "port", &global.port))
|
||||
port = 80;
|
||||
*/
|
||||
|
||||
/* @todo Fake settings */
|
||||
global.port = 8080;
|
||||
global.htdocs = "/s2ss/contrib/websocket";
|
||||
|
||||
/* Initialize list of nodes */
|
||||
list_init(&global.nodes, NULL);
|
||||
|
||||
/* Start server */
|
||||
struct lws_context_creation_info info = {
|
||||
.port = global.port,
|
||||
.protocols = protocols,
|
||||
.extensions = libwebsocket_get_internal_extensions(),
|
||||
.ssl_cert_filepath = global.ssl_cert,
|
||||
.ssl_private_key_filepath = global.ssl_private_key,
|
||||
.gid = -1,
|
||||
.uid = -1,
|
||||
.user = &global
|
||||
};
|
||||
|
||||
global.context = libwebsocket_create_context(&info);
|
||||
if (global.context == NULL)
|
||||
error("WebSocket: failed to initialize server");
|
||||
|
||||
pthread_create(&global.thread, NULL, server_thread, NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_deinit()
|
||||
{
|
||||
pthread_join(global.thread, NULL);
|
||||
|
||||
libwebsocket_cancel_service(global.context);
|
||||
libwebsocket_context_destroy(global.context);
|
||||
|
||||
list_destroy(&global.nodes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_parse(config_setting_t *cfg, struct node *n)
|
||||
{
|
||||
struct websocket *w = n->websocket;
|
||||
n->websocket = alloc(sizeof(struct websocket));
|
||||
|
||||
config_lookup_string(cfg, "ssl_cert", &w->ssl_cert);
|
||||
config_lookup_string(cfg, "ssl_private_key", &w->ssl_private_key);
|
||||
config_lookup_string(cfg, "htdocs", &w->htdocs);
|
||||
|
||||
if (!config_lookup_int(cfg, "port", &w->port))
|
||||
w->port = 80;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
char * websocket_print(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->websocket;
|
||||
char *buf = NULL;
|
||||
|
||||
return strcatf(&buf, "port=%u", w->port);
|
||||
return "";
|
||||
}
|
||||
|
||||
int websocket_open(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->websocket;
|
||||
|
||||
struct lws_context_creation_info info;
|
||||
|
||||
memset(&info, 0, sizeof info);
|
||||
info.port = w->port;
|
||||
info.iface = NULL;
|
||||
info.protocols = protocols;
|
||||
info.extensions = libwebsocket_get_internal_extensions();
|
||||
info.ssl_cert_filepath = w->ssl_cert;
|
||||
info.ssl_private_key_filepath = w->ssl_private_key;
|
||||
info.gid = -1;
|
||||
info.uid = -1;
|
||||
info.options = 0;
|
||||
|
||||
w->context = libwebsocket_create_context(&info);
|
||||
if (w->context == NULL) {
|
||||
lwsl_notice("init failed");
|
||||
return -1;
|
||||
}
|
||||
list_init(&w->connections, NULL);
|
||||
list_push(&global.nodes, n);
|
||||
|
||||
pthread_mutex_init(&w->read.mutex, NULL);
|
||||
pthread_mutex_init(&w->write.mutex, NULL);
|
||||
pthread_cond_init(&w->read.cond, NULL);
|
||||
|
||||
/* pthread_cond_wait() expects the mutex to be already locked */
|
||||
pthread_mutex_lock(&w->read.mutex);
|
||||
|
||||
pthread_create(w->thread, NULL, lws_thread, w);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -253,23 +338,43 @@ int websocket_close(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->websocket;
|
||||
|
||||
libwebsocket_cancel_service(w->context);
|
||||
libwebsocket_context_destroy(w->context);
|
||||
|
||||
pthread_join(w->thread, NULL);
|
||||
list_remove(&global.nodes, n);
|
||||
|
||||
pthread_mutex_destroy(&w->read.mutex);
|
||||
pthread_mutex_destroy(&w->write.mutex);
|
||||
pthread_cond_destroy(&w->read.cond);
|
||||
|
||||
lwsl_notice("exited cleanly");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
return 0;
|
||||
struct websocket *w = n->websocket;
|
||||
struct msg *m = pool + (first % poolsize);
|
||||
|
||||
w->read.m = m;
|
||||
|
||||
pthread_cond_wait(&w->read.cond, &w->read.mutex);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int websocket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
struct websocket *w = n->websocket;
|
||||
struct msg *m = pool + (first % poolsize);
|
||||
|
||||
pthread_mutex_lock(&w->write.mutex);
|
||||
|
||||
w->write.m = m;
|
||||
|
||||
/* Notify all active websocket connections to send new data */
|
||||
list_foreach(struct libwebsocket *wsi, &w->connections) {
|
||||
libwebsocket_callback_on_writable(global.context, wsi);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&w->write.mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue