diff --git a/include/villas/nodes/websocket.hpp b/include/villas/nodes/websocket.hpp index da048795a..40bcb2385 100644 --- a/include/villas/nodes/websocket.hpp +++ b/include/villas/nodes/websocket.hpp @@ -48,6 +48,8 @@ class NodeCompat; struct websocket { struct List destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */ + bool wait; /**< Wait until all destinations are connected. */ + struct Pool pool; struct CQueueSignalled queue; /**< For samples which are received from WebSockets */ }; @@ -86,7 +88,6 @@ struct websocket_connection { villas::Buffer *send; /**< A buffer for constructing messages before calling lws_write() */ } buffers; - /** Custom formatter for spdlog */ template friend OStream &operator<<(OStream &os, const struct websocket_connection &c) diff --git a/lib/nodes/websocket.cpp b/lib/nodes/websocket.cpp index a18fa174d..b31dd4312 100644 --- a/lib/nodes/websocket.cpp +++ b/lib/nodes/websocket.cpp @@ -91,6 +91,7 @@ int websocket_connection_destroy(struct websocket_connection *c) /* Return all samples to pool */ int avail; struct Sample *smp; + while ((avail = queue_pull(&c->queue, (void **) &smp))) sample_decref(smp); @@ -103,7 +104,6 @@ int websocket_connection_destroy(struct websocket_connection *c) delete c->buffers.send; c->wsi = nullptr; - c->state = websocket_connection::State::DESTROYED; return 0; @@ -393,6 +393,27 @@ int villas::node::websocket_start(NodeCompat *n) lws_client_connect_via_info(&d->info); } + /* Wait until all destinations are connected */ + if (w->wait) { + unsigned connected = 0; + do { + { + std::lock_guard guard(connections_lock); + + connected = 0; + for (auto *c : connections) { + if (c->mode == websocket_connection::Mode::CLIENT && + c->state == websocket_connection::State::ESTABLISHED && + c->node == n) + connected++; + } + } + + n->logger->info("Wait until all destination are connected"); + usleep(0.5e6); + } while (connected < list_length(&w->destinations)); + } + return 0; } @@ -504,15 +525,22 @@ int villas::node::websocket_parse(NodeCompat *n, json_t *json) json_t *json_dests = nullptr; json_t *json_dest; json_error_t err; + int wc = -1; ret = list_init(&w->destinations); if (ret) return ret; - ret = json_unpack_ex(json, &err, 0, "{ s?: o }", "destinations", &json_dests); + ret = json_unpack_ex(json, &err, 0, "{ s?: o, s?: b }", + "destinations", &json_dests, + "wait_connected", &wc + ); if (ret) throw ConfigError(json, err, "node-config-node-websocket"); + if (wc >= 0) + w->wait = wc != 0; + if (json_dests) { if (!json_is_array(json_dests)) throw ConfigError(json_dests, err, "node-config-node-websocket-destinations", "The 'destinations' setting must be an array of URLs");