From 104455e0d59252f7b48145bccfd35edf4eb937bc Mon Sep 17 00:00:00 2001
From: Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
Date: Wed, 8 Jun 2016 22:39:17 +0200
Subject: [PATCH] splitting web socket code in multiple files

---
 include/websocket.h  |  29 ++++----
 lib/websocket-http.c | 154 +++++++++++++++++++++++++++++++++++++++++++
 lib/websocket-live.c | 133 +++++++++++++++++++++++++++++++++++++
 lib/websocket.c      |  52 +++++----------
 4 files changed, 318 insertions(+), 50 deletions(-)
 create mode 100644 lib/websocket-http.c
 create mode 100644 lib/websocket-live.c

diff --git a/include/websocket.h b/include/websocket.h
index b2e7ed9bf..d60fd7ee3 100644
--- a/include/websocket.h
+++ b/include/websocket.h
@@ -21,20 +21,19 @@
 
 #include "node.h"
 
-/* Forward declarations */
-struct msg;
-struct libwebsocket_context;
-
-struct websocket {	
-	struct {
-		pthread_cond_t cond;
-		pthread_mutex_t mutex;
-		struct pool *pool;
-		size_t cnt;
-	} read, write;
+/** Internal data per websocket node */
+struct websocket {
+	struct list connections;	/**< List of active libwebsocket connections (struct websocket_connection) */
+	struct list destinations;	/**< List of struct lws_client_connect_info to connect to. */
 	
-	int shutdown;
-	struct list connections; /**< List of struct libwebsockets sockets */
+	struct websocket_connection *writer;
+};
+
+struct websocket_connection {
+	struct node *node;
+	struct path *path;
+	
+	qptr_t received;
 };
 
 /** @see node_vtable::init */
@@ -53,9 +52,9 @@ int websocket_close(struct node *n);
 int websocket_destroy(struct node *n);
 
 /** @see node_vtable::read */
-int websocket_read(struct node *n, struct pool *pool, int cnt);
+int websocket_read(struct node *n, struct sample *smps[], unsigned cnt);
 
 /** @see node_vtable::write */
-int websocket_write(struct node *n, struct pool *pool, int cnt);
+int websocket_write(struct node *n, struct sample *smps[], unsigned cnt);
 
 #endif /* _WEBSOCKET_H_ */
\ No newline at end of file
diff --git a/lib/websocket-http.c b/lib/websocket-http.c
new file mode 100644
index 000000000..bdb8782c5
--- /dev/null
+++ b/lib/websocket-http.c
@@ -0,0 +1,154 @@
+/** HTTP protocol of the websocket node type
+ *
+ * @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
+ * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
+ *   This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
+ *   Unauthorized copying of this file, via any medium is strictly prohibited.
+ *********************************************************************************/
+
+#include <libwebsockets.h>
+#include <libconfig.h>
+
+#ifdef WITH_JANSSON
+  #include <jansson.h>
+#endif
+
+/* Choose mime type based on the file extension */
+static char * get_mimetype(const char *resource_path)
+{
+	char *extension = strrchr(resource_path, '.');
+
+	if (extension == NULL)
+		return "text/plain";
+	else if (!strcmp(extension, ".png"))
+		return "image/png";
+	else if (!strcmp(extension, ".jpg"))
+		return "image/jpg";
+	else if (!strcmp(extension, ".gif"))
+		return "image/gif";
+	else if (!strcmp(extension, ".html"))
+		return "text/html";
+	else if (!strcmp(extension, ".css"))
+		return "text/css";
+	else if (!strcmp(extension, ".js"))
+		return "application/javascript";
+	else
+		return "text/plain";
+}
+
+static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
+{
+	switch (reason) {
+		case LWS_CALLBACK_HTTP:			
+			if (!htdocs) {
+				lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
+				goto try_to_reuse;
+			}
+
+			if (len < 1) {
+				lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL);
+				goto try_to_reuse;
+			}
+
+			char *requested_uri = (char *) in;
+			
+			debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri);
+
+			/* Handle default path */
+			if      (!strcmp(requested_uri, "/")) {
+				char *response = "HTTP/1.1 302 Found\r\n"
+						 "Content-Length: 0\r\n"
+						 "Location: /index.html\r\n"
+						 "\r\n";
+			
+				lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
+			
+				goto try_to_reuse;
+			}
+#ifdef WITH_JANSSON
+			/* Return list of websocket nodes */
+			else if (!strcmp(requested_uri, "/nodes.json")) {
+				json_t *json_body = json_array();
+								
+				list_foreach(struct node *n, &vt.instances) {
+					struct websocket *w = n->_vd;
+
+					json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
+						"name",		node_name_short(n),
+						"connections",	list_length(&w->connections),
+						"state",	n->state,
+						"vectorize",	n->vectorize,
+						"affinity",	n->affinity
+					);
+					
+					/* Add all additional fields of node here.
+					 * This can be used for metadata */	
+					json_object_update(json_node, config_to_json(n->cfg));
+					
+					json_array_append_new(json_body, json_node);
+				}
+				
+				char *body = json_dumps(json_body, JSON_INDENT(4));
+					
+				char *header =  "HTTP/1.1 200 OK\r\n"
+						"Connection: close\r\n"
+       						"Content-Type: application/json\r\n"
+						"\r\n";
+				
+				lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
+				lws_write(wsi, (void *) body,   strlen(body),   LWS_WRITE_HTTP);
+
+				free(body);
+				json_decref(json_body);
+				
+				return -1;
+			}
+			else if (!strcmp(requested_uri, "/config.json")) {
+				char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4));
+					
+				char *header =  "HTTP/1.1 200 OK\r\n"
+						"Connection: close\r\n"
+       						"Content-Type: application/json\r\n"
+						"\r\n";
+				
+				lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
+				lws_write(wsi, (void *) body,   strlen(body),   LWS_WRITE_HTTP);
+
+				free(body);
+				
+				return -1;
+			}
+#endif
+			else {
+				char path[4069];
+				snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri);
+
+				/* refuse to serve files we don't understand */
+				char *mimetype = get_mimetype(path);
+				if (!mimetype) {
+					warn("HTTP: Unknown mimetype for %s", path);
+					lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
+					return -1;
+				}
+
+				int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0);
+				if      (n < 0)
+					return -1;
+				else if (n == 0)
+					break;
+				else
+					goto try_to_reuse;
+			}
+
+		default:
+			break;
+	}
+
+	return 0;
+	
+try_to_reuse:
+	if (lws_http_transaction_completed(wsi))
+		return -1;
+
+	return 0;
+}
\ No newline at end of file
diff --git a/lib/websocket-live.c b/lib/websocket-live.c
new file mode 100644
index 000000000..4dff1d72c
--- /dev/null
+++ b/lib/websocket-live.c
@@ -0,0 +1,133 @@
+/** Live protocol of the websocket node type
+ *
+ * This protocol callback function is used to handle the binary websocket protoocol
+ * which is used to send / receive struct msg's.
+ *
+ * @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
+ * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
+ *   This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
+ *   Unauthorized copying of this file, via any medium is strictly prohibited.
+ *********************************************************************************/
+
+static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
+{
+	struct conn *c = user;
+	
+	switch (reason) {
+		case LWS_CALLBACK_WSI_CREATE:
+			c->wsi = wsi;
+			return 0;
+
+		case LWS_CALLBACK_WSI_DESTROY:
+			c->wsi = NULL;
+
+			connection_destroy(c); /* release c->peer.ip & c->peer.name */
+			return 0;
+		
+		case LWS_CALLBACK_CLIENT_ESTABLISHED:
+		case LWS_CALLBACK_ESTABLISHED:
+			c->state = CONN_STATE_ESTABLISHED;
+			c->role = (reason == LWS_CALLBACK_ESTABLISHED)
+				? CONN_ROLE_SERVER
+				: CONN_ROLE_CLIENT;
+
+			/* Get path of incoming request */
+			char uri[64]
+			lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
+			if (strlen(uri) <= 0) {
+				warn("WebSocket: Closing connection with invalid URL: %s")
+				return -1;
+			}
+			
+			/* Search for node whose name matches the URI. */
+			c->node = list_lookup(&vt.instances, uri + 1);
+			if (c->node == NULL) {
+				warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1);
+				return -1;
+			}
+			
+			/* Check if node is running */
+			if (c->node.state != NODE_RUNNING)
+				return -1;
+
+			/* Alias to ease readability */
+			c->ws = n->_vd;
+
+			/* Lookup peer address for debug output */
+			c->peer.name = alloc(64);
+			c->peer.ip = alloc(64);
+			lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, 64, c->peer.ip, 64);
+
+			info("WebSocket: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
+			list_push(&c->ws->connections, c);
+
+			return 0;
+		
+		case LWS_CALLBACK_CLOSED:			
+			c->state = CONN_STATE_CLOSED;
+			
+			info("WebSocket: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
+			list_remove(&c->ws->connections, c);
+
+			return 0;
+			
+		case LWS_CALLBACK_CLIENT_WRITABLE:		
+		case LWS_CALLBACK_SERVER_WRITEABLE:
+			if (c->node.state != NODE_RUNNING)
+				return -1;
+			
+			int sent, sz, remain;
+			struct queue *q = &c->tx_queue;
+		
+			pthread_mutex_lock(&q->lock);
+			
+			sz = q->tail - q->head;
+			if (len == 0)
+				goto out; /* nothing to sent at the moment */
+					
+			sent = lws_write(wsi, q->head, sz, LWS_WRITE_BINARY);
+			if (sent < 0)
+				goto out;
+				
+			/* Move unsent part to head of queue */
+			remain = sz - sent;
+			if (remain > 0)
+				memmove(q->head, q->head + sent, remain);
+			
+			/* Update queue tail */
+			q->tail = q->head + remain;
+			
+out:			pthread_mutex_unlock(&q->lock);
+
+			return (sent < 0) ? -1 : 0;
+
+		case LWS_CALLBACK_CLIENT_RECEIVE:
+		case LWS_CALLBACK_RECEIVE:
+			if (c->node.state != NODE_RUNNING)
+				return -1;
+		
+			if (!lws_frame_is_binary(wsi)) {
+				warn("WebSocket: Received non-binary frame for node %s", node_name(c->node));
+				return -1;
+			}
+			
+			int ret = 0;
+			struct queue *q = &w->rx_queue;
+
+			pthread_mutex_lock(&q->lock);
+			
+	
+			
+			memcpy(q->tail, in, len);
+			
+			q->tail += len;
+
+			pthread_cond_broadcast(&q->cond);
+out2:			pthread_mutex_unlock(&q->lock);			
+
+			return ret;
+			
+		default:
+			return 0;
+	}
+}
\ No newline at end of file
diff --git a/lib/websocket.c b/lib/websocket.c
index 58f72dbeb..ece8725ce 100644
--- a/lib/websocket.c
+++ b/lib/websocket.c
@@ -1,6 +1,4 @@
 /** Node type: Websockets (libwebsockets)
- *
- * This file implements the weboscket subtype for nodes.
  *
  * @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
  * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
@@ -89,7 +87,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v
 
 			char *requested_uri = (char *) in;
 			
-			debug(3, "WebSocket: New HTTP request: %s", requested_uri);
+			debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri);
 
 			/* Handle default path */
 			if      (!strcmp(requested_uri, "/")) {
@@ -345,17 +343,17 @@ static void logger(int level, const char *msg) {
 		case LLL_ERR:  error("WebSocket: %.*s", len, msg); break;
 		case LLL_WARN:	warn("WebSocket: %.*s", len, msg); break;
 		case LLL_INFO:	info("WebSocket: %.*s", len, msg); break;
-		default:    debug(1, "WebSocket: %.*s", len, msg); break;
+		default:    debug(DBG_WEBSOCKET | 1, "WebSocket: %.*s", len, msg); break;
 	}
 }
 
 static void * server_thread(void *ctx)
 {
-	debug(3, "WebSocket: Started server thread");
+	debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread");
 	
 	while (lws_service(context, 10) >= 0);
 	
-	debug(3, "WebSocket: shutdown voluntarily");
+	debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily");
 	
 	return NULL;
 }
@@ -419,14 +417,8 @@ int websocket_open(struct node *n)
 {
 	struct websocket *w = n->_vd;
 
-	list_init(&w->connections, NULL);
-	
-	pthread_mutex_init(&w->read.mutex, NULL);
-	pthread_mutex_init(&w->write.mutex, NULL);
-	pthread_cond_init(&w->read.cond, NULL);
-	
-	/* pthread_cond_wait() expects the mutex to be already locked */
-	pthread_mutex_lock(&w->read.mutex);
+	list_init(&w->connections);
+	list_init(&w->destinations);
 	
 	return 0;
 }
@@ -439,6 +431,9 @@ int websocket_close(struct node *n)
 	
 	list_foreach(struct lws *wsi, &w->connections)
 		lws_callback_on_writable(wsi);
+	
+	/** @todo Is is safe? */
+	list_destroy(&w->connections);
 		
 	return 0;
 }
@@ -447,40 +442,27 @@ int websocket_destroy(struct node *n)
 {
 	struct websocket *w = n->_vd;
 
-	pthread_mutex_destroy(&w->read.mutex);
-	pthread_mutex_destroy(&w->write.mutex);
-	pthread_cond_destroy(&w->read.cond);
-
 	return 0;
 }
 
-int websocket_read(struct node *n, struct pool *pool, int cnt)
+int websocket_read(struct node *n, struct pool *pool, unsigned cnt)
 {
 	struct websocket *w = n->_vd;
-	
-	w->read.pool = pool;
-	w->read.cnt = cnt;
-	
-	pthread_cond_wait(&w->read.cond, &w->read.mutex);
-	
-	return 1;
+
+	/* Check for new websocket connections and more readers to queue */
+	list_foreach(struct)
+
+	return cnt;
 }
 
-int websocket_write(struct node *n, struct pool *pool, int cnt)
+int websocket_write(struct node *n, struct pool *pool, unsigned cnt)
 {
 	struct websocket *w = n->_vd;
-
-	pthread_mutex_lock(&w->write.mutex);
-
-	w->write.pool = pool;
-	w->write.cnt = cnt;
 	
 	/* Notify all active websocket connections to send new data */
 	list_foreach(struct lws *wsi, &w->connections)
 		lws_callback_on_writable(wsi);
-	
-	pthread_mutex_unlock(&w->write.mutex);
-		
+
 	return 1;
 }