1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

splitting web socket code in multiple files

This commit is contained in:
Steffen Vogel 2016-06-08 22:39:17 +02:00
parent 3e0c743d80
commit 104455e0d5
4 changed files with 318 additions and 50 deletions

View file

@ -21,20 +21,19 @@
#include "node.h"
/* Forward declarations */
struct msg;
struct libwebsocket_context;
struct websocket {
struct {
pthread_cond_t cond;
pthread_mutex_t mutex;
struct pool *pool;
size_t cnt;
} read, write;
/** Internal data per websocket node */
struct websocket {
struct list connections; /**< List of active libwebsocket connections (struct websocket_connection) */
struct list destinations; /**< List of struct lws_client_connect_info to connect to. */
int shutdown;
struct list connections; /**< List of struct libwebsockets sockets */
struct websocket_connection *writer;
};
struct websocket_connection {
struct node *node;
struct path *path;
qptr_t received;
};
/** @see node_vtable::init */
@ -53,9 +52,9 @@ int websocket_close(struct node *n);
int websocket_destroy(struct node *n);
/** @see node_vtable::read */
int websocket_read(struct node *n, struct pool *pool, int cnt);
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt);
/** @see node_vtable::write */
int websocket_write(struct node *n, struct pool *pool, int cnt);
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt);
#endif /* _WEBSOCKET_H_ */

154
lib/websocket-http.c Normal file
View file

@ -0,0 +1,154 @@
/** HTTP protocol of the websocket node type
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*********************************************************************************/
#include <libwebsockets.h>
#include <libconfig.h>
#ifdef WITH_JANSSON
#include <jansson.h>
#endif
/* Choose mime type based on the file extension */
static char * get_mimetype(const char *resource_path)
{
char *extension = strrchr(resource_path, '.');
if (extension == NULL)
return "text/plain";
else if (!strcmp(extension, ".png"))
return "image/png";
else if (!strcmp(extension, ".jpg"))
return "image/jpg";
else if (!strcmp(extension, ".gif"))
return "image/gif";
else if (!strcmp(extension, ".html"))
return "text/html";
else if (!strcmp(extension, ".css"))
return "text/css";
else if (!strcmp(extension, ".js"))
return "application/javascript";
else
return "text/plain";
}
static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
switch (reason) {
case LWS_CALLBACK_HTTP:
if (!htdocs) {
lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL);
goto try_to_reuse;
}
if (len < 1) {
lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL);
goto try_to_reuse;
}
char *requested_uri = (char *) in;
debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri);
/* Handle default path */
if (!strcmp(requested_uri, "/")) {
char *response = "HTTP/1.1 302 Found\r\n"
"Content-Length: 0\r\n"
"Location: /index.html\r\n"
"\r\n";
lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP);
goto try_to_reuse;
}
#ifdef WITH_JANSSON
/* Return list of websocket nodes */
else if (!strcmp(requested_uri, "/nodes.json")) {
json_t *json_body = json_array();
list_foreach(struct node *n, &vt.instances) {
struct websocket *w = n->_vd;
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
"name", node_name_short(n),
"connections", list_length(&w->connections),
"state", n->state,
"vectorize", n->vectorize,
"affinity", n->affinity
);
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_node, config_to_json(n->cfg));
json_array_append_new(json_body, json_node);
}
char *body = json_dumps(json_body, JSON_INDENT(4));
char *header = "HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Content-Type: application/json\r\n"
"\r\n";
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
free(body);
json_decref(json_body);
return -1;
}
else if (!strcmp(requested_uri, "/config.json")) {
char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4));
char *header = "HTTP/1.1 200 OK\r\n"
"Connection: close\r\n"
"Content-Type: application/json\r\n"
"\r\n";
lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP);
lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP);
free(body);
return -1;
}
#endif
else {
char path[4069];
snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri);
/* refuse to serve files we don't understand */
char *mimetype = get_mimetype(path);
if (!mimetype) {
warn("HTTP: Unknown mimetype for %s", path);
lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL);
return -1;
}
int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0);
if (n < 0)
return -1;
else if (n == 0)
break;
else
goto try_to_reuse;
}
default:
break;
}
return 0;
try_to_reuse:
if (lws_http_transaction_completed(wsi))
return -1;
return 0;
}

133
lib/websocket-live.c Normal file
View file

@ -0,0 +1,133 @@
/** Live protocol of the websocket node type
*
* This protocol callback function is used to handle the binary websocket protoocol
* which is used to send / receive struct msg's.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*********************************************************************************/
static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
struct conn *c = user;
switch (reason) {
case LWS_CALLBACK_WSI_CREATE:
c->wsi = wsi;
return 0;
case LWS_CALLBACK_WSI_DESTROY:
c->wsi = NULL;
connection_destroy(c); /* release c->peer.ip & c->peer.name */
return 0;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
case LWS_CALLBACK_ESTABLISHED:
c->state = CONN_STATE_ESTABLISHED;
c->role = (reason == LWS_CALLBACK_ESTABLISHED)
? CONN_ROLE_SERVER
: CONN_ROLE_CLIENT;
/* Get path of incoming request */
char uri[64]
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
if (strlen(uri) <= 0) {
warn("WebSocket: Closing connection with invalid URL: %s")
return -1;
}
/* Search for node whose name matches the URI. */
c->node = list_lookup(&vt.instances, uri + 1);
if (c->node == NULL) {
warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1);
return -1;
}
/* Check if node is running */
if (c->node.state != NODE_RUNNING)
return -1;
/* Alias to ease readability */
c->ws = n->_vd;
/* Lookup peer address for debug output */
c->peer.name = alloc(64);
c->peer.ip = alloc(64);
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, 64, c->peer.ip, 64);
info("WebSocket: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
list_push(&c->ws->connections, c);
return 0;
case LWS_CALLBACK_CLOSED:
c->state = CONN_STATE_CLOSED;
info("WebSocket: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
list_remove(&c->ws->connections, c);
return 0;
case LWS_CALLBACK_CLIENT_WRITABLE:
case LWS_CALLBACK_SERVER_WRITEABLE:
if (c->node.state != NODE_RUNNING)
return -1;
int sent, sz, remain;
struct queue *q = &c->tx_queue;
pthread_mutex_lock(&q->lock);
sz = q->tail - q->head;
if (len == 0)
goto out; /* nothing to sent at the moment */
sent = lws_write(wsi, q->head, sz, LWS_WRITE_BINARY);
if (sent < 0)
goto out;
/* Move unsent part to head of queue */
remain = sz - sent;
if (remain > 0)
memmove(q->head, q->head + sent, remain);
/* Update queue tail */
q->tail = q->head + remain;
out: pthread_mutex_unlock(&q->lock);
return (sent < 0) ? -1 : 0;
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
if (c->node.state != NODE_RUNNING)
return -1;
if (!lws_frame_is_binary(wsi)) {
warn("WebSocket: Received non-binary frame for node %s", node_name(c->node));
return -1;
}
int ret = 0;
struct queue *q = &w->rx_queue;
pthread_mutex_lock(&q->lock);
memcpy(q->tail, in, len);
q->tail += len;
pthread_cond_broadcast(&q->cond);
out2: pthread_mutex_unlock(&q->lock);
return ret;
default:
return 0;
}
}

View file

@ -1,6 +1,4 @@
/** Node type: Websockets (libwebsockets)
*
* This file implements the weboscket subtype for nodes.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
@ -89,7 +87,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v
char *requested_uri = (char *) in;
debug(3, "WebSocket: New HTTP request: %s", requested_uri);
debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri);
/* Handle default path */
if (!strcmp(requested_uri, "/")) {
@ -345,17 +343,17 @@ static void logger(int level, const char *msg) {
case LLL_ERR: error("WebSocket: %.*s", len, msg); break;
case LLL_WARN: warn("WebSocket: %.*s", len, msg); break;
case LLL_INFO: info("WebSocket: %.*s", len, msg); break;
default: debug(1, "WebSocket: %.*s", len, msg); break;
default: debug(DBG_WEBSOCKET | 1, "WebSocket: %.*s", len, msg); break;
}
}
static void * server_thread(void *ctx)
{
debug(3, "WebSocket: Started server thread");
debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread");
while (lws_service(context, 10) >= 0);
debug(3, "WebSocket: shutdown voluntarily");
debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily");
return NULL;
}
@ -419,14 +417,8 @@ int websocket_open(struct node *n)
{
struct websocket *w = n->_vd;
list_init(&w->connections, NULL);
pthread_mutex_init(&w->read.mutex, NULL);
pthread_mutex_init(&w->write.mutex, NULL);
pthread_cond_init(&w->read.cond, NULL);
/* pthread_cond_wait() expects the mutex to be already locked */
pthread_mutex_lock(&w->read.mutex);
list_init(&w->connections);
list_init(&w->destinations);
return 0;
}
@ -439,6 +431,9 @@ int websocket_close(struct node *n)
list_foreach(struct lws *wsi, &w->connections)
lws_callback_on_writable(wsi);
/** @todo Is is safe? */
list_destroy(&w->connections);
return 0;
}
@ -447,40 +442,27 @@ int websocket_destroy(struct node *n)
{
struct websocket *w = n->_vd;
pthread_mutex_destroy(&w->read.mutex);
pthread_mutex_destroy(&w->write.mutex);
pthread_cond_destroy(&w->read.cond);
return 0;
}
int websocket_read(struct node *n, struct pool *pool, int cnt)
int websocket_read(struct node *n, struct pool *pool, unsigned cnt)
{
struct websocket *w = n->_vd;
w->read.pool = pool;
w->read.cnt = cnt;
pthread_cond_wait(&w->read.cond, &w->read.mutex);
return 1;
/* Check for new websocket connections and more readers to queue */
list_foreach(struct)
return cnt;
}
int websocket_write(struct node *n, struct pool *pool, int cnt)
int websocket_write(struct node *n, struct pool *pool, unsigned cnt)
{
struct websocket *w = n->_vd;
pthread_mutex_lock(&w->write.mutex);
w->write.pool = pool;
w->write.cnt = cnt;
/* Notify all active websocket connections to send new data */
list_foreach(struct lws *wsi, &w->connections)
lws_callback_on_writable(wsi);
pthread_mutex_unlock(&w->write.mutex);
return 1;
}