diff --git a/lib/nodes/websocket.cpp b/lib/nodes/websocket.cpp index 0f8babedc..0fd066417 100644 --- a/lib/nodes/websocket.cpp +++ b/lib/nodes/websocket.cpp @@ -41,7 +41,8 @@ using namespace villas::utils; #define DEFAULT_WEBSOCKET_BUFFER_SIZE (1 << 12) /* Private static storage */ -static struct List connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ +static std::list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ +static std::mutex connections_lock; static villas::node::Web *web; static villas::Logger logger = logging.get("websocket"); @@ -114,7 +115,7 @@ int websocket_connection_write(struct websocket_connection *c, struct Sample * c { int pushed; - if (c->state != websocket_connection::State::INITIALIZED) + if (c->state != websocket_connection::State::ESTABLISHED) return -1; pushed = queue_push_many(&c->queue, (void **) smps, cnt); @@ -128,6 +129,8 @@ int websocket_connection_write(struct websocket_connection *c, struct Sample * c /* Client connections which are currently connecting don't have an associate c->wsi yet */ if (c->wsi) web->callbackOnWritable(c->wsi); + else + c->node->logger->warn("No WSI for conn?"); return 0; } @@ -148,9 +151,6 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: case LWS_CALLBACK_ESTABLISHED: - c->wsi = wsi; - c->state = websocket_connection::State::ESTABLISHED; - if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED) c->mode = websocket_connection::Mode::CLIENT; else { @@ -217,10 +217,15 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso return -1; } - list_push(&connections, c); - + c->wsi = wsi; + c->state = websocket_connection::State::ESTABLISHED; c->node->logger->info("Established WebSocket connection: {}", *c); + { + std::lock_guard guard(connections_lock); + connections.push_back(c); + } + break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: @@ -237,11 +242,12 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso /** @todo Attempt reconnect here */ } - if (connections.state == State::INITIALIZED) - list_remove_all(&connections, c); + { + std::lock_guard guard(connections_lock); + connections.remove(c); + } - if (c->state == websocket_connection::State::INITIALIZED) - websocket_connection_destroy(c); + websocket_connection_destroy(c); if (c->mode == websocket_connection::Mode::CLIENT) delete c; @@ -338,12 +344,6 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso int villas::node::websocket_type_start(villas::node::SuperNode *sn) { - int ret; - - ret = list_init(&connections); - if (ret) - return ret; - web = sn->getWeb(); if (!web->isEnabled()) return -1; @@ -432,45 +432,36 @@ int villas::node::websocket_start(NodeCompat *n) int villas::node::websocket_stop(NodeCompat *n) { - int ret, open_connections = 0; + int ret; auto *w = n->getData(); - for (size_t i = 0; i < list_length(&connections); i++) { - struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); + unsigned open_connections; + do { + { + std::lock_guard guard(connections_lock); - if (c->node != n) - continue; + open_connections = 0; + for (auto *c : connections) { + if (c->node == n) { + open_connections++; - c->state = websocket_connection::State::SHUTDOWN; + if (c->state != websocket_connection::State::SHUTDOWN) { + c->state = websocket_connection::State::SHUTDOWN; - lws_callback_on_writable(c->wsi); - } + lws_callback_on_writable(c->wsi); + } + } + } + } - /* Count open connections belonging to this node */ - for (size_t i = 0; i < list_length(&connections); i++) { - struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); - - if (c->node == n) - open_connections++; - } - - if (open_connections > 0) { n->logger->info("Waiting for shutdown of {} connections...", open_connections); - sleep(1); - } + usleep(0.5e6); + } while (open_connections > 0); ret = queue_signalled_close(&w->queue); if (ret) return ret; - ret = queue_signalled_destroy(&w->queue); - if (ret) - return ret; - - ret = pool_destroy(&w->pool); - if (ret) - return ret; - return 0; } @@ -479,6 +470,14 @@ int villas::node::websocket_destroy(NodeCompat *n) auto *w = n->getData(); int ret; + ret = queue_signalled_destroy(&w->queue); + if (ret) + return ret; + + ret = pool_destroy(&w->pool); + if (ret) + return ret; + ret = list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true); if (ret) return ret; @@ -517,11 +516,12 @@ int villas::node::websocket_write(NodeCompat *n, struct Sample * const smps[], u sample_copy_many(cpys, smps, avail); - for (size_t i = 0; i < list_length(&connections); i++) { - struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); - - if (c->node == n) - websocket_connection_write(c, cpys, cnt); + { + std::lock_guard guard(connections_lock); + for (auto *c : connections) { + if (c->node == n) + websocket_connection_write(c, cpys, cnt); + } } sample_decref_many(cpys, avail);