mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-16 00:00:02 +01:00
websocket: fix connection state handling
This commit is contained in:
parent
afd7561966
commit
355831d8a9
2 changed files with 135 additions and 152 deletions
|
@ -58,12 +58,13 @@ struct websocket {
|
|||
/* Internal datastructures */
|
||||
struct websocket_connection {
|
||||
enum websocket_connection_state {
|
||||
STATE_DISCONNECTED,
|
||||
STATE_CONNECTING,
|
||||
STATE_RECONNECTING,
|
||||
STATE_ESTABLISHED,
|
||||
STATE_SHUTDOWN,
|
||||
STATE_ERROR
|
||||
WEBSOCKET_CONNECTION_STATE_DESTROYED,
|
||||
WEBSOCKET_CONNECTION_STATE_INITIALIZED,
|
||||
WEBSOCKET_CONNECTION_STATE_CONNECTING,
|
||||
WEBSOCKET_CONNECTION_STATE_RECONNECTING,
|
||||
WEBSOCKET_CONNECTION_STATE_ESTABLISHED,
|
||||
WEBSOCKET_CONNECTION_STATE_SHUTDOWN,
|
||||
WEBSOCKET_CONNECTION_STATE_ERROR
|
||||
} state; /**< The current status of this connection. */
|
||||
|
||||
enum {
|
||||
|
@ -76,6 +77,7 @@ struct websocket_connection {
|
|||
struct io io;
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct format_type *format;
|
||||
struct websocket_destination *destination;
|
||||
|
||||
struct {
|
||||
|
|
|
@ -55,8 +55,8 @@ static char * websocket_connection_name(struct websocket_connection *c)
|
|||
|
||||
strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name);
|
||||
}
|
||||
else if (c->destination)
|
||||
strcatf(&c->_name, "%s:%d", c->destination->info.address, c->destination->info.port);
|
||||
else if (c->mode == WEBSOCKET_MODE_CLIENT && c->destination != NULL)
|
||||
strcatf(&c->_name, "dest=%s:%d", c->destination->info.address, c->destination->info.port);
|
||||
|
||||
if (c->node)
|
||||
strcatf(&c->_name, ", node=%s", node_name(c->node));
|
||||
|
@ -75,10 +75,79 @@ static void websocket_destination_destroy(struct websocket_destination *d)
|
|||
free((char *) d->info.address);
|
||||
}
|
||||
|
||||
static int websocket_connection_init(struct websocket_connection *c)
|
||||
{
|
||||
int ret;
|
||||
|
||||
c->_name = NULL;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = io_init(&c->io, c->format, c->node, SAMPLE_HAS_ALL);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_INITIALIZED;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
int ret;
|
||||
|
||||
assert(c->state != WEBSOCKET_CONNECTION_STATE_DESTROYED);
|
||||
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
|
||||
/* Return all samples to pool */
|
||||
int avail;
|
||||
struct sample *smp;
|
||||
while ((avail = queue_pull(&c->queue, (void **) &smp)))
|
||||
sample_put(smp);
|
||||
|
||||
ret = queue_destroy(&c->queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = io_destroy(&c->io);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = buffer_destroy(&c->buffers.recv);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = buffer_destroy(&c->buffers.send);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
c->wsi = NULL;
|
||||
c->_name = NULL;
|
||||
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_DESTROYED;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int pushed;
|
||||
|
||||
if (c->state != WEBSOCKET_CONNECTION_STATE_ESTABLISHED)
|
||||
return -1;
|
||||
|
||||
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
|
||||
if (pushed < cnt)
|
||||
warn("Queue overrun in WebSocket connection: %s", websocket_connection_name(c));
|
||||
|
@ -108,95 +177,69 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
||||
c->wsi = wsi;
|
||||
c->state = STATE_ESTABLISHED;
|
||||
|
||||
ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
/* Schedule writable callback in case we have something to send */
|
||||
if (queue_available(&c->queue) > 0)
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
c->wsi = wsi;
|
||||
c->state = STATE_ESTABLISHED;
|
||||
c->mode = WEBSOCKET_MODE_SERVER;
|
||||
c->_name = NULL;
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_ESTABLISHED;
|
||||
|
||||
/* We use the URI to associate this connection to a node
|
||||
* and choose a protocol.
|
||||
*
|
||||
* Example: ws://example.com/node_1.json
|
||||
* Will select the node with the name 'node_1'
|
||||
* and format 'json'.
|
||||
*/
|
||||
if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED)
|
||||
c->mode = WEBSOCKET_MODE_CLIENT;
|
||||
else {
|
||||
c->mode = WEBSOCKET_MODE_SERVER;
|
||||
/* We use the URI to associate this connection to a node
|
||||
* and choose a protocol.
|
||||
*
|
||||
* Example: ws://example.com/node_1.json
|
||||
* Will select the node with the name 'node_1'
|
||||
* and format 'json'.
|
||||
*/
|
||||
|
||||
/* Get path of incoming request */
|
||||
char *node, *format = NULL;
|
||||
char uri[64];
|
||||
/* Get path of incoming request */
|
||||
char *node, *format;
|
||||
char uri[64];
|
||||
|
||||
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
|
||||
if (strlen(uri) <= 0) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
|
||||
return -1;
|
||||
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
|
||||
if (strlen(uri) <= 0) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
|
||||
return -1;
|
||||
}
|
||||
|
||||
node = strtok(uri, "/.");
|
||||
if (!node) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
|
||||
return -1;
|
||||
}
|
||||
|
||||
format = strtok(NULL, "");
|
||||
if (!format)
|
||||
format = "villas.web";
|
||||
|
||||
/* Search for node whose name matches the URI. */
|
||||
c->node = list_lookup(&p.node.instances, node);
|
||||
if (!c->node) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
|
||||
return -1;
|
||||
}
|
||||
|
||||
c->format = format_type_lookup(format);
|
||||
if (!c->format) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown format");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
node = strtok(uri, "/.");
|
||||
if (!node) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
|
||||
ret = websocket_connection_init(c);
|
||||
if (ret) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Internal error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
format = strtok(NULL, "");
|
||||
if (!format)
|
||||
format = "villas.web";
|
||||
|
||||
/* Search for node whose name matches the URI. */
|
||||
c->node = list_lookup(&p.node.instances, node);
|
||||
if (!c->node) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct format_type *fmt = format_type_lookup(format);
|
||||
if (!fmt) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format");
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = io_init(&c->io, fmt, c->node, SAMPLE_HAS_ALL);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
list_push(&connections, c);
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c));
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
c->state = STATE_ERROR;
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_ERROR;
|
||||
|
||||
warn("Failed to establish WebSocket connection: %s, reason=%s", websocket_connection_name(c), in ? (char *) in : "unkown");
|
||||
|
||||
|
@ -205,40 +248,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
case LWS_CALLBACK_CLOSED:
|
||||
debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
if (c->state != STATE_SHUTDOWN) {
|
||||
if (c->state != WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
|
||||
/** @todo Attempt reconnect here */
|
||||
}
|
||||
|
||||
if (connections.state == STATE_INITIALIZED)
|
||||
list_remove(&connections, c);
|
||||
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
|
||||
/* Return all samples to pool */
|
||||
int avail;
|
||||
struct sample *smp;
|
||||
while ((avail = queue_pull(&c->queue, (void **) &smp)))
|
||||
sample_put(smp);
|
||||
|
||||
/* Destroy queue */
|
||||
ret = queue_destroy(&c->queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = io_destroy(&c->io);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = buffer_destroy(&c->buffers.recv);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = buffer_destroy(&c->buffers.send);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
c->wsi = NULL;
|
||||
websocket_connection_destroy(c);
|
||||
|
||||
if (c->mode == WEBSOCKET_MODE_CLIENT)
|
||||
free(c);
|
||||
|
@ -247,7 +264,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
|
||||
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
if (c->state == STATE_SHUTDOWN) {
|
||||
if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
|
||||
return -1;
|
||||
}
|
||||
|
@ -323,7 +340,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
|
||||
buffer_clear(&c->buffers.recv);
|
||||
|
||||
if (c->state == STATE_SHUTDOWN) {
|
||||
if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
|
||||
return -1;
|
||||
}
|
||||
|
@ -357,7 +374,7 @@ int websocket_deinit()
|
|||
for (size_t i = 0; i < list_length(&connections); i++) {
|
||||
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
|
||||
|
||||
c->state = STATE_SHUTDOWN;
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN;
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
@ -392,32 +409,16 @@ int websocket_start(struct node *n)
|
|||
struct websocket_destination *d = (struct websocket_destination *) list_at(&w->destinations, i);
|
||||
struct websocket_connection *c = (struct websocket_connection *) alloc(sizeof(struct websocket_connection));
|
||||
|
||||
c->state = STATE_CONNECTING;
|
||||
c->mode = WEBSOCKET_MODE_CLIENT;
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_CONNECTING;
|
||||
|
||||
c->node = n;
|
||||
c->format = format_type_lookup("villas.web");
|
||||
c->destination = d;
|
||||
c->_name = NULL;
|
||||
|
||||
struct format_type *fmt;
|
||||
|
||||
fmt = format_type_lookup("villas-web"); /** @todo We could parse the format from the URI */
|
||||
if (!fmt)
|
||||
return -1;
|
||||
|
||||
ret = io_init(&c->io, fmt, n, SAMPLE_HAS_ALL);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
d->info.context = web->context;
|
||||
d->info.vhost = web->vhost;
|
||||
d->info.userdata = c;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
list_push(&connections, c);
|
||||
|
||||
lws_client_connect_via_info(&d->info);
|
||||
}
|
||||
|
||||
|
@ -429,33 +430,13 @@ int websocket_stop(struct node *n)
|
|||
int ret;
|
||||
struct websocket *w = (struct websocket *) n->_vd;
|
||||
|
||||
/* Wait for all connections to be closed */
|
||||
for (;;) {
|
||||
int connecting = 0;
|
||||
|
||||
for (int i = 0; i < list_length(&w->destinations); i++) {
|
||||
struct websocket_destination *d = (struct websocket_destination *) list_at(&w->destinations, i);
|
||||
struct websocket_connection *c = d->info.userdata;
|
||||
|
||||
if (c->state == STATE_CONNECTING)
|
||||
connecting++;
|
||||
}
|
||||
|
||||
if (connecting == 0)
|
||||
break;
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Waiting for %d client connections to be established", connecting);
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < list_length(&connections); i++) {
|
||||
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
|
||||
|
||||
if (c->node != n)
|
||||
continue;
|
||||
|
||||
if (c->state != STATE_CONNECTING)
|
||||
c->state = STATE_SHUTDOWN;
|
||||
c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN;
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue