diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 20467926e..fecaadc92 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -51,6 +51,11 @@ struct websocket_connection { char ip[64]; } peer; + enum { + WEBSOCKET_MODE_CLIENT, + WEBSOCKET_MODE_SERVER, + } mode; + enum state state; char *_name; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index da4f34bf8..3a375c3bf 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -23,6 +23,7 @@ /* Private static storage */ static struct list connections = { .state = STATE_DESTROYED }; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ +static struct web *web; /* Forward declarations */ static struct plugin p; @@ -30,10 +31,12 @@ static struct plugin p; static char * websocket_connection_name(struct websocket_connection *c) { if (!c->_name) { + strcatf(&c->_name, "%s (%s)", c->peer.name, c->peer.ip); + if (c->node) - asprintf(&c->_name, "%s (%s) for node %s", c->peer.name, c->peer.ip, node_name(c->node)); - else - asprintf(&c->_name, "%s (%s) for all nodes", c->peer.name, c->peer.ip); + strcatf(&c->_name, " for node %s", node_name(c->node)); + + strcatf(&c->_name, " in %s mode", c->mode == WEBSOCKET_MODE_CLIENT ? "client" : "server"); } return c->_name; @@ -42,9 +45,7 @@ static char * websocket_connection_name(struct websocket_connection *c) static int websocket_connection_init(struct websocket_connection *c, struct lws *wsi) { int ret; - - struct websocket *w = c->node->_vd; - + lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); info("LWS: New connection %s", websocket_connection_name(c)); @@ -52,86 +53,75 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws c->state = STATE_INITIALIZED; c->wsi = wsi; - if (c->node != NULL) + if (c->node) { + struct websocket *w = c->node->_vd; + list_push(&w->connections, c); + } else list_push(&connections, c); - - ret = queue_init(&c->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage); - if (ret) { - warn("Failed to create queue for incoming websocket connection. Closing.."); - return -1; - } + + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); + if (ret) + return ret; return 0; } -static void websocket_connection_destroy(struct websocket_connection *c) +static int websocket_connection_destroy(struct websocket_connection *c) { - if (c->state == STATE_DESTROYED) - return; + int ret; - struct websocket *w = c->node->_vd; + if (c->state == STATE_DESTROYED) + return 0; info("LWS: Connection %s closed", websocket_connection_name(c)); - - c->state = STATE_DESTROYED; - c->wsi = NULL; - if (c->node) + if (c->node) { + struct websocket *w = c->node->_vd; list_remove(&w->connections, c); + } else list_remove(&connections, c); if (c->_name) free(c->_name); - queue_destroy(&c->queue); + ret = queue_destroy(&c->queue); + if (ret) + return ret; + + c->state = STATE_DESTROYED; + c->wsi = NULL; + + return ret; } static void websocket_destination_destroy(struct websocket_destination *d) { free(d->uri); + + free((char *) d->info.path); + free((char *) d->info.address); } static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) { - int blocks, enqueued; - char *bufs[cnt]; - - struct websocket *w = c->node->_vd; + int ret; switch (c->state) { - case STATE_DESTROYED: - case STATE_STOPPED: - return -1; - case STATE_INITIALIZED: c->state = STATE_STARTED; /* fall through */ case STATE_STARTED: - blocks = pool_get_many(&w->pool, (void **) bufs, cnt); - if (blocks != cnt) - warn("Pool underrun in websocket connection: %s", websocket_connection_name(c)); - - for (int i = 0; i < blocks; i++) { - struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE); - - msg->version = WEBMSG_VERSION; - msg->type = WEBMSG_TYPE_DATA; - msg->length = smps[i]->length; - msg->sequence = smps[i]->sequence; - msg->id = c->node->id; - msg->ts.sec = smps[i]->ts.origin.tv_sec; - msg->ts.nsec = smps[i]->ts.origin.tv_nsec; - - memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4); + for (int i = 0; i < cnt; i++) { + sample_get(smps[i]); /* increase reference count */ + + ret = queue_push(&c->queue, (void **) smps[i]); + if (ret != 1) + warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); } - - enqueued = queue_push_many(&c->queue, (void **) bufs, cnt); - if (enqueued != blocks) - warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); lws_callback_on_writable(c->wsi); break; @@ -142,25 +132,50 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam return 0; } +static void websocket_connection_close(struct websocket_connection *c, struct lws *wsi, enum lws_close_status status, const char *reason) +{ + lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason)); + + char *msg = strf("LWS: Closing connection"); + + if (c) + msg = strcatf(&msg, " with %s", websocket_connection_name(c)); + + msg = strcatf(&msg, ": status=%u, reason=%s", status, reason); + + warn(msg); + + free(msg); +} + int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int ret; struct websocket_connection *c = user; - struct websocket *w; + struct webmsg *msg; + struct sample *smp; + switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: + ret = websocket_connection_init(c, wsi); + if (ret) + return -1; + + return 0; + case LWS_CALLBACK_ESTABLISHED: c->state = STATE_DESTROYED; - + c->mode = WEBSOCKET_MODE_SERVER; + /* Get path of incoming request */ char uri[64]; lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ if (strlen(uri) <= 0) { - warn("LWS: Closing connection with invalid URL: %s", uri); + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); return -1; } - + if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){ /* Catch all connection */ c->node = NULL; @@ -171,13 +186,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi /* Search for node whose name matches the URI. */ c->node = list_lookup(&p.node.instances, node); if (c->node == NULL) { - warn("LWS: Closing Connection for non-existent node: %s", uri + 1); + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); return -1; } - - /* Check if node is running */ - if (c->node->state != STATE_STARTED) - return -1; } ret = websocket_connection_init(c, wsi); @@ -188,35 +199,60 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLOSED: websocket_connection_destroy(c); + + if (c->mode == WEBSOCKET_MODE_CLIENT) + free(c); + return 0; case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE: - w = c->node->_vd; - - if (c->node && c->node->state != STATE_STARTED) - return -1; - if (c->state == STATE_STOPPED) { - lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4); + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye"); return -1; } - char *buf; - int cnt; - while ((cnt = queue_pull(&c->queue, (void **) &buf))) { - struct webmsg *msg = (struct webmsg *) (buf + LWS_PRE); - - pool_put(&w->pool, (void *) buf); - + if (c->node && c->node->state != STATE_STARTED) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); + return -1; + } + + char *buf = NULL; + + while (queue_pull(&c->queue, (void **) &smp)) { + buf = realloc(buf, LWS_PRE + WEBMSG_LEN(smp->length)); + if (!buf) + serror("realloc failed:"); + + msg = (struct webmsg *) (buf + LWS_PRE); + + msg->version = WEBMSG_VERSION; + msg->type = WEBMSG_TYPE_DATA; + msg->length = smp->length; + msg->sequence = smp->sequence; + msg->id = smp->source->id; + msg->ts.sec = smp->ts.origin.tv_sec; + msg->ts.nsec = smp->ts.origin.tv_nsec; + + memcpy(&msg->data, &smp->data, SAMPLE_DATA_LEN(smp->length)); + + webmsg_hton(msg); + + sample_put(smp); + ret = lws_write(wsi, (unsigned char *) msg, WEBMSG_LEN(msg->length), LWS_WRITE_BINARY); - if (ret < WEBMSG_LEN(msg->length)) - error("Failed lws_write()"); + if (ret < 0) { + warn("Failed lws_write() for connection %s", websocket_connection_name(c)); + return -1; + } if (lws_send_pipe_choked(wsi)) break; } + + free(buf); + /* There are still samples in the queue */ if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); @@ -224,34 +260,76 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_RECEIVE: - w = c->node->_vd; - - if (c->node->state != STATE_STARTED) + if (!lws_frame_is_binary(wsi)) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected"); return -1; + } - if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0)) - warn("LWS: Received invalid packet for connection %s", websocket_connection_name(c)); + if (len < WEBMSG_LEN(0)) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet"); + return -1; + } + + struct timespec ts_recv = time_now(); - struct webmsg *msg = (struct webmsg *) in; - - while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) { - struct webmsg *msg2 = pool_get(&w->pool); - if (!msg2) { - warn("Pool underrun for connection %s", websocket_connection_name(c)); + msg = (struct webmsg *) in; + while ((char *) msg + WEBMSG_LEN(msg->length) < (char *) in + len) { + struct node *dest; + + /* Convert message to host byte-order */ + webmsg_ntoh(msg); + + /* Find destination node of this message */ + if (c->node) + dest = c->node; + else { + dest = NULL; + + for (int i = 0; i < list_length(&p.node.instances); i++) { + struct node *n = list_at(&p.node.instances, i); + + if (n->id == msg->id) { + dest = n; + break; + } + } + + if (!dest) { + warn("Ignoring message due to invalid node id"); + goto next; + } + } + + struct websocket *w = dest->_vd; + + ret = sample_alloc(&w->pool, &smp, 1); + if (ret != 1) { + warn("Pool underrun for connection: %s", websocket_connection_name(c)); break; } - memcpy(msg2, msg, WEBMSG_LEN(msg->length)); + smp->ts.origin = WEBMSG_TS(msg); + smp->ts.received = ts_recv; + + smp->sequence = msg->sequence; + smp->length = msg->length; + if (smp->length > smp->capacity) { + smp->length = smp->capacity; + warn("Dropping values for connection: %s", websocket_connection_name(c)); + } - ret = queue_signalled_push_many(&w->queue, (void **) msg2, 1); + memcpy(&smp->data, &msg->data, SAMPLE_DATA_LEN(smp->length)); + + ret = queue_signalled_push(&w->queue, (void **) smp); if (ret != 1) { warn("Queue overrun for connection %s", websocket_connection_name(c)); break; } - + /* Next message */ - msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length)); +next: msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length)); } + return 0; default: @@ -259,6 +337,37 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi } } +int websocket_init(struct super_node *sn) +{ + list_init(&connections); + + web = &sn->web; + + if (web->state != STATE_STARTED) + return -1; + + return 0; +} + +int websocket_deinit() +{ + for (size_t i = 0; i < list_length(&connections); i++) { + struct websocket_connection *c = list_at(&connections, i); + + c->state = STATE_STOPPED; + + lws_callback_on_writable(c->wsi); + } + + /* Wait for all connections to be closed */ + while (list_length(&connections) > 0) + sleep(0.2); + + list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true); + + return 0; +} + int websocket_start(struct node *n) { int ret; @@ -274,7 +383,21 @@ int websocket_start(struct node *n) if (ret) return ret; - /** @todo Connection to destinations via WebSocket client */ + for (int i = 0; i < list_length(&w->destinations); i++) { + struct websocket_destination *d = list_at(&w->destinations, i); + + struct websocket_connection *c = alloc(sizeof(struct websocket_connection)); + + c->state = STATE_DESTROYED; + c->mode = WEBSOCKET_MODE_CLIENT; + c->node = n; + + d->info.context = web->context; + d->info.vhost = web->vhost; + d->info.userdata = c; + + lws_client_connect_via_info(&d->info); + } return 0; } @@ -291,15 +414,19 @@ int websocket_stop(struct node *n) lws_callback_on_writable(c->wsi); } - - ret = pool_destroy(&w->pool); - if (ret) - return ret; + + /* Wait for all connections to be closed */ + while (list_length(&w->connections) > 0) + sleep(1); ret = queue_signalled_destroy(&w->queue); if (ret) return ret; + ret = pool_destroy(&w->pool); + if (ret) + return ret; + return 0; } @@ -315,46 +442,56 @@ int websocket_destroy(struct node *n) int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) { - int got; + int avail; struct websocket *w = n->_vd; - struct webmsg *msgs[cnt]; + struct sample *cpys[cnt]; do { - got = queue_signalled_pull_many(&w->queue, (void **) msgs, cnt); - if (got < 0) - return got; - } while (got == 0); - - - for (int i = 0; i < got; i++) { - smps[i]->sequence = msgs[i]->sequence; - smps[i]->length = msgs[i]->length; - smps[i]->ts.origin = WEBMSG_TS(msgs[i]); - - memcpy(&smps[i]->data, &msgs[i]->data, WEBMSG_DATA_LEN(msgs[i]->length)); - } - - pool_put_many(&w->pool, (void **) msgs, got); + avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt); + if (avail < 0) + return avail; + } while (avail == 0); - return got; + for (int i = 0; i < avail; i++) { + sample_copy(smps[i], cpys[i]); + sample_put(cpys[i]); + } + + return avail; } int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) { + int avail; + struct websocket *w = n->_vd; + struct sample *cpys[cnt]; + + /* Make copies of all samples */ + avail = sample_alloc(&w->pool, cpys, cnt); + if (avail < cnt) + warn("Pool underrun for node %s: avail=%u", node_name(n), avail); + + for (int i = 0; i < avail; i++) { + sample_copy(cpys[i], smps[i]); + + cpys[i]->source = n; + } for (size_t i = 0; i < list_length(&w->connections); i++) { struct websocket_connection *c = list_at(&w->connections, i); - websocket_connection_write(c, smps, cnt); + websocket_connection_write(c, cpys, cnt); } for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = list_at(&connections, i); - websocket_connection_write(c, smps, cnt); + websocket_connection_write(c, cpys, cnt); } + + sample_put_many(cpys, avail); return cnt; } @@ -381,6 +518,8 @@ int websocket_parse(struct node *n, config_setting_t *cfg) cerror(cfg_dests, "The 'destinations' setting must be an array of URLs"); struct websocket_destination d; + + memset(&d, 0, sizeof(d)); d.uri = strdup(uri); if (!d.uri) @@ -391,10 +530,13 @@ int websocket_parse(struct node *n, config_setting_t *cfg) cerror(cfg_dests, "Failed to parse websocket URI: '%s'", uri); d.info.ssl_connection = !strcmp(prot, "https"); - d.info.address = ads; - d.info.path = path; - d.info.protocol = prot; + d.info.address = strdup(ads); + d.info.host = d.info.address; + d.info.origin = d.info.address; d.info.ietf_version_or_minus_one = -1; + d.info.protocol = "live"; + + asprintf((char **) &d.info.path, "/%s", path); list_push(&w->destinations, memdup(&d, sizeof(d))); } @@ -434,6 +576,8 @@ static struct plugin p = { .node = { .vectorize = 0, /* unlimited */ .size = sizeof(struct websocket), + .init = websocket_init, + .deinit = websocket_deinit, .start = websocket_start, .stop = websocket_stop, .destroy = websocket_destroy,