1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

websocket: improve handling of pending connections (opening/closing)

This commit is contained in:
Steffen Vogel 2022-01-11 09:13:51 -05:00
parent 06029a6fa1
commit a9d1584180
2 changed files with 24 additions and 17 deletions

View file

@ -67,7 +67,8 @@ struct websocket_connection {
CONNECTING,
RECONNECTING,
ESTABLISHED,
SHUTDOWN,
CLOSING,
CLOSED,
ERROR
} state; /**< The current status of this connection. */

View file

@ -141,6 +141,8 @@ void websocket_connection_close(struct websocket_connection *c, struct lws *wsi,
lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason));
c->node->logger->debug("Closing WebSocket connection with {}: status={}, reason={}", *c, status, reason);
c->state = websocket_connection::State::CLOSED;
}
int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
@ -170,9 +172,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
if (strlen(uri) <= 0) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
logger->warn("Failed to get request URI");
return -1;
}
@ -236,9 +236,10 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
return -1;
case LWS_CALLBACK_CLOSED:
c->state = websocket_connection::State::CLOSED;
c->node->logger->debug("Closed WebSocket connection: {}", *c);
if (c->state != websocket_connection::State::SHUTDOWN) {
if (c->state != websocket_connection::State::CLOSING) {
/** @todo Attempt reconnect here */
}
@ -247,7 +248,9 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
connections.remove(c);
}
websocket_connection_destroy(c);
ret = websocket_connection_destroy(c);
if (ret)
return ret;
if (c->mode == websocket_connection::Mode::CLIENT)
delete c;
@ -275,7 +278,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
if (queue_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
else if (c->state == websocket_connection::State::SHUTDOWN) {
else if (c->state == websocket_connection::State::CLOSING) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
@ -327,7 +330,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
c->buffers.recv->clear();
if (c->state == websocket_connection::State::SHUTDOWN) {
if (c->state == websocket_connection::State::CLOSING) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
@ -408,7 +411,7 @@ int villas::node::websocket_start(NodeCompat *n)
/* Wait until all destinations are connected */
if (w->wait) {
unsigned connected = 0;
unsigned connected = 0, total = list_length(&w->destinations);
do {
{
std::lock_guard guard(connections_lock);
@ -422,9 +425,11 @@ int villas::node::websocket_start(NodeCompat *n)
}
}
n->logger->info("Wait until all destination are connected");
usleep(0.5e6);
} while (connected < list_length(&w->destinations));
if (connected < total) {
n->logger->info("Wait until all destinations are connected: pending={}", total - connected);
sleep(1);
}
} while (connected < total);
}
return 0;
@ -443,19 +448,20 @@ int villas::node::websocket_stop(NodeCompat *n)
open_connections = 0;
for (auto *c : connections) {
if (c->node == n) {
open_connections++;
if (c->state != websocket_connection::State::SHUTDOWN) {
c->state = websocket_connection::State::SHUTDOWN;
if (c->state != websocket_connection::State::CLOSED) {
open_connections++;
c->state = websocket_connection::State::CLOSING;
lws_callback_on_writable(c->wsi);
}
}
}
}
n->logger->info("Waiting for shutdown of {} connections...", open_connections);
usleep(0.5e6);
if (open_connections > 0) {
n->logger->info("Waiting for open connections to be closed: pending={}", open_connections);
sleep(1);
}
} while (open_connections > 0);
ret = queue_signalled_close(&w->queue);