From fab53b0302fda003333e5affe5609eee4168e2f6 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 8 Nov 2016 00:24:57 -0500 Subject: [PATCH] further steps to implement websocket client functionality --- include/villas/nodes/websocket.h | 29 +--- lib/nodes/websocket.c | 269 +++++++++++++++++++------------ 2 files changed, 173 insertions(+), 125 deletions(-) diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 9d4679d51..3fb1bef79 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -19,6 +19,8 @@ #ifndef _WEBSOCKET_H_ #define _WEBSOCKET_H_ +#include + #include "node.h" #include "pool.h" #include "queue.h" @@ -28,38 +30,15 @@ struct lws; /** Internal data per websocket node */ struct websocket { - struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection) */ - - struct list destinations; /**< List of struct lws_client_connect_info to connect to in client mode. */ + struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection). */ + struct list destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */ struct pool pool; - struct queue queue; /**< For samples which are received from WebSockets a */ int id; /**< The index of this node */ }; -struct websocket_connection { - enum { - WEBSOCKET_ESTABLISHED, - WEBSOCKET_ACTIVE, - WEBSOCKET_SHUTDOWN, - WEBSOCKET_CLOSED - } state; - - struct node *node; - struct path *path; - - struct queue queue; /**< For samples which are sent to the WebSocket */ - - struct lws *wsi; - - struct { - char name[64]; - char ip[64]; - } peer; -}; - /** @see node_vtable::init */ int websocket_init(int argc, char * argv[], config_setting_t *cfg); diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 901d1c8ce..1809b4761 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -12,7 +12,6 @@ #include #include -#include #include #include "nodes/websocket.h" @@ -23,6 +22,35 @@ #include "cfg.h" #include "config.h" +/* Internal datastructures */ +struct connection { + enum { + WEBSOCKET_ESTABLISHED, + WEBSOCKET_ACTIVE, + WEBSOCKET_SHUTDOWN, + WEBSOCKET_CLOSED + } state; + + struct node *node; + struct path *path; + + struct queue queue; /**< For samples which are sent to the WebSocket */ + + struct lws *wsi; + + struct { + char name[64]; + char ip[64]; + } peer; + + char *_name; +}; + +struct destination { + char *uri; + struct lws_client_connect_info info; +}; + /* Private static storage */ static config_setting_t *cfg_root; /**< Root config */ static pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */ @@ -45,10 +73,93 @@ static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, voi static struct lws_protocols protocols[] = { { "http-only", protocol_cb_http, 0, 0 }, - { "live", protocol_cb_live, sizeof(struct websocket_connection), 0 }, + { "live", protocol_cb_live, sizeof(struct connection), 0 }, { NULL } }; +__attribute__((unused)) static int connection_init(struct connection *c) +{ + /** @todo */ + return -1; +} + +__attribute__((unused)) static void connection_destroy(struct connection *c) +{ + if (c->_name) + free(c->_name); +} + +static char * connection_name(struct connection *c) +{ + if (!c->_name) { + if (c->node) + asprintf(&c->_name, "%s (%s) for node %s", c->peer.name, c->peer.ip, node_name(c->node)); + else + asprintf(&c->_name, "%s (%s) for all nodes", c->peer.name, c->peer.ip); + } + + return c->_name; +} + +static void destination_destroy(struct destination *d) +{ + free(d->uri); +} + +static int connection_write(struct connection *c, struct sample *smps[], unsigned cnt) +{ + int blocks, enqueued; + char *bufs[cnt]; + + struct websocket *w = c->node->_vd; + + switch (c->state) { + case WEBSOCKET_SHUTDOWN: + return -1; + case WEBSOCKET_CLOSED: + if (c->node) { + struct websocket *w = (struct websocket *) c->node->_vd; + list_remove(&w->connections, c); + } + else + list_remove(&connections, c); + break; + + case WEBSOCKET_ESTABLISHED: + c->state = WEBSOCKET_ACTIVE; + /* fall through */ + + case WEBSOCKET_ACTIVE: + blocks = pool_get_many(&w->pool, (void **) bufs, cnt); + if (blocks != cnt) + warn("Pool underrun in websocket connection: %s", connection_name(c)); + + for (int i = 0; i < blocks; i++) { + struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE); + + msg->version = WEBMSG_VERSION; + msg->type = WEBMSG_TYPE_DATA; + msg->endian = WEBMSG_ENDIAN_HOST; + msg->length = smps[i]->length; + msg->sequence = smps[i]->sequence; + msg->id = w->id; + msg->ts.sec = smps[i]->ts.origin.tv_sec; + msg->ts.nsec = smps[i]->ts.origin.tv_nsec; + + memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4); + } + + enqueued = queue_push_many(&c->queue, (void **) bufs, cnt); + if (enqueued != blocks) + warn("Queue overrun in websocket connection: %s", connection_name(c)); + + lws_callback_on_writable(c->wsi); + break; + } + + return 0; +} + static void logger(int level, const char *msg) { int len = strlen(msg); if (strchr(msg, '\n')) @@ -102,7 +213,7 @@ static char * get_mimetype(const char *resource_path) return "text/plain"; } -int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { switch (reason) { case LWS_CALLBACK_HTTP: @@ -220,10 +331,10 @@ try_to_reuse: return 0; } -int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int ret; - struct websocket_connection *c = user; + struct connection *c = user; struct websocket *w; switch (reason) { @@ -268,10 +379,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us /* Lookup peer address for debug output */ lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); - if (c->node != NULL) - info("LWS: New connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); - else - info("LWS: New connection from %s (%s)", c->peer.name, c->peer.ip); + info("LWS: New connection %s", connection_name(c)); if (c->node != NULL) { struct websocket *w = (struct websocket *) c->node->_vd; @@ -285,10 +393,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us } case LWS_CALLBACK_CLOSED: - if (c->node != NULL) - info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); - else - info("LWS: Connection closed from %s (%s)", c->peer.name, c->peer.ip); + info("LWS: Connection %s closed", connection_name(c)); c->state = WEBSOCKET_CLOSED; c->wsi = NULL; @@ -304,7 +409,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us if (c->node && c->node->state != NODE_RUNNING) return -1; - if (w->state == WEBSOCKET_SHUTDOWN) { + if (c->state == WEBSOCKET_SHUTDOWN) { lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4); return -1; } @@ -338,14 +443,14 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us return -1; if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0)) - warn("LWS: Received invalid packet for node: %s", node_name(c->node)); + warn("LWS: Received invalid packet for connection %s", connection_name(c)); struct webmsg *msg = (struct webmsg *) in; while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) { struct webmsg *msg2 = pool_get(&w->pool); if (!msg2) { - warn("Pool underrun for node: %s", node_name(c->node)); + warn("Pool underrun for connection %s", connection_name(c)); break; } @@ -353,7 +458,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us ret = queue_push(&w->queue, msg2); if (ret != 1) { - warn("Queue overrun for node: %s", node_name(c->node)); + warn("Queue overrun for connection %s", connection_name(c)); break; } @@ -451,7 +556,7 @@ int websocket_close(struct node *n) { struct websocket *w = n->_vd; - list_foreach(struct websocket_connection *c, &w->connections) { + list_foreach(struct connection *c, &w->connections) { c->state = WEBSOCKET_SHUTDOWN; lws_callback_on_writable(c->wsi); } @@ -466,7 +571,9 @@ int websocket_close(struct node *n) int websocket_destroy(struct node *n) { -// struct websocket *w = n->_vd; + struct websocket *w = n->_vd; + + list_destroy(&w->destinations, (dtor_cb_t) destination_destroy, true); return 0; } @@ -497,85 +604,16 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) return got; } -static int websocket_connection_init(struct websocket_connection *c) -{ - /** @todo */ - return -1; -} - -static void websocket_connection_destroy(struct websocket_connection *c) -{ - /** @todo */ -} - -static char * websocket_connection_name(struct websocket_connection *c) -{ - /** @todo */ - return "(todo)"; -} - -static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) -{ - int blocks, enqueued; - char *bufs[cnt]; - - switch (c->state) { - case WEBSOCKET_SHUTDOWN: - return -1; - case WEBSOCKET_CLOSED: - if (c->node) { - struct websocket *w = (struct websocket *) c->node->_vd; - list_remove(&w->connections, c); - } - else - list_remove(&connections, c); - break; - - case WEBSOCKET_ESTABLISHED: - c->state = WEBSOCKET_ACTIVE; - /* fall through */ - - case WEBSOCKET_ACTIVE: - blocks = pool_get_many(&w->pool, (void **) bufs, cnt); - if (blocks != cnt) - warn("Pool underrun in websocket connection: %s", websocket_connection_name(c)); - - for (int i = 0; i < blocks; i++) { - struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE); - - msg->version = WEBMSG_VERSION; - msg->type = WEBMSG_TYPE_DATA; - msg->endian = WEBMSG_ENDIAN_HOST; - msg->length = smps[i]->length; - msg->sequence = smps[i]->sequence; - msg->id = w->id; - msg->ts.sec = smps[i]->ts.origin.tv_sec; - msg->ts.nsec = smps[i]->ts.origin.tv_nsec; - - memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4); - } - - enqueued = queue_push_many(&c->queue, (void **) bufs, cnt); - if (enqueued != blocks) - warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); - - lws_callback_on_writable(c->wsi); - break; - } - - return 0; -} - int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; - list_foreach(struct websocket_connection *c, &w->connections) { - websocket_write_connection(c, smps, cnt); + list_foreach(struct connection *c, &w->connections) { + connection_write(c, smps, cnt); } - list_foreach(struct websocket_connection *c, &connections) { - websocket_write_connection(c, smps, cnt); + list_foreach(struct connection *c, &connections) { + connection_write(c, smps, cnt); } return cnt; @@ -583,30 +621,61 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) int websocket_parse(struct node *n, config_setting_t *cfg) { + struct websocket *w = n->_vd; config_setting_t *cfg_dests; + int ret; cfg_dests = config_setting_get_member(cfg, "destinations"); if (!config_setting_is_array(cfg_dests)) - cerror(dests, "The 'destinations' setting must be an array of URLs"); + cerror(cfg_dests, "The 'destinations' setting must be an array of URLs"); for (int i = 0; i < config_setting_length(cfg_dests); i++) { - config_setting_t *cfg_dest; - const char *url; - struct lws_client_connect_info *i; + struct destination *d; + const char *uri, *prot, *ads, *path; - url = config_setting_get_string_elem(cfg_dests, i); - if (!url) - cerror(dests, "The 'destinations' setting must be an array of URLs"); + uri = config_setting_get_string_elem(cfg_dests, i); + if (!uri) + cerror(cfg_dests, "The 'destinations' setting must be an array of URLs"); - i = alloc(sizeof()) + d = alloc(sizeof(struct destination)); + d->uri = strdup(uri); + if (!d->uri) + serror("Failed to allocate memory"); + + ret = lws_parse_uri(d->uri, &prot, &ads, &d->info.port, &path); + if (ret) + cerror(cfg_dests, "Failed to parse websocket URI: '%s'", uri); + + d->info.ssl_connection = !strcmp(prot, "https"); + d->info.address = ads; + d->info.path = path; + d->info.protocol = prot; + d->info.ietf_version_or_minus_one = -1; + + list_push(&w->destinations, d); } + + return 0; } char * websocket_print(struct node *n) { + struct websocket *w = n->_vd; + + char *buf = NULL; + list_foreach(struct lws_client_connect_info *in, &w->destinations) { + buf = strcatf(&buf, "%s://%s:%d/%s", + in->ssl_connection ? "https" : "http", + in->address, + in->port, + in->path + ); + } + + return buf; } static struct node_type vt = {