diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index c8d6d95f7..650ef04a6 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -48,7 +48,6 @@ struct lws; /** Internal data per websocket node */ struct websocket { - struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection). */ struct list destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */ struct pool pool; @@ -57,7 +56,14 @@ struct websocket { /* Internal datastructures */ struct websocket_connection { - enum state state; /**< The current status of this connection. */ + enum websocket_connection_state { + STATE_DISCONNECTED, + STATE_CONNECTING, + STATE_RECONNECTING, + STATE_ESTABLISHED, + STATE_SHUTDOWN, + STATE_ERROR + } state; /**< The current status of this connection. */ enum { WEBSOCKET_MODE_CLIENT, @@ -67,7 +73,7 @@ struct websocket_connection { struct lws *wsi; struct node *node; struct io_format *format; /**< The IO format used for this connection. */ - struct queue_signalled queue; /**< For samples which are sent to the WebSocket */ + struct queue queue; /**< For samples which are sent to the WebSocket */ union { /**< Only used in case websocket_connection::mode == WEBSOCKET_MODE_CLIENT */ @@ -80,8 +86,10 @@ struct websocket_connection { } peer; }; - char *buf; /**< A buffer which is used to construct the messages. */ - size_t buflen; /**< Length of websocket_connection::buf. */ + struct { + struct buffer recv; /**< A buffer for reconstructing fragmented messags. */ + struct buffer send; /**< A buffer for contsructing messages before calling lws_write() */ + } buffers; char *_name; }; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index d627da4f6..9c4ebbf72 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -29,6 +29,7 @@ #include "super_node.h" #include "timing.h" #include "utils.h" +#include "buffer.h" #include "plugin.h" #include "io_format.h" #include "nodes/websocket.h" @@ -43,7 +44,13 @@ static struct plugin p; static char * websocket_connection_name(struct websocket_connection *c) { if (!c->_name) { - strcatf(&c->_name, "%s (%s)", c->peer.name, c->peer.ip); + if (c->wsi) { + lws_get_peer_addresses(c->wsi, lws_get_socket_fd(c->wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); + + strcatf(&c->_name, "%s", c->peer.name); + } + else if (c->destination) + strcatf(&c->_name, "%s:%d", c->destination->info.address, c->destination->info.port); if (c->node) strcatf(&c->_name, " for node %s", node_name(c->node)); @@ -54,64 +61,22 @@ static char * websocket_connection_name(struct websocket_connection *c) return c->_name; } -static int websocket_connection_init(struct websocket_connection *c, struct lws *wsi) -{ - int ret; - - lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); - - info("LWS: New connection %s", websocket_connection_name(c)); - - c->state = STATE_INITIALIZED; - c->wsi = wsi; - - /** @todo: We must find a better way to determine the buffer size */ - c->buflen = 1 << 12; - c->buf = alloc(c->buflen); - - - if (c->node) { - struct websocket *w = c->node->_vd; - - list_push(&w->connections, c); - } - else - list_push(&connections, c); - - ret = queue_signalled_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); - if (ret) - return ret; - - return 0; -} - static int websocket_connection_destroy(struct websocket_connection *c) { int ret; - if (c->state == STATE_DESTROYED) - return 0; - - info("LWS: Connection %s closed", websocket_connection_name(c)); - - if (c->node) { - struct websocket *w = c->node->_vd; - list_remove(&w->connections, c); - } - else - list_remove(&connections, c); + list_remove(&connections, c); if (c->_name) free(c->_name); - ret = queue_signalled_destroy(&c->queue); + ret = queue_destroy(&c->queue); if (ret) return ret; - if (c->buf) - free(c->buf); - - c->state = STATE_DESTROYED; + buffer_destroy(&c->buffers.recv); + buffer_destroy(&c->buffers.send); + c->wsi = NULL; return ret; @@ -127,27 +92,19 @@ static void websocket_destination_destroy(struct websocket_destination *d) static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) { - int ret; - - switch (c->state) { - case STATE_INITIALIZED: - c->state = STATE_STARTED; - /* fall through */ - - case STATE_STARTED: - for (int i = 0; i < cnt; i++) { - sample_get(smps[i]); /* increase reference count */ - - ret = queue_signalled_push(&c->queue, (void **) smps[i]); - if (ret != 1) - warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); - } - - lws_callback_on_writable(c->wsi); - break; - - default: { } - } + int pushed; + + pushed = queue_push_many(&c->queue, (void **) smps, cnt); + if (pushed < cnt) + warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); + + sample_get_many(smps, cnt); + + debug(LOG_WEBSOCKET | 10, "Enqueued %u samples to %s", pushed, websocket_connection_name(c)); + + /* Client connections which are currently conecting don't have an associate c->wsi yet */ + if (c->wsi) + lws_callback_on_writable(c->wsi); return 0; } @@ -156,40 +113,33 @@ static void websocket_connection_close(struct websocket_connection *c, struct lw { lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason)); - char *msg = strf("LWS: Closing connection"); - - if (c) - msg = strcatf(&msg, " with %s", websocket_connection_name(c)); - - msg = strcatf(&msg, ": status=%u, reason=%s", status, reason); - - warn("%s", msg); - - free(msg); + debug(LOG_WEBSOCKET | 10, "Closing WebSocket connection with %s: status=%u, reason=%s", websocket_connection_name(c), status, reason); } int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - int ret; + int ret, recvd, pulled, cnt = 128; struct websocket_connection *c = user; - + switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: - ret = websocket_connection_init(c, wsi); - if (ret) - return -1; + c->wsi = wsi; + c->state = STATE_ESTABLISHED; - c->format = io_format_lookup("villas"); + buffer_init(&c->buffers.recv, 1 << 12); + buffer_init(&c->buffers.send, 1 << 12); - break; + debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); - case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - warn("Failed to establish connection: %s", websocket_connection_name(c)); - + /* Schedule writable callback in case we have something to send */ + if (queue_available(&c->queue) > 0) + lws_callback_on_writable(wsi); + break; case LWS_CALLBACK_ESTABLISHED: - c->state = STATE_DESTROYED; + c->wsi = wsi; + c->state = STATE_ESTABLISHED; c->mode = WEBSOCKET_MODE_SERVER; /* We use the URI to associate this connection to a node @@ -229,134 +179,129 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi } if (!format) - format = "villas"; + format = "webmsg"; c->format = io_format_lookup(format); if (!c->format) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format"); return -1; } - - ret = websocket_connection_init(c, wsi); + + buffer_init(&c->buffers.recv, 1 << 12); + buffer_init(&c->buffers.send, 1 << 12); + + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return -1; + list_push(&connections, c); + + debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); + break; + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + c->state = STATE_ERROR; + + warn("Failed to establish WebSocket connection: %s (%s)", websocket_connection_name(c), in ? (char *) in : "Unkown reason"); + + return -1; + case LWS_CALLBACK_CLOSED: + debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c)); + + if (c->state != STATE_SHUTDOWN) { + /** @todo Attempt reconnect here */ + } + websocket_connection_destroy(c); if (c->mode == WEBSOCKET_MODE_CLIENT) free(c); - return 0; + break; case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE: { - if (c->node && c->node->state != STATE_STARTED) { + size_t wbytes; + + struct sample **smps = alloca(cnt * sizeof(struct sample *)); + + pulled = queue_pull_many(&c->queue, (void **) smps, cnt); + if (pulled > 0) { + io_format_sprint(c->format, c->buffers.send.buf, c->buffers.send.size, &wbytes, smps, pulled, IO_FORMAT_ALL); + + ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); + + sample_put_many(smps, pulled); + + debug(LOG_WEBSOCKET | 10, "Send %d samples on %s: bytes=%d", pulled, websocket_connection_name(c), ret); + } + + if (c->state == STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } - - size_t wbytes; - int cnt = 256; //c->node ? c->node->vectorize : 1; - int pulled; - - struct sample **smps = alloca(cnt * sizeof(struct sample *)); - - pulled = queue_signalled_pull_many(&c->queue, (void **) smps, cnt); - - io_format_sprint(c->format, c->buf + LWS_PRE, c->buflen - LWS_PRE, &wbytes, smps, pulled, IO_FORMAT_ALL); - - sample_put_many(smps, pulled); - - ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); - if (ret < 0) { - warn("Failed lws_write() for connection %s", websocket_connection_name(c)); - return -1; + else { + if (queue_available(&c->queue) > 0) + lws_callback_on_writable(wsi); } - if (c->state == STATE_STOPPED) { - info("Closing connection %s", websocket_connection_name(c)); - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye"); - return -1; - } - - if (queue_signalled_available(&c->queue) > 0) - lws_callback_on_writable(wsi); - - return 0; + break; } case LWS_CALLBACK_CLIENT_RECEIVE: - case LWS_CALLBACK_RECEIVE: { - if (c->format->flags & IO_FORMAT_BINARY && !lws_frame_is_binary(wsi)) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected"); - return -1; - } - - if (len <= 0) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet"); - return -1; - } - + case LWS_CALLBACK_RECEIVE: if (!c->node) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive."); return -1; } - - struct timespec ts_recv = time_now(); - int recvd; - int cnt = 256; //c->node->vectorize; - struct websocket *w = c->node->_vd; - struct sample **smps = alloca(cnt * sizeof(struct sample *)); - - ret = sample_alloc(&w->pool, smps, cnt); - if (ret != 1) { - warn("Pool underrun for connection: %s", websocket_connection_name(c)); - break; + + if (lws_is_first_fragment(wsi)) + buffer_clear(&c->buffers.recv); + + ret = buffer_append(&c->buffers.recv, in, len); + if (ret) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data"); + return -1; } + + /* We dont try to parse the frame yet, as we have to wait for the remaining fragments */ + if (lws_is_final_fragment(wsi)) { + struct timespec ts_recv = time_now(); - recvd = io_format_sscan(c->format, in, len, NULL, smps, cnt, NULL); - if (recvd < 0) { - warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); - break; - } + struct websocket *w = c->node->_vd; + struct sample **smps = alloca(cnt * sizeof(struct sample *)); + + ret = sample_alloc(&w->pool, smps, cnt); + if (ret != cnt) { + warn("Pool underrun for connection: %s", websocket_connection_name(c)); + break; + } - struct node *dest; + recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, NULL); + if (recvd < 0) { + warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); + break; + } + + debug(LOG_WEBSOCKET | 10, "Received %d samples on %s", recvd, websocket_connection_name(c)); - for (int i = 0; i < recvd; i++) { /* Set receive timestamp */ - smps[i]->ts.received = ts_recv; + for (int i = 0; i < recvd; i++) + smps[i]->ts.received = ts_recv; - /* Find destination node of this message */ - if (c->node) - dest = c->node; - else { - dest = NULL; - - for (int i = 0; i < list_length(&p.node.instances); i++) { - struct node *n = list_at(&p.node.instances, i); - - if (n->id == smps[i]->id) { - dest = n; - break; - } - } - - if (!dest) - warn("Ignoring message due to invalid node id"); + ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd); + if (ret != recvd) + warn("Queue overrun for connection %s", websocket_connection_name(c)); + + if (c->state == STATE_SHUTDOWN) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); + return -1; } } - ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd); - if (ret != 1) { - warn("Queue overrun for connection %s", websocket_connection_name(c)); - break; - } - - return 0; - } + break; default: break; @@ -382,19 +327,17 @@ int websocket_deinit() for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = list_at(&connections, i); - c->state = STATE_STOPPED; + c->state = STATE_SHUTDOWN; lws_callback_on_writable(c->wsi); } /* Wait for all connections to be closed */ while (list_length(&connections) > 0) { - info("LWS: Waiting for connection shutdown"); - sched_yield(); - usleep(0.1 * 1e6); + info("Waiting for WebSocket connection shutdown"); + sleep(1); } - list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true); return 0; @@ -415,17 +358,24 @@ int websocket_start(struct node *n) for (int i = 0; i < list_length(&w->destinations); i++) { struct websocket_destination *d = list_at(&w->destinations, i); - struct websocket_connection *c = alloc(sizeof(struct websocket_connection)); - c->state = STATE_DESTROYED; + c->state = STATE_CONNECTING; c->mode = WEBSOCKET_MODE_CLIENT; c->node = n; c->destination = d; + c->format = io_format_lookup("webmsg"); /** @todo We could parse the format from the URI */ + d->info.context = web->context; d->info.vhost = web->vhost; d->info.userdata = c; + + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); + if (ret) + return -1; + + list_push(&connections, c); lws_client_connect_via_info(&d->info); } @@ -437,22 +387,38 @@ int websocket_stop(struct node *n) { int ret; struct websocket *w = n->_vd; + + /* Wait for all connections to be closed */ + for (;;) { + int connecting = 0; + + for (int i = 0; i < list_length(&w->destinations); i++) { + struct websocket_destination *d = list_at(&w->destinations, i); + struct websocket_connection *c = d->info.userdata; + + if (c->state == STATE_CONNECTING) + connecting++; + } + + if (connecting == 0) + break; + + debug(LOG_WEBSOCKET | 10, "Waiting for %d client connections to be established", connecting); + sleep(1); + } + + for (size_t i = 0; i < list_length(&connections); i++) { + struct websocket_connection *c = list_at(&connections, i); + + if (c->node != n) + continue; - for (size_t i = 0; i < list_length(&w->connections); i++) { - struct websocket_connection *c = list_at(&w->connections, i); - - c->state = STATE_STOPPED; + if (c->state != STATE_CONNECTING) + c->state = STATE_SHUTDOWN; lws_callback_on_writable(c->wsi); } - /* Wait for all connections to be closed */ - while (list_length(&w->connections) > 0) { - info("LWS: Waiting for connection shutdown"); - sched_yield(); - usleep(0.1 * 1e6); - } - ret = queue_signalled_destroy(&w->queue); if (ret) return ret; @@ -468,7 +434,6 @@ int websocket_destroy(struct node *n) { struct websocket *w = n->_vd; - list_destroy(&w->connections, NULL, false); list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true); return 0; @@ -481,11 +446,9 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) struct websocket *w = n->_vd; struct sample *cpys[cnt]; - do { - avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt); - if (avail < 0) - return avail; - } while (avail == 0); + avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt); + if (avail < 0) + return avail; for (int i = 0; i < avail; i++) { sample_copy(smps[i], cpys[i]); @@ -511,18 +474,14 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) sample_copy(cpys[i], smps[i]); cpys[i]->source = n; - } - - for (size_t i = 0; i < list_length(&w->connections); i++) { - struct websocket_connection *c = list_at(&w->connections, i); - - websocket_connection_write(c, cpys, cnt); + cpys[i]->id = n->id; } for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = list_at(&connections, i); - - websocket_connection_write(c, cpys, cnt); + + if (c->node == n || c->node == NULL) + websocket_connection_write(c, cpys, cnt); } sample_put_many(cpys, avail); @@ -540,7 +499,6 @@ int websocket_parse(struct node *n, json_t *cfg) json_t *cfg_dest; json_error_t err; - list_init(&w->connections); list_init(&w->destinations); ret = json_unpack_ex(cfg, &err, 0, "{ s?: o }", "destinations", &cfg_dests); @@ -568,13 +526,12 @@ int websocket_parse(struct node *n, json_t *cfg) d->info.ssl_connection = !strcmp(prot, "https"); d->info.address = strdup(ads); + d->info.path = strdup(path); d->info.host = d->info.address; d->info.origin = d->info.address; d->info.ietf_version_or_minus_one = -1; d->info.protocol = "live"; - ret = asprintf((char **) &d->info.path, "/%s", path); - list_push(&w->destinations, d); } } @@ -593,7 +550,7 @@ char * websocket_print(struct node *n) for (size_t i = 0; i < list_length(&w->destinations); i++) { struct websocket_destination *d = list_at(&w->destinations, i); - buf = strcatf(&buf, "%s://%s:%d%s ", + buf = strcatf(&buf, "%s://%s:%d/%s ", d->info.ssl_connection ? "wss" : "ws", d->info.address, d->info.port,