diff --git a/server/include/websocket.h b/server/include/websocket.h index 4c5fa9fb9..c18b38c61 100644 --- a/server/include/websocket.h +++ b/server/include/websocket.h @@ -19,20 +19,20 @@ #ifndef _WEBSOCKET_H_ #define _WEBSOCKET_H_ -/* Forward declarations */ -struct node; -struct msg; -struct settings; +#include "node.h" -struct websocket { - uint16_t port; - char *ssl_cert; - char *ssl_private_key; - char *htdocs; +/* Forward declarations */ +struct msg; +struct libwebsocket_context; + +struct websocket { + struct { + pthread_cond_t cond; + pthread_mutex_t mutex; + struct msg *m; + } read, write; - struct libwebsocket_context *context; - - pthread_t thread; + struct list connections; /**< List of struct libwebsockets sockets */ }; /** @see node_vtable::init */ diff --git a/server/src/websocket.c b/server/src/websocket.c index c6d8e68d9..e35eba544 100644 --- a/server/src/websocket.c +++ b/server/src/websocket.c @@ -1,3 +1,13 @@ +/** Node type: Websockets (libwebsockets) + * + * This file implements the weboscket subtype for nodes. + * + * @author Steffen Vogel + * @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + *********************************************************************************/ + #include #include #include @@ -6,22 +16,42 @@ #include #include +#include #include "websocket.h" #include "timing.h" #include "utils.h" #include "msg.h" -struct msg m; +/* Forward declarations */ +static int protocol_cb_http(struct libwebsocket_context *, struct libwebsocket *, enum libwebsocket_callback_reasons, void *, void *, size_t); +static int protocol_cb_live(struct libwebsocket_context *, struct libwebsocket *, enum libwebsocket_callback_reasons, void *, void *, size_t); -int newdata = 0; -char *resource_path = NULL; +/* Static storage */ +static struct websocket_global { + pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */ + struct libwebsocket_context *context; /**< The libwebsockets server context. */ + + struct list nodes; /**< List of websocket node instances. */ + int port; /**< Port of the build in HTTP / WebSocket server */ + + const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS */ + const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */ + const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */ +} global; + +static struct libwebsocket_protocols protocols[] = { + { "http-only", protocol_cb_http, 0, 0 }, + { "live", protocol_cb_live, sizeof(struct node *), 10 }, + { 0 } /* terminator */ +}; + +/* Choose mime type based on the file extension */ static char * get_mimetype(const char *resource_path) { char *extension = strrchr(resource_path, '.'); - - // choose mime type based on the file extension + if (extension == NULL) return "text/plain"; else if (!strcmp(extension, ".png")) @@ -40,56 +70,79 @@ static char * get_mimetype(const char *resource_path) return "text/plain"; } -static int protocol_callback_http(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) +static int protocol_cb_http(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) { - int n; + struct websocket_global *global = libwebsocket_context_user(context); switch (reason) { - case LWS_CALLBACK_HTTP: - if (!resource_path) { - libwebsockets_return_http_status(context, wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL); - goto try_to_reuse; - } + case LWS_CALLBACK_HTTP: + if (!global->htdocs) { + libwebsockets_return_http_status(context, wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL); + goto try_to_reuse; + } - if (len < 1) { - libwebsockets_return_http_status(context, wsi, HTTP_STATUS_BAD_REQUEST, NULL); - goto try_to_reuse; - } + if (len < 1) { + libwebsockets_return_http_status(context, wsi, HTTP_STATUS_BAD_REQUEST, NULL); + goto try_to_reuse; + } - char *requested_uri = (char *) in; - - /* Handle default path */ - if (!strcmp(requested_uri, "/")) { - void *universal_response = "HTTP/1.1 302 Found\r\nContent-Length: 0\r\nLocation: /index.html\r\n\r\ntest\r\n"; - libwebsocket_write(wsi, universal_response, strlen(universal_response), LWS_WRITE_HTTP); + char *requested_uri = (char *) in; - goto try_to_reuse; - } - else { - char buf[4069]; - snprintf(buf, sizeof(buf), "%s%s", resource_path, requested_uri); - - /* refuse to serve files we don't understand */ - char *mimetype = get_mimetype(buf); - if (!mimetype) { - lwsl_err("Unknown mimetype for %s\n", buf); - libwebsockets_return_http_status(context, wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL); + debug(3, "Got new lws HTTP request: %s", requested_uri); + + /* Handle default path */ + if (!strcmp(requested_uri, "/")) { + char *response = "HTTP/1.1 302 Found\r\n" + "Content-Length: 0\r\n" + "Location: /index.html\r\n" + "\r\n"; + + libwebsocket_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP); + + goto try_to_reuse; + } + /* Return list of websocket nodes */ + else if (!strcmp(requested_uri, "/nodes.json")) { + char response[4096]; + char *body = NULL; + + list_foreach(struct node *n, &global->nodes) + strcatf(&body, "'%s', ", n->name); + + snprintf(response, sizeof(response), + "HTTP/1.1 200 OK\r\n" + "Connection: close\r\n" + "\r\n" + "[ %.*s ]", (int) strlen(body) - 2, body); + + libwebsocket_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP); + free(body); + return -1; } + else { + char path[4069]; + snprintf(path, sizeof(path), "%s%s", global->htdocs, requested_uri); - n = libwebsockets_serve_http_file(context, wsi, buf, mimetype, NULL, 0); - if (n < 0) { - lwsl_warn("Failed to serve: %s", resource_path); - return -1; + /* refuse to serve files we don't understand */ + char *mimetype = get_mimetype(path); + if (!mimetype) { + warn("HTTP: Unknown mimetype for %s", path); + libwebsockets_return_http_status(context, wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL); + return -1; + } + + int n = libwebsockets_serve_http_file(context, wsi, path, mimetype, NULL, 0); + if (n < 0) + return -1; + else if (n == 0) + break; + else + goto try_to_reuse; } - if (n == 0) - break; - } - - goto try_to_reuse; - default: - break; + default: + break; } return 0; @@ -101,151 +154,183 @@ try_to_reuse: return 0; } -static int protocol_callback_live(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) -{ +static int protocol_cb_live(struct libwebsocket_context *context, struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) +{ + struct node *n; + struct websocket *w; + struct websocket_global *global = libwebsocket_context_user(context); + + char *buf, uri[1024]; + switch (reason) { - case LWS_CALLBACK_ESTABLISHED: { - char name[1024]; - char rip[1024]; + case LWS_CALLBACK_ESTABLISHED: + lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ + + /* Search for node which matches uri */ + list_foreach(n, &global->nodes) { + if (!strcmp(n->name, uri + 1)) /* we skip leading '/' */ + goto found; + } - int fd = libwebsocket_get_socket_fd(wsi); - libwebsockets_get_peer_addresses(context, wsi, fd, name, sizeof(name), rip, sizeof(rip)); + warn("WebSocket: New Connection for non-exsitent node: %s", uri + 1); + + return -1; + +found: * (void **) user = n; + w = n->websocket; + + list_push(&w->connections, wsi); + + info("WebSocket: New Connection for node: %s", n->name); + + return 0; - lwsl_notice("New Connection from: %s %s", name, rip); - break; - } + case LWS_CALLBACK_CLOSED: + n = * (struct node **) user; + w = n->websocket; + + list_remove(&w->connections, wsi); + + info("WebSocket: Connection closed for node: %s", n->name); + + return 0; - case LWS_CALLBACK_SERVER_WRITEABLE: { - char *data = NULL, *buf; - int len; + case LWS_CALLBACK_SERVER_WRITEABLE: + n = * (struct node **) user; + w = n->websocket; - strcatf(&data, "%f", time_to_double(&MSG_TS(&m))); - for (int i = 0; i < m.length; i++) - strcatf(&data, " %f", m.data[i].f); + buf = malloc(LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING + 4096); - len = strlen(data); - buf = malloc(LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING); - - memcpy(buf + LWS_SEND_BUFFER_PRE_PADDING, data, len); - free(data); - - libwebsocket_write(wsi, (unsigned char *) &buf[LWS_SEND_BUFFER_PRE_PADDING], len, LWS_WRITE_TEXT); - break; - } + pthread_mutex_lock(&w->write.mutex); - case LWS_CALLBACK_RECEIVE: { - lwsl_notice("Received data: %s", (char *) in); + len = msg_print(buf + LWS_SEND_BUFFER_PRE_PADDING, 4096, w->write.m, MSG_PRINT_NANOSECONDS | MSG_PRINT_VALUES, 0); - break; - } - - case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: { - char buf[1024]; + pthread_mutex_unlock(&w->write.mutex); - lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI); - lwsl_notice("WSI_TOKEN_GET_URI = %s", buf); - - break; - } - + libwebsocket_write(wsi, (unsigned char *) (buf + LWS_SEND_BUFFER_PRE_PADDING), len, LWS_WRITE_TEXT); + return 0; + + case LWS_CALLBACK_RECEIVE: + n = * (struct node **) user; + w = n->websocket; + + pthread_mutex_lock(&w->read.mutex); + + msg_scan(in, w->read.m, NULL, NULL); + + pthread_mutex_unlock(&w->read.mutex); + pthread_cond_broadcast(&w->read.cond); + + return 0; + default: - break; + return 0; } - - return 0; } -static struct libwebsocket_protocols protocols[] = { - { "http-only", protocol_callback_http, 0, 0 }, - { "live", protocol_callback_live, 30, 10 }, - { NULL, NULL, 0, 0 } /* terminator */ -}; - - -static void lws_debug(int level, const char *msg) { +static void logger(int level, const char *msg) { int len = strlen(msg); if (strchr(msg, '\n')) len -= 1; switch (level) { - case LLL_ERR: error("lws: %.*s", len, msg); break; - case LLL_WARN: warn("lws: %.*s", len, msg); break; - case LLL_INFO: info("lws: %.*s", len, msg); break; - default: debug(1, "lws: %.*s", len, msg); + case LLL_ERR: error("WebSocket: %.*s", len, msg); break; + case LLL_WARN: warn("WebSocket: %.*s", len, msg); break; + case LLL_INFO: info("WebSocket: %.*s", len, msg); break; + default: debug(1, "WebSocket: %.*s", len, msg); break; } } -static void * lws_thread(void *ctx) +static void * server_thread(void *ctx) { - struct websocket *w = ctx; + debug(3, "WebSocket: Started server thread"); - int n; - do { - n = libwebsocket_service(w->context, 10); - } while (n >= 0); + while (libwebsocket_service(global.context, 10) >= 0); return NULL; } int websocket_init(int argc, char * argv[], struct settings *set) { - lws_set_log_level((1 << LLL_COUNT) - 1, lws_debug); + lws_set_log_level((1 << LLL_COUNT) - 1, logger); + + /* Parse global config */ + /* + config_setting_lookup_string(cfg, "ssl_cert", &global.ssl_cert); + config_setting_lookup_string(cfg, "ssl_private_key", &global.ssl_private_key); + config_setting_lookup_string(cfg, "htdocs", &global.htdocs); + + if (!config_setting_lookup_int(cfg, "port", &global.port)) + port = 80; + */ + + /* @todo Fake settings */ + global.port = 8080; + global.htdocs = "/s2ss/contrib/websocket"; + + /* Initialize list of nodes */ + list_init(&global.nodes, NULL); + + /* Start server */ + struct lws_context_creation_info info = { + .port = global.port, + .protocols = protocols, + .extensions = libwebsocket_get_internal_extensions(), + .ssl_cert_filepath = global.ssl_cert, + .ssl_private_key_filepath = global.ssl_private_key, + .gid = -1, + .uid = -1, + .user = &global + }; + + global.context = libwebsocket_create_context(&info); + if (global.context == NULL) + error("WebSocket: failed to initialize server"); + + pthread_create(&global.thread, NULL, server_thread, NULL); return 0; } int websocket_deinit() { + pthread_join(global.thread, NULL); + + libwebsocket_cancel_service(global.context); + libwebsocket_context_destroy(global.context); + + list_destroy(&global.nodes); + return 0; } int websocket_parse(config_setting_t *cfg, struct node *n) { - struct websocket *w = n->websocket; + n->websocket = alloc(sizeof(struct websocket)); - config_lookup_string(cfg, "ssl_cert", &w->ssl_cert); - config_lookup_string(cfg, "ssl_private_key", &w->ssl_private_key); - config_lookup_string(cfg, "htdocs", &w->htdocs); - - if (!config_lookup_int(cfg, "port", &w->port)) - w->port = 80; - + return 0; } char * websocket_print(struct node *n) { - struct websocket *w = n->websocket; - char *buf = NULL; - - return strcatf(&buf, "port=%u", w->port); + return ""; } int websocket_open(struct node *n) { struct websocket *w = n->websocket; - - struct lws_context_creation_info info; - - memset(&info, 0, sizeof info); - info.port = w->port; - info.iface = NULL; - info.protocols = protocols; - info.extensions = libwebsocket_get_internal_extensions(); - info.ssl_cert_filepath = w->ssl_cert; - info.ssl_private_key_filepath = w->ssl_private_key; - info.gid = -1; - info.uid = -1; - info.options = 0; - w->context = libwebsocket_create_context(&info); - if (w->context == NULL) { - lwsl_notice("init failed"); - return -1; - } + list_init(&w->connections, NULL); + list_push(&global.nodes, n); + + pthread_mutex_init(&w->read.mutex, NULL); + pthread_mutex_init(&w->write.mutex, NULL); + pthread_cond_init(&w->read.cond, NULL); + + /* pthread_cond_wait() expects the mutex to be already locked */ + pthread_mutex_lock(&w->read.mutex); - pthread_create(w->thread, NULL, lws_thread, w); - return 0; } @@ -253,23 +338,43 @@ int websocket_close(struct node *n) { struct websocket *w = n->websocket; - libwebsocket_cancel_service(w->context); - libwebsocket_context_destroy(w->context); - - pthread_join(w->thread, NULL); + list_remove(&global.nodes, n); + + pthread_mutex_destroy(&w->read.mutex); + pthread_mutex_destroy(&w->write.mutex); + pthread_cond_destroy(&w->read.cond); - lwsl_notice("exited cleanly"); - return 0; } int websocket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { - return 0; + struct websocket *w = n->websocket; + struct msg *m = pool + (first % poolsize); + + w->read.m = m; + + pthread_cond_wait(&w->read.cond, &w->read.mutex); + + return 1; } int websocket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { + struct websocket *w = n->websocket; + struct msg *m = pool + (first % poolsize); + + pthread_mutex_lock(&w->write.mutex); + + w->write.m = m; + + /* Notify all active websocket connections to send new data */ + list_foreach(struct libwebsocket *wsi, &w->connections) { + libwebsocket_callback_on_writable(global.context, wsi); + } + + pthread_mutex_unlock(&w->write.mutex); + return 0; }