From a9d1584180d93149ddfd2e3f6349ccb2430966be Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 11 Jan 2022 09:13:51 -0500 Subject: [PATCH] websocket: improve handling of pending connections (opening/closing) --- include/villas/nodes/websocket.hpp | 3 ++- lib/nodes/websocket.cpp | 38 +++++++++++++++++------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/include/villas/nodes/websocket.hpp b/include/villas/nodes/websocket.hpp index f92e4f2db..1cfbdb3e3 100644 --- a/include/villas/nodes/websocket.hpp +++ b/include/villas/nodes/websocket.hpp @@ -67,7 +67,8 @@ struct websocket_connection { CONNECTING, RECONNECTING, ESTABLISHED, - SHUTDOWN, + CLOSING, + CLOSED, ERROR } state; /**< The current status of this connection. */ diff --git a/lib/nodes/websocket.cpp b/lib/nodes/websocket.cpp index 0fd066417..c65ab67ce 100644 --- a/lib/nodes/websocket.cpp +++ b/lib/nodes/websocket.cpp @@ -141,6 +141,8 @@ void websocket_connection_close(struct websocket_connection *c, struct lws *wsi, lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason)); c->node->logger->debug("Closing WebSocket connection with {}: status={}, reason={}", *c, status, reason); + + c->state = websocket_connection::State::CLOSED; } int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -170,9 +172,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ if (strlen(uri) <= 0) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); - logger->warn("Failed to get request URI"); - return -1; } @@ -236,9 +236,10 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso return -1; case LWS_CALLBACK_CLOSED: + c->state = websocket_connection::State::CLOSED; c->node->logger->debug("Closed WebSocket connection: {}", *c); - if (c->state != websocket_connection::State::SHUTDOWN) { + if (c->state != websocket_connection::State::CLOSING) { /** @todo Attempt reconnect here */ } @@ -247,7 +248,9 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso connections.remove(c); } - websocket_connection_destroy(c); + ret = websocket_connection_destroy(c); + if (ret) + return ret; if (c->mode == websocket_connection::Mode::CLIENT) delete c; @@ -275,7 +278,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); - else if (c->state == websocket_connection::State::SHUTDOWN) { + else if (c->state == websocket_connection::State::CLOSING) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } @@ -327,7 +330,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso c->buffers.recv->clear(); - if (c->state == websocket_connection::State::SHUTDOWN) { + if (c->state == websocket_connection::State::CLOSING) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } @@ -408,7 +411,7 @@ int villas::node::websocket_start(NodeCompat *n) /* Wait until all destinations are connected */ if (w->wait) { - unsigned connected = 0; + unsigned connected = 0, total = list_length(&w->destinations); do { { std::lock_guard guard(connections_lock); @@ -422,9 +425,11 @@ int villas::node::websocket_start(NodeCompat *n) } } - n->logger->info("Wait until all destination are connected"); - usleep(0.5e6); - } while (connected < list_length(&w->destinations)); + if (connected < total) { + n->logger->info("Wait until all destinations are connected: pending={}", total - connected); + sleep(1); + } + } while (connected < total); } return 0; @@ -443,19 +448,20 @@ int villas::node::websocket_stop(NodeCompat *n) open_connections = 0; for (auto *c : connections) { if (c->node == n) { - open_connections++; - - if (c->state != websocket_connection::State::SHUTDOWN) { - c->state = websocket_connection::State::SHUTDOWN; + if (c->state != websocket_connection::State::CLOSED) { + open_connections++; + c->state = websocket_connection::State::CLOSING; lws_callback_on_writable(c->wsi); } } } } - n->logger->info("Waiting for shutdown of {} connections...", open_connections); - usleep(0.5e6); + if (open_connections > 0) { + n->logger->info("Waiting for open connections to be closed: pending={}", open_connections); + sleep(1); + } } while (open_connections > 0); ret = queue_signalled_close(&w->queue);