mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
fixes several segfaults in websocket node-type due to improper connection establishment / tear-down (see #71, #72 & #73)
This commit is contained in:
parent
f8deb04186
commit
b89b30ffe1
1 changed files with 79 additions and 81 deletions
|
@ -23,19 +23,17 @@
|
|||
/* Internal datastructures */
|
||||
struct connection {
|
||||
enum {
|
||||
WEBSOCKET_ESTABLISHED,
|
||||
WEBSOCKET_ACTIVE,
|
||||
WEBSOCKET_SHUTDOWN,
|
||||
WEBSOCKET_CLOSED
|
||||
WEBSOCKET_CONNECTION_CLOSED,
|
||||
WEBSOCKET_CONNECTION_ESTABLISHED,
|
||||
WEBSOCKET_CONNECTION_ACTIVE,
|
||||
WEBSOCKET_CONNECTION_SHUTDOWN
|
||||
} state;
|
||||
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct lws *wsi;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct {
|
||||
char name[64];
|
||||
char ip[64];
|
||||
|
@ -60,7 +58,7 @@ static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS
|
|||
static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */
|
||||
static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */
|
||||
|
||||
static int id = 0;
|
||||
static int id = 0; /**< Counter for Websocket node id field. See struct webmsg. */
|
||||
|
||||
struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
|
||||
|
||||
|
@ -75,18 +73,6 @@ static struct lws_protocols protocols[] = {
|
|||
{ NULL }
|
||||
};
|
||||
|
||||
__attribute__((unused)) static int connection_init(struct connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return -1;
|
||||
}
|
||||
|
||||
__attribute__((unused)) static void connection_destroy(struct connection *c)
|
||||
{
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
}
|
||||
|
||||
static char * connection_name(struct connection *c)
|
||||
{
|
||||
if (!c->_name) {
|
||||
|
@ -99,6 +85,53 @@ static char * connection_name(struct connection *c)
|
|||
return c->_name;
|
||||
}
|
||||
|
||||
static int connection_init(struct connection *c, struct lws *wsi)
|
||||
{
|
||||
int ret;
|
||||
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New connection %s", connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CONNECTION_ESTABLISHED;
|
||||
c->wsi = wsi;
|
||||
|
||||
if (c->node != NULL)
|
||||
list_push(&w->connections, c);
|
||||
else
|
||||
list_push(&connections, c);
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret) {
|
||||
warn("Failed to create queue for incoming websocket connection. Closing..");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void connection_destroy(struct connection *c)
|
||||
{
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
|
||||
info("LWS: Connection %s closed", connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CONNECTION_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
||||
if (c->node)
|
||||
list_remove(&w->connections, c);
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
|
||||
queue_destroy(&c->queue);
|
||||
}
|
||||
|
||||
static void destination_destroy(struct destination *d)
|
||||
{
|
||||
free(d->uri);
|
||||
|
@ -112,22 +145,15 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne
|
|||
struct websocket *w = c->node->_vd;
|
||||
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_SHUTDOWN:
|
||||
case WEBSOCKET_CONNECTION_CLOSED:
|
||||
case WEBSOCKET_CONNECTION_SHUTDOWN:
|
||||
return -1;
|
||||
case WEBSOCKET_CLOSED:
|
||||
if (c->node) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_remove(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
case WEBSOCKET_CONNECTION_ESTABLISHED:
|
||||
c->state = WEBSOCKET_CONNECTION_ACTIVE;
|
||||
/* fall through */
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
case WEBSOCKET_CONNECTION_ACTIVE:
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket connection: %s", connection_name(c));
|
||||
|
@ -365,38 +391,15 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
return -1;
|
||||
}
|
||||
|
||||
c->state = WEBSOCKET_ESTABLISHED;
|
||||
c->wsi = wsi;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret) {
|
||||
warn("Failed to create queue for incoming websocket connection. Closing..");
|
||||
ret = connection_init(c, wsi);
|
||||
if (ret)
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Lookup peer address for debug output */
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New connection %s", connection_name(c));
|
||||
|
||||
if (c->node != NULL) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_push(&w->connections, c);
|
||||
}
|
||||
else {
|
||||
list_push(&connections, c);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
info("LWS: Connection %s closed", connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
||||
queue_destroy(&c->queue);
|
||||
connection_destroy(c);
|
||||
|
||||
return 0;
|
||||
|
||||
|
@ -407,7 +410,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v
|
|||
if (c->node && c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (c->state == WEBSOCKET_SHUTDOWN) {
|
||||
if (c->state == WEBSOCKET_CONNECTION_SHUTDOWN) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4);
|
||||
return -1;
|
||||
}
|
||||
|
@ -534,9 +537,6 @@ int websocket_open(struct node *n)
|
|||
|
||||
w->id = id++;
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_VALUES);
|
||||
|
||||
ret = pool_init(&w->pool, 64 * DEFAULT_QUEUELEN, blocklen, &memtype_hugepage);
|
||||
|
@ -547,6 +547,8 @@ int websocket_open(struct node *n)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
/** @todo Connection to destinations via WebSocket client */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -555,22 +557,21 @@ int websocket_close(struct node *n)
|
|||
struct websocket *w = n->_vd;
|
||||
|
||||
list_foreach(struct connection *c, &w->connections) {
|
||||
c->state = WEBSOCKET_SHUTDOWN;
|
||||
c->state = WEBSOCKET_CONNECTION_SHUTDOWN;
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
||||
pool_destroy(&w->pool);
|
||||
queue_destroy(&w->queue);
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_destroy(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
list_destroy(&w->destinations, (dtor_cb_t) destination_destroy, true);
|
||||
|
||||
return 0;
|
||||
|
@ -578,17 +579,13 @@ int websocket_destroy(struct node *n)
|
|||
|
||||
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int got;
|
||||
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
struct webmsg *msgs[cnt];
|
||||
|
||||
int got;
|
||||
|
||||
do {
|
||||
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
|
||||
pthread_yield();
|
||||
} while (got == 0);
|
||||
|
||||
|
||||
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
|
||||
for (int i = 0; i < got; i++) {
|
||||
smps[i]->sequence = msgs[i]->sequence;
|
||||
smps[i]->length = msgs[i]->length;
|
||||
|
@ -606,13 +603,11 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_foreach(struct connection *c, &w->connections) {
|
||||
list_foreach(struct connection *c, &w->connections)
|
||||
connection_write(c, smps, cnt);
|
||||
}
|
||||
|
||||
list_foreach(struct connection *c, &connections) {
|
||||
list_foreach(struct connection *c, &connections)
|
||||
connection_write(c, smps, cnt);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
@ -622,7 +617,10 @@ int websocket_parse(struct node *n, config_setting_t *cfg)
|
|||
struct websocket *w = n->_vd;
|
||||
config_setting_t *cfg_dests;
|
||||
int ret;
|
||||
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
cfg_dests = config_setting_get_member(cfg, "destinations");
|
||||
if (cfg_dests) {
|
||||
if (!config_setting_is_array(cfg_dests))
|
||||
|
|
Loading…
Add table
Reference in a new issue