From 4e5dc58e1802720fb663ae5c76c029c11282d336 Mon Sep 17 00:00:00 2001
From: Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
Date: Mon, 24 Apr 2017 19:28:45 +0200
Subject: [PATCH] added first version of web socket client support

---
 include/villas/nodes/websocket.h |   5 +
 lib/nodes/websocket.c            | 378 +++++++++++++++++++++----------
 2 files changed, 266 insertions(+), 117 deletions(-)

diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h
index 20467926e..fecaadc92 100644
--- a/include/villas/nodes/websocket.h
+++ b/include/villas/nodes/websocket.h
@@ -51,6 +51,11 @@ struct websocket_connection {
 		char ip[64];
 	} peer;
 	
+	enum {
+		WEBSOCKET_MODE_CLIENT,
+		WEBSOCKET_MODE_SERVER,
+	} mode;
+	
 	enum state state;
 	
 	char *_name;
diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c
index da4f34bf8..3a375c3bf 100644
--- a/lib/nodes/websocket.c
+++ b/lib/nodes/websocket.c
@@ -23,6 +23,7 @@
 
 /* Private static storage */
 static struct list connections = { .state = STATE_DESTROYED };	/**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
+static struct web *web;
 
 /* Forward declarations */
 static struct plugin p;
@@ -30,10 +31,12 @@ static struct plugin p;
 static char * websocket_connection_name(struct websocket_connection *c)
 {
 	if (!c->_name) {
+		strcatf(&c->_name, "%s (%s)", c->peer.name, c->peer.ip);
+		
 		if (c->node)
-			asprintf(&c->_name, "%s (%s) for node %s", c->peer.name, c->peer.ip, node_name(c->node));
-		else
-			asprintf(&c->_name, "%s (%s) for all nodes", c->peer.name, c->peer.ip);
+			strcatf(&c->_name, " for node %s", node_name(c->node));
+		
+		strcatf(&c->_name, " in %s mode", c->mode == WEBSOCKET_MODE_CLIENT ? "client" : "server");
 	}
 	
 	return c->_name;
@@ -42,9 +45,7 @@ static char * websocket_connection_name(struct websocket_connection *c)
 static int websocket_connection_init(struct websocket_connection *c, struct lws *wsi)
 {
 	int ret;
-	
-	struct websocket *w = c->node->_vd;
-	
+
 	lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
 
 	info("LWS: New connection %s", websocket_connection_name(c));
@@ -52,86 +53,75 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws
 	c->state = STATE_INITIALIZED;
 	c->wsi = wsi;
 	
-	if (c->node != NULL)
+	if (c->node) {
+		struct websocket *w = c->node->_vd;
+
 		list_push(&w->connections, c);
+	}
 	else
 		list_push(&connections, c);
-	
-	ret = queue_init(&c->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage);
-	if (ret) {
-		warn("Failed to create queue for incoming websocket connection. Closing..");
-		return -1;
-	}
+
+	ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
+	if (ret)
+		return ret;
 
 	return 0;
 }
 
-static void websocket_connection_destroy(struct websocket_connection *c)
+static int websocket_connection_destroy(struct websocket_connection *c)
 {
-	if (c->state == STATE_DESTROYED)
-		return;
+	int ret;
 
-	struct websocket *w = c->node->_vd;
+	if (c->state == STATE_DESTROYED)
+		return 0;
 
 	info("LWS: Connection %s closed", websocket_connection_name(c));
-
-	c->state = STATE_DESTROYED;
-	c->wsi = NULL;
 	
-	if (c->node)
+	if (c->node) {
+		struct websocket *w = c->node->_vd;
 		list_remove(&w->connections, c);
+	}
 	else
 		list_remove(&connections, c);
 
 	if (c->_name)
 		free(c->_name);
 	
-	queue_destroy(&c->queue);
+	ret = queue_destroy(&c->queue);
+	if (ret)
+		return ret;
+	
+	c->state = STATE_DESTROYED;
+	c->wsi = NULL;
+
+	return ret;
 }
 
 static void websocket_destination_destroy(struct websocket_destination *d)
 {
 	free(d->uri);
+	
+	free((char *) d->info.path);
+	free((char *) d->info.address);
 }
 
 static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
 {
-	int blocks, enqueued;
-	char *bufs[cnt];
-	
-	struct websocket *w = c->node->_vd;
+	int ret;
 
 	switch (c->state) {
-		case STATE_DESTROYED:
-		case STATE_STOPPED:
-			return -1;
-
 		case STATE_INITIALIZED:
 			c->state = STATE_STARTED;
 			/* fall through */
 
 		case STATE_STARTED:
-			blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
-			if (blocks != cnt)
-				warn("Pool underrun in websocket connection: %s", websocket_connection_name(c));
-
-			for (int i = 0; i < blocks; i++) {
-				struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE);
-	
-				msg->version  = WEBMSG_VERSION;
-				msg->type     = WEBMSG_TYPE_DATA;
-				msg->length   = smps[i]->length;
-				msg->sequence = smps[i]->sequence;
-				msg->id       = c->node->id;
-				msg->ts.sec   = smps[i]->ts.origin.tv_sec;
-				msg->ts.nsec  = smps[i]->ts.origin.tv_nsec;
-	
-				memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
+			for (int i = 0; i < cnt; i++) {
+				sample_get(smps[i]); /* increase reference count */
+				
+				ret = queue_push(&c->queue, (void **) smps[i]);
+				if (ret != 1)
+					warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
 			}
-
-			enqueued = queue_push_many(&c->queue, (void **) bufs, cnt);
-			if (enqueued != blocks)
-				warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
 			
 			lws_callback_on_writable(c->wsi);
 			break;
@@ -142,25 +132,50 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
 	return 0;
 }
 
+static void websocket_connection_close(struct websocket_connection *c, struct lws *wsi, enum lws_close_status status, const char *reason)
+{
+	lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason));
+
+	char *msg = strf("LWS: Closing connection");
+	
+	if (c)
+		msg = strcatf(&msg, " with %s", websocket_connection_name(c));
+	
+	msg = strcatf(&msg, ": status=%u, reason=%s", status, reason);
+
+	warn(msg);
+	
+	free(msg);
+}
+
 int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
 {
 	int ret;
 	struct websocket_connection *c = user;
-	struct websocket *w;
 	
+	struct webmsg *msg;
+	struct sample *smp;
+
 	switch (reason) {
 		case LWS_CALLBACK_CLIENT_ESTABLISHED:
+			ret = websocket_connection_init(c, wsi);
+			if (ret)
+				return -1;
+
+			return 0;
+
 		case LWS_CALLBACK_ESTABLISHED:
 			c->state = STATE_DESTROYED;
-			
+			c->mode = WEBSOCKET_MODE_SERVER;
+
 			/* Get path of incoming request */
 			char uri[64];
 			lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
 			if (strlen(uri) <= 0) {
-				warn("LWS: Closing connection with invalid URL: %s", uri);
+				websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
 				return -1;
 			}
-			
+
 			if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){
 				/* Catch all connection */
 				c->node = NULL;
@@ -171,13 +186,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
 				/* Search for node whose name matches the URI. */
 				c->node = list_lookup(&p.node.instances, node);
 				if (c->node == NULL) {
-					warn("LWS: Closing Connection for non-existent node: %s", uri + 1);
+					websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
 					return -1;
 				}
-				
-				/* Check if node is running */
-				if (c->node->state != STATE_STARTED)
-					return -1;
 			}
 			
 			ret = websocket_connection_init(c, wsi);
@@ -188,35 +199,60 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
 
 		case LWS_CALLBACK_CLOSED:
 			websocket_connection_destroy(c);
+			
+			if (c->mode == WEBSOCKET_MODE_CLIENT)
+				free(c);
+
 			return 0;
 			
 		case LWS_CALLBACK_CLIENT_WRITEABLE:
 		case LWS_CALLBACK_SERVER_WRITEABLE:
-			w = c->node->_vd;
-
-			if (c->node && c->node->state != STATE_STARTED)
-				return -1;
-
 			if (c->state == STATE_STOPPED) {
-				lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4);
+				websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye");
 				return -1;
 			}
 
-			char *buf;
-			int cnt;
-			while ((cnt = queue_pull(&c->queue, (void **) &buf))) {
-				struct webmsg *msg = (struct webmsg *) (buf + LWS_PRE);
-				
-				pool_put(&w->pool, (void *) buf);
-				
+			if (c->node && c->node->state != STATE_STARTED) {
+				websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
+				return -1;
+			}
+
+			char *buf = NULL;
+			
+			while (queue_pull(&c->queue, (void **) &smp)) {
+				buf = realloc(buf, LWS_PRE + WEBMSG_LEN(smp->length));
+				if (!buf)
+					serror("realloc failed:");
+	
+				msg = (struct webmsg *) (buf + LWS_PRE);
+	
+				msg->version  = WEBMSG_VERSION;
+				msg->type     = WEBMSG_TYPE_DATA;
+				msg->length   = smp->length;
+				msg->sequence = smp->sequence;
+				msg->id       = smp->source->id;
+				msg->ts.sec   = smp->ts.origin.tv_sec;
+				msg->ts.nsec  = smp->ts.origin.tv_nsec;	
+
+				memcpy(&msg->data, &smp->data, SAMPLE_DATA_LEN(smp->length));
+
+				webmsg_hton(msg);
+
+				sample_put(smp);
+
 				ret = lws_write(wsi, (unsigned char *) msg, WEBMSG_LEN(msg->length), LWS_WRITE_BINARY);
-				if (ret < WEBMSG_LEN(msg->length))
-					error("Failed lws_write()");
+				if (ret < 0) {
+					warn("Failed lws_write() for connection %s", websocket_connection_name(c));
+					return -1;
+				}
 
 				if (lws_send_pipe_choked(wsi))
 					break;
 			}
+			
+			free(buf);
 
+			/* There are still samples in the queue */
 			if (queue_available(&c->queue) > 0)
 				lws_callback_on_writable(wsi);
 
@@ -224,34 +260,76 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
 
 		case LWS_CALLBACK_CLIENT_RECEIVE:
 		case LWS_CALLBACK_RECEIVE:
-			w = c->node->_vd;
-
-			if (c->node->state != STATE_STARTED)
+			if (!lws_frame_is_binary(wsi)) {
+				websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected");
 				return -1;
+			}
 
-			if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0))
-				warn("LWS: Received invalid packet for connection %s", websocket_connection_name(c));
+			if (len < WEBMSG_LEN(0)) {
+				websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet");
+				return -1;
+			}
+
+			struct timespec ts_recv = time_now();
 			
-			struct webmsg *msg = (struct webmsg *) in;
-			
-			while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) {
-				struct webmsg *msg2 = pool_get(&w->pool);
-				if (!msg2) {
-					warn("Pool underrun for connection %s", websocket_connection_name(c));
+			msg = (struct webmsg *) in;
+			while ((char *) msg + WEBMSG_LEN(msg->length) < (char *) in + len) {
+				struct node *dest;
+
+				/* Convert message to host byte-order */
+				webmsg_ntoh(msg);
+
+				/* Find destination node of this message */
+				if (c->node)
+					dest = c->node;
+				else {
+					dest = NULL;
+
+					for (int i = 0; i < list_length(&p.node.instances); i++) {
+						struct node *n = list_at(&p.node.instances, i);
+					
+						if (n->id == msg->id) {
+							dest = n;
+							break;
+						}
+					}
+					
+					if (!dest) {
+						warn("Ignoring message due to invalid node id");
+						goto next;
+					}
+				}
+				
+				struct websocket *w = dest->_vd;
+
+				ret = sample_alloc(&w->pool, &smp, 1);
+				if (ret != 1) {
+					warn("Pool underrun for connection: %s", websocket_connection_name(c));
 					break;
 				}
 				
-				memcpy(msg2, msg, WEBMSG_LEN(msg->length));
+				smp->ts.origin = WEBMSG_TS(msg);
+				smp->ts.received = ts_recv;
+
+				smp->sequence  = msg->sequence;
+				smp->length    = msg->length;
+				if (smp->length > smp->capacity) {
+					smp->length = smp->capacity;
+					warn("Dropping values for connection: %s", websocket_connection_name(c));
+				}
 				
-				ret = queue_signalled_push_many(&w->queue, (void **) msg2, 1);
+				memcpy(&smp->data, &msg->data, SAMPLE_DATA_LEN(smp->length));
+				
+				ret = queue_signalled_push(&w->queue, (void **) smp);
 				if (ret != 1) {
 					warn("Queue overrun for connection %s", websocket_connection_name(c));
 					break;
 				}
-				
+
 				/* Next message */
-				msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
+next:				msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
 			}
+			
 			return 0;
 
 		default:
@@ -259,6 +337,37 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
 	}
 }
 
+int websocket_init(struct super_node *sn)
+{
+	list_init(&connections);
+	
+	web = &sn->web;
+
+	if (web->state != STATE_STARTED)
+		return -1;
+
+	return 0;
+}
+
+int websocket_deinit()
+{
+	for (size_t i = 0; i < list_length(&connections); i++) {
+		struct websocket_connection *c = list_at(&connections, i);
+
+		c->state = STATE_STOPPED;
+
+		lws_callback_on_writable(c->wsi);
+	}
+
+	/* Wait for all connections to be closed */
+	while (list_length(&connections) > 0)
+		sleep(0.2);
+
+	list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true);
+
+	return 0;
+}
+
 int websocket_start(struct node *n)
 {
 	int ret;
@@ -274,7 +383,21 @@ int websocket_start(struct node *n)
 	if (ret)
 		return ret;
 
-	/** @todo Connection to destinations via WebSocket client */
+	for (int i = 0; i < list_length(&w->destinations); i++) {
+		struct websocket_destination *d = list_at(&w->destinations, i);
+		
+		struct websocket_connection *c = alloc(sizeof(struct websocket_connection));
+		
+		c->state = STATE_DESTROYED;
+		c->mode = WEBSOCKET_MODE_CLIENT;
+		c->node = n;
+		
+		d->info.context = web->context;
+		d->info.vhost = web->vhost;
+		d->info.userdata = c;
+		
+		lws_client_connect_via_info(&d->info);
+	}
 
 	return 0;
 }
@@ -291,15 +414,19 @@ int websocket_stop(struct node *n)
 
 		lws_callback_on_writable(c->wsi);
 	}
-	
-	ret = pool_destroy(&w->pool);
-	if (ret)
-		return ret;
+
+	/* Wait for all connections to be closed */
+	while (list_length(&w->connections) > 0)
+		sleep(1);
 
 	ret = queue_signalled_destroy(&w->queue);
 	if (ret)
 		return ret;
 	
+	ret = pool_destroy(&w->pool);
+	if (ret)
+		return ret;
+
 	return 0;
 }
 
@@ -315,46 +442,56 @@ int websocket_destroy(struct node *n)
 
 int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
 {
-	int got;
+	int avail;
 
 	struct websocket *w = n->_vd;
-	struct webmsg *msgs[cnt];
+	struct sample *cpys[cnt];
 
 	do {
-		got = queue_signalled_pull_many(&w->queue, (void **) msgs, cnt);
-		if (got < 0)
-			return got;
-	} while (got == 0);
-	
-	
-	for (int i = 0; i < got; i++) {
-		smps[i]->sequence  = msgs[i]->sequence;
-		smps[i]->length    = msgs[i]->length;
-		smps[i]->ts.origin = WEBMSG_TS(msgs[i]);
-		
-		memcpy(&smps[i]->data, &msgs[i]->data, WEBMSG_DATA_LEN(msgs[i]->length));
-	}
-	
-	pool_put_many(&w->pool, (void **) msgs, got);
+		avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
+		if (avail < 0)
+			return avail;
+	} while (avail == 0);
 
-	return got;
+	for (int i = 0; i < avail; i++) {
+		sample_copy(smps[i], cpys[i]);
+		sample_put(cpys[i]);
+	}
+
+	return avail;
 }
 
 int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
 {
+	int avail;
+
 	struct websocket *w = n->_vd;
+	struct sample *cpys[cnt];
+	
+	/* Make copies of all samples */
+	avail = sample_alloc(&w->pool, cpys, cnt);
+	if (avail < cnt)
+		warn("Pool underrun for node %s: avail=%u", node_name(n), avail);
+
+	for (int i = 0; i < avail; i++) {
+		sample_copy(cpys[i], smps[i]);
+
+		cpys[i]->source = n;
+	}
 
 	for (size_t i = 0; i < list_length(&w->connections); i++) {
 		struct websocket_connection *c = list_at(&w->connections, i);
 	
-		websocket_connection_write(c, smps, cnt);
+		websocket_connection_write(c, cpys, cnt);
 	}
 	
 	for (size_t i = 0; i < list_length(&connections); i++) {
 		struct websocket_connection *c = list_at(&connections, i);
 	
-		websocket_connection_write(c, smps, cnt);
+		websocket_connection_write(c, cpys, cnt);
 	}
+	
+	sample_put_many(cpys, avail);
 
 	return cnt;
 }
@@ -381,6 +518,8 @@ int websocket_parse(struct node *n, config_setting_t *cfg)
 				cerror(cfg_dests, "The 'destinations' setting must be an array of URLs");
 		
 			struct websocket_destination d;
+			
+			memset(&d, 0, sizeof(d));
 		
 			d.uri = strdup(uri);
 			if (!d.uri)
@@ -391,10 +530,13 @@ int websocket_parse(struct node *n, config_setting_t *cfg)
 				cerror(cfg_dests, "Failed to parse websocket URI: '%s'", uri);
 		
 			d.info.ssl_connection = !strcmp(prot, "https");
-			d.info.address = ads;
-			d.info.path = path;
-			d.info.protocol = prot;
+			d.info.address = strdup(ads);
+			d.info.host    = d.info.address;
+			d.info.origin  = d.info.address;
 			d.info.ietf_version_or_minus_one = -1;
+			d.info.protocol = "live";
+			
+			asprintf((char **) &d.info.path, "/%s", path);
 		
 			list_push(&w->destinations, memdup(&d, sizeof(d)));
 		}
@@ -434,6 +576,8 @@ static struct plugin p = {
 	.node		= {
 		.vectorize	= 0, /* unlimited */
 		.size		= sizeof(struct websocket),
+		.init		= websocket_init,
+		.deinit		= websocket_deinit,
 		.start		= websocket_start,
 		.stop		= websocket_stop,
 		.destroy	= websocket_destroy,