1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

websocket: adapt to new signal code and separate node-type configuration into in/out sections

This commit is contained in:
Steffen Vogel 2018-08-20 18:27:45 +02:00
parent d3d9bd8bfc
commit b067d91544

View file

@ -87,7 +87,11 @@ static int websocket_connection_init(struct websocket_connection *c)
if (ret)
return ret;
ret = io_init(&c->io, c->format, c->node, SAMPLE_HAS_ALL);
ret = io_init(&c->io, c->format, &c->node->signals, SAMPLE_HAS_ALL);
if (ret)
return ret;
ret = io_check(&c->io);
if (ret)
return ret;
@ -204,12 +208,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
if (strlen(uri) <= 0) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
warn("Failed to get request URI");
return -1;
}
node = strtok(uri, "/.");
if (!node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
warn("Failed to tokenize request URI");
return -1;
}
@ -221,12 +227,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
c->node = list_lookup(&p.node.instances, node);
if (!c->node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
warn("Failed to find node: node=%s", node);
return -1;
}
c->format = format_type_lookup(format);
if (!c->format) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown format");
warn("Failed to find format: format=%s", format);
return -1;
}
}
@ -234,6 +242,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
ret = websocket_connection_init(c);
if (ret) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Internal error");
warn("Failed to intialize websocket connection: reason=%d", ret);
return -1;
}
@ -268,12 +277,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_SERVER_WRITEABLE:
if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
struct sample **smps = alloca(cnt * sizeof(struct sample *));
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
@ -281,7 +285,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
size_t wbytes;
io_sprint(&c->io, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & FORMAT_TYPE_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & IO_HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_decref_many(smps, pulled);
@ -290,8 +294,13 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (queue_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
else if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
break;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
@ -332,7 +341,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
/* Set receive timestamp */
for (int i = 0; i < recvd; i++) {
smps[i]->ts.received = ts_recv;
smps[i]->flags |= SAMPLE_HAS_RECEIVED;
smps[i]->flags |= SAMPLE_HAS_TS_RECEIVED;
}
enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
@ -372,31 +381,6 @@ int websocket_type_start(struct super_node *sn)
return 0;
}
int websocket_type_stop()
{
int ret;
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN;
lws_callback_on_writable(c->wsi);
}
/* Wait for all connections to be closed */
for (int i = 0; i < 10 && list_length(&connections) > 0; i++) {
info("Waiting for shutdown of %zu connections... %d/10", list_length(&connections), i+1);
sleep(1);
}
ret = list_destroy(&connections, (dtor_cb_t) websocket_connection_destroy, false);
if (ret)
return ret;
return 0;
}
int websocket_start(struct node *n)
{
int ret;
@ -411,13 +395,21 @@ int websocket_start(struct node *n)
return ret;
for (int i = 0; i < list_length(&w->destinations); i++) {
const char *format;
struct websocket_destination *d = (struct websocket_destination *) list_at(&w->destinations, i);
struct websocket_connection *c = (struct websocket_connection *) alloc(sizeof(struct websocket_connection));
c->state = WEBSOCKET_CONNECTION_STATE_CONNECTING;
format = strchr(d->info.path, '.');
if (!format)
format = "villas.web";
c->format = format_type_lookup(format + 1);
if (!c->format)
return -1;
c->node = n;
c->format = format_type_lookup("villas.web");
c->destination = d;
d->info.context = web->context;
@ -446,6 +438,25 @@ int websocket_stop(struct node *n)
lws_callback_on_writable(c->wsi);
}
/* Wait for all connections to be closed */
for (int j = 1; j <= 10; j++) {
int open_connections = 0;
/* Count open connections belonging to this node */
for (int i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
if (c->node == n)
open_connections++;
}
if (open_connections == 0)
break;
info("Waiting for shutdown of %zu connections... %d/10", list_length(&connections), j);
sleep(1);
}
ret = queue_signalled_destroy(&w->queue);
if (ret)
return ret;
@ -601,7 +612,6 @@ static struct plugin p = {
.vectorize = 0, /* unlimited */
.size = sizeof(struct websocket),
.type.start = websocket_type_start,
.type.stop = websocket_type_stop,
.start = websocket_start,
.stop = websocket_stop,
.destroy = websocket_destroy,