1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

websocket: add new option to wait until all client connections are established

This commit is contained in:
Steffen Vogel 2022-01-11 07:33:43 -05:00
parent 2ee4bc7589
commit c170ac096f
2 changed files with 32 additions and 3 deletions

View file

@ -48,6 +48,8 @@ class NodeCompat;
struct websocket {
struct List destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */
bool wait; /**< Wait until all destinations are connected. */
struct Pool pool;
struct CQueueSignalled queue; /**< For samples which are received from WebSockets */
};
@ -86,7 +88,6 @@ struct websocket_connection {
villas::Buffer *send; /**< A buffer for constructing messages before calling lws_write() */
} buffers;
/** Custom formatter for spdlog */
template<typename OStream>
friend OStream &operator<<(OStream &os, const struct websocket_connection &c)

View file

@ -91,6 +91,7 @@ int websocket_connection_destroy(struct websocket_connection *c)
/* Return all samples to pool */
int avail;
struct Sample *smp;
while ((avail = queue_pull(&c->queue, (void **) &smp)))
sample_decref(smp);
@ -103,7 +104,6 @@ int websocket_connection_destroy(struct websocket_connection *c)
delete c->buffers.send;
c->wsi = nullptr;
c->state = websocket_connection::State::DESTROYED;
return 0;
@ -393,6 +393,27 @@ int villas::node::websocket_start(NodeCompat *n)
lws_client_connect_via_info(&d->info);
}
/* Wait until all destinations are connected */
if (w->wait) {
unsigned connected = 0;
do {
{
std::lock_guard guard(connections_lock);
connected = 0;
for (auto *c : connections) {
if (c->mode == websocket_connection::Mode::CLIENT &&
c->state == websocket_connection::State::ESTABLISHED &&
c->node == n)
connected++;
}
}
n->logger->info("Wait until all destination are connected");
usleep(0.5e6);
} while (connected < list_length(&w->destinations));
}
return 0;
}
@ -504,15 +525,22 @@ int villas::node::websocket_parse(NodeCompat *n, json_t *json)
json_t *json_dests = nullptr;
json_t *json_dest;
json_error_t err;
int wc = -1;
ret = list_init(&w->destinations);
if (ret)
return ret;
ret = json_unpack_ex(json, &err, 0, "{ s?: o }", "destinations", &json_dests);
ret = json_unpack_ex(json, &err, 0, "{ s?: o, s?: b }",
"destinations", &json_dests,
"wait_connected", &wc
);
if (ret)
throw ConfigError(json, err, "node-config-node-websocket");
if (wc >= 0)
w->wait = wc != 0;
if (json_dests) {
if (!json_is_array(json_dests))
throw ConfigError(json_dests, err, "node-config-node-websocket-destinations", "The 'destinations' setting must be an array of URLs");