diff --git a/include/websocket.h b/include/websocket.h index b2e7ed9bf..d60fd7ee3 100644 --- a/include/websocket.h +++ b/include/websocket.h @@ -21,20 +21,19 @@ #include "node.h" -/* Forward declarations */ -struct msg; -struct libwebsocket_context; - -struct websocket { - struct { - pthread_cond_t cond; - pthread_mutex_t mutex; - struct pool *pool; - size_t cnt; - } read, write; +/** Internal data per websocket node */ +struct websocket { + struct list connections; /**< List of active libwebsocket connections (struct websocket_connection) */ + struct list destinations; /**< List of struct lws_client_connect_info to connect to. */ - int shutdown; - struct list connections; /**< List of struct libwebsockets sockets */ + struct websocket_connection *writer; +}; + +struct websocket_connection { + struct node *node; + struct path *path; + + qptr_t received; }; /** @see node_vtable::init */ @@ -53,9 +52,9 @@ int websocket_close(struct node *n); int websocket_destroy(struct node *n); /** @see node_vtable::read */ -int websocket_read(struct node *n, struct pool *pool, int cnt); +int websocket_read(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_vtable::write */ -int websocket_write(struct node *n, struct pool *pool, int cnt); +int websocket_write(struct node *n, struct sample *smps[], unsigned cnt); #endif /* _WEBSOCKET_H_ */ \ No newline at end of file diff --git a/lib/websocket-http.c b/lib/websocket-http.c new file mode 100644 index 000000000..bdb8782c5 --- /dev/null +++ b/lib/websocket-http.c @@ -0,0 +1,154 @@ +/** HTTP protocol of the websocket node type + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + *********************************************************************************/ + +#include +#include + +#ifdef WITH_JANSSON + #include +#endif + +/* Choose mime type based on the file extension */ +static char * get_mimetype(const char *resource_path) +{ + char *extension = strrchr(resource_path, '.'); + + if (extension == NULL) + return "text/plain"; + else if (!strcmp(extension, ".png")) + return "image/png"; + else if (!strcmp(extension, ".jpg")) + return "image/jpg"; + else if (!strcmp(extension, ".gif")) + return "image/gif"; + else if (!strcmp(extension, ".html")) + return "text/html"; + else if (!strcmp(extension, ".css")) + return "text/css"; + else if (!strcmp(extension, ".js")) + return "application/javascript"; + else + return "text/plain"; +} + +static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + switch (reason) { + case LWS_CALLBACK_HTTP: + if (!htdocs) { + lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL); + goto try_to_reuse; + } + + if (len < 1) { + lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL); + goto try_to_reuse; + } + + char *requested_uri = (char *) in; + + debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri); + + /* Handle default path */ + if (!strcmp(requested_uri, "/")) { + char *response = "HTTP/1.1 302 Found\r\n" + "Content-Length: 0\r\n" + "Location: /index.html\r\n" + "\r\n"; + + lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP); + + goto try_to_reuse; + } +#ifdef WITH_JANSSON + /* Return list of websocket nodes */ + else if (!strcmp(requested_uri, "/nodes.json")) { + json_t *json_body = json_array(); + + list_foreach(struct node *n, &vt.instances) { + struct websocket *w = n->_vd; + + json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }", + "name", node_name_short(n), + "connections", list_length(&w->connections), + "state", n->state, + "vectorize", n->vectorize, + "affinity", n->affinity + ); + + /* Add all additional fields of node here. + * This can be used for metadata */ + json_object_update(json_node, config_to_json(n->cfg)); + + json_array_append_new(json_body, json_node); + } + + char *body = json_dumps(json_body, JSON_INDENT(4)); + + char *header = "HTTP/1.1 200 OK\r\n" + "Connection: close\r\n" + "Content-Type: application/json\r\n" + "\r\n"; + + lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); + lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP); + + free(body); + json_decref(json_body); + + return -1; + } + else if (!strcmp(requested_uri, "/config.json")) { + char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4)); + + char *header = "HTTP/1.1 200 OK\r\n" + "Connection: close\r\n" + "Content-Type: application/json\r\n" + "\r\n"; + + lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); + lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP); + + free(body); + + return -1; + } +#endif + else { + char path[4069]; + snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri); + + /* refuse to serve files we don't understand */ + char *mimetype = get_mimetype(path); + if (!mimetype) { + warn("HTTP: Unknown mimetype for %s", path); + lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL); + return -1; + } + + int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0); + if (n < 0) + return -1; + else if (n == 0) + break; + else + goto try_to_reuse; + } + + default: + break; + } + + return 0; + +try_to_reuse: + if (lws_http_transaction_completed(wsi)) + return -1; + + return 0; +} \ No newline at end of file diff --git a/lib/websocket-live.c b/lib/websocket-live.c new file mode 100644 index 000000000..4dff1d72c --- /dev/null +++ b/lib/websocket-live.c @@ -0,0 +1,133 @@ +/** Live protocol of the websocket node type + * + * This protocol callback function is used to handle the binary websocket protoocol + * which is used to send / receive struct msg's. + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + *********************************************************************************/ + +static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + struct conn *c = user; + + switch (reason) { + case LWS_CALLBACK_WSI_CREATE: + c->wsi = wsi; + return 0; + + case LWS_CALLBACK_WSI_DESTROY: + c->wsi = NULL; + + connection_destroy(c); /* release c->peer.ip & c->peer.name */ + return 0; + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + case LWS_CALLBACK_ESTABLISHED: + c->state = CONN_STATE_ESTABLISHED; + c->role = (reason == LWS_CALLBACK_ESTABLISHED) + ? CONN_ROLE_SERVER + : CONN_ROLE_CLIENT; + + /* Get path of incoming request */ + char uri[64] + lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ + if (strlen(uri) <= 0) { + warn("WebSocket: Closing connection with invalid URL: %s") + return -1; + } + + /* Search for node whose name matches the URI. */ + c->node = list_lookup(&vt.instances, uri + 1); + if (c->node == NULL) { + warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1); + return -1; + } + + /* Check if node is running */ + if (c->node.state != NODE_RUNNING) + return -1; + + /* Alias to ease readability */ + c->ws = n->_vd; + + /* Lookup peer address for debug output */ + c->peer.name = alloc(64); + c->peer.ip = alloc(64); + lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, 64, c->peer.ip, 64); + + info("WebSocket: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + list_push(&c->ws->connections, c); + + return 0; + + case LWS_CALLBACK_CLOSED: + c->state = CONN_STATE_CLOSED; + + info("WebSocket: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + list_remove(&c->ws->connections, c); + + return 0; + + case LWS_CALLBACK_CLIENT_WRITABLE: + case LWS_CALLBACK_SERVER_WRITEABLE: + if (c->node.state != NODE_RUNNING) + return -1; + + int sent, sz, remain; + struct queue *q = &c->tx_queue; + + pthread_mutex_lock(&q->lock); + + sz = q->tail - q->head; + if (len == 0) + goto out; /* nothing to sent at the moment */ + + sent = lws_write(wsi, q->head, sz, LWS_WRITE_BINARY); + if (sent < 0) + goto out; + + /* Move unsent part to head of queue */ + remain = sz - sent; + if (remain > 0) + memmove(q->head, q->head + sent, remain); + + /* Update queue tail */ + q->tail = q->head + remain; + +out: pthread_mutex_unlock(&q->lock); + + return (sent < 0) ? -1 : 0; + + case LWS_CALLBACK_CLIENT_RECEIVE: + case LWS_CALLBACK_RECEIVE: + if (c->node.state != NODE_RUNNING) + return -1; + + if (!lws_frame_is_binary(wsi)) { + warn("WebSocket: Received non-binary frame for node %s", node_name(c->node)); + return -1; + } + + int ret = 0; + struct queue *q = &w->rx_queue; + + pthread_mutex_lock(&q->lock); + + + + memcpy(q->tail, in, len); + + q->tail += len; + + pthread_cond_broadcast(&q->cond); +out2: pthread_mutex_unlock(&q->lock); + + return ret; + + default: + return 0; + } +} \ No newline at end of file diff --git a/lib/websocket.c b/lib/websocket.c index 58f72dbeb..ece8725ce 100644 --- a/lib/websocket.c +++ b/lib/websocket.c @@ -1,6 +1,4 @@ /** Node type: Websockets (libwebsockets) - * - * This file implements the weboscket subtype for nodes. * * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC @@ -89,7 +87,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v char *requested_uri = (char *) in; - debug(3, "WebSocket: New HTTP request: %s", requested_uri); + debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri); /* Handle default path */ if (!strcmp(requested_uri, "/")) { @@ -345,17 +343,17 @@ static void logger(int level, const char *msg) { case LLL_ERR: error("WebSocket: %.*s", len, msg); break; case LLL_WARN: warn("WebSocket: %.*s", len, msg); break; case LLL_INFO: info("WebSocket: %.*s", len, msg); break; - default: debug(1, "WebSocket: %.*s", len, msg); break; + default: debug(DBG_WEBSOCKET | 1, "WebSocket: %.*s", len, msg); break; } } static void * server_thread(void *ctx) { - debug(3, "WebSocket: Started server thread"); + debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread"); while (lws_service(context, 10) >= 0); - debug(3, "WebSocket: shutdown voluntarily"); + debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily"); return NULL; } @@ -419,14 +417,8 @@ int websocket_open(struct node *n) { struct websocket *w = n->_vd; - list_init(&w->connections, NULL); - - pthread_mutex_init(&w->read.mutex, NULL); - pthread_mutex_init(&w->write.mutex, NULL); - pthread_cond_init(&w->read.cond, NULL); - - /* pthread_cond_wait() expects the mutex to be already locked */ - pthread_mutex_lock(&w->read.mutex); + list_init(&w->connections); + list_init(&w->destinations); return 0; } @@ -439,6 +431,9 @@ int websocket_close(struct node *n) list_foreach(struct lws *wsi, &w->connections) lws_callback_on_writable(wsi); + + /** @todo Is is safe? */ + list_destroy(&w->connections); return 0; } @@ -447,40 +442,27 @@ int websocket_destroy(struct node *n) { struct websocket *w = n->_vd; - pthread_mutex_destroy(&w->read.mutex); - pthread_mutex_destroy(&w->write.mutex); - pthread_cond_destroy(&w->read.cond); - return 0; } -int websocket_read(struct node *n, struct pool *pool, int cnt) +int websocket_read(struct node *n, struct pool *pool, unsigned cnt) { struct websocket *w = n->_vd; - - w->read.pool = pool; - w->read.cnt = cnt; - - pthread_cond_wait(&w->read.cond, &w->read.mutex); - - return 1; + + /* Check for new websocket connections and more readers to queue */ + list_foreach(struct) + + return cnt; } -int websocket_write(struct node *n, struct pool *pool, int cnt) +int websocket_write(struct node *n, struct pool *pool, unsigned cnt) { struct websocket *w = n->_vd; - - pthread_mutex_lock(&w->write.mutex); - - w->write.pool = pool; - w->write.cnt = cnt; /* Notify all active websocket connections to send new data */ list_foreach(struct lws *wsi, &w->connections) lws_callback_on_writable(wsi); - - pthread_mutex_unlock(&w->write.mutex); - + return 1; }