1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-16 00:00:02 +01:00

websocket: use std::list for connections

This commit is contained in:
Steffen Vogel 2022-01-11 07:38:19 -05:00
parent 7d6a1031eb
commit c6fe0201da

View file

@ -41,7 +41,8 @@ using namespace villas::utils;
#define DEFAULT_WEBSOCKET_BUFFER_SIZE (1 << 12)
/* Private static storage */
static struct List connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
static std::list<struct websocket_connection *> connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
static std::mutex connections_lock;
static villas::node::Web *web;
static villas::Logger logger = logging.get("websocket");
@ -114,7 +115,7 @@ int websocket_connection_write(struct websocket_connection *c, struct Sample * c
{
int pushed;
if (c->state != websocket_connection::State::INITIALIZED)
if (c->state != websocket_connection::State::ESTABLISHED)
return -1;
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
@ -128,6 +129,8 @@ int websocket_connection_write(struct websocket_connection *c, struct Sample * c
/* Client connections which are currently connecting don't have an associate c->wsi yet */
if (c->wsi)
web->callbackOnWritable(c->wsi);
else
c->node->logger->warn("No WSI for conn?");
return 0;
}
@ -148,9 +151,6 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
case LWS_CALLBACK_ESTABLISHED:
c->wsi = wsi;
c->state = websocket_connection::State::ESTABLISHED;
if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED)
c->mode = websocket_connection::Mode::CLIENT;
else {
@ -217,10 +217,15 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
return -1;
}
list_push(&connections, c);
c->wsi = wsi;
c->state = websocket_connection::State::ESTABLISHED;
c->node->logger->info("Established WebSocket connection: {}", *c);
{
std::lock_guard guard(connections_lock);
connections.push_back(c);
}
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
@ -237,11 +242,12 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
/** @todo Attempt reconnect here */
}
if (connections.state == State::INITIALIZED)
list_remove_all(&connections, c);
{
std::lock_guard guard(connections_lock);
connections.remove(c);
}
if (c->state == websocket_connection::State::INITIALIZED)
websocket_connection_destroy(c);
websocket_connection_destroy(c);
if (c->mode == websocket_connection::Mode::CLIENT)
delete c;
@ -338,12 +344,6 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
int villas::node::websocket_type_start(villas::node::SuperNode *sn)
{
int ret;
ret = list_init(&connections);
if (ret)
return ret;
web = sn->getWeb();
if (!web->isEnabled())
return -1;
@ -432,45 +432,36 @@ int villas::node::websocket_start(NodeCompat *n)
int villas::node::websocket_stop(NodeCompat *n)
{
int ret, open_connections = 0;
int ret;
auto *w = n->getData<struct websocket>();
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
unsigned open_connections;
do {
{
std::lock_guard guard(connections_lock);
if (c->node != n)
continue;
open_connections = 0;
for (auto *c : connections) {
if (c->node == n) {
open_connections++;
c->state = websocket_connection::State::SHUTDOWN;
if (c->state != websocket_connection::State::SHUTDOWN) {
c->state = websocket_connection::State::SHUTDOWN;
lws_callback_on_writable(c->wsi);
}
lws_callback_on_writable(c->wsi);
}
}
}
}
/* Count open connections belonging to this node */
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
if (c->node == n)
open_connections++;
}
if (open_connections > 0) {
n->logger->info("Waiting for shutdown of {} connections...", open_connections);
sleep(1);
}
usleep(0.5e6);
} while (open_connections > 0);
ret = queue_signalled_close(&w->queue);
if (ret)
return ret;
ret = queue_signalled_destroy(&w->queue);
if (ret)
return ret;
ret = pool_destroy(&w->pool);
if (ret)
return ret;
return 0;
}
@ -479,6 +470,14 @@ int villas::node::websocket_destroy(NodeCompat *n)
auto *w = n->getData<struct websocket>();
int ret;
ret = queue_signalled_destroy(&w->queue);
if (ret)
return ret;
ret = pool_destroy(&w->pool);
if (ret)
return ret;
ret = list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
if (ret)
return ret;
@ -517,11 +516,12 @@ int villas::node::websocket_write(NodeCompat *n, struct Sample * const smps[], u
sample_copy_many(cpys, smps, avail);
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
if (c->node == n)
websocket_connection_write(c, cpys, cnt);
{
std::lock_guard guard(connections_lock);
for (auto *c : connections) {
if (c->node == n)
websocket_connection_write(c, cpys, cnt);
}
}
sample_decref_many(cpys, avail);