diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 5203db542..5316756ec 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -74,6 +74,8 @@ struct websocket_connection { enum state state; + char *buf; /**< A buffer which is used to construct the messages. */ + char *_name; }; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 6e7547ae7..144c637e8 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -68,6 +68,7 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws c->state = STATE_INITIALIZED; c->wsi = wsi; + c->buf = NULL; if (c->node) { struct websocket *w = c->node->_vd; @@ -107,6 +108,9 @@ static int websocket_connection_destroy(struct websocket_connection *c) if (ret) return ret; + if (c->buf) + free(c->buf); + c->state = STATE_DESTROYED; c->wsi = NULL; @@ -233,14 +237,17 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi return -1; } - char *buf = NULL; + size_t msglen, buflen = LWS_PRE; while (queue_pull(&c->queue, (void **) &smp)) { - buf = realloc(buf, LWS_PRE + WEBMSG_LEN(smp->length)); - if (!buf) + msglen = WEBMSG_LEN(smp->length); + + c->buf = realloc(c->buf, buflen + msglen); + if (!c->buf) serror("realloc failed:"); - msg = (struct webmsg *) (buf + LWS_PRE); + msg = (struct webmsg *) (c->buf + buflen); + buflen += msglen; msg->version = WEBMSG_VERSION; msg->type = WEBMSG_TYPE_DATA; @@ -250,27 +257,18 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi msg->ts.sec = smp->ts.origin.tv_sec; msg->ts.nsec = smp->ts.origin.tv_nsec; - memcpy(&msg->data, &smp->data, SAMPLE_DATA_LEN(smp->length)); + memcpy(&msg->data, &smp->data, WEBMSG_DATA_LEN(smp->length)); webmsg_hton(msg); sample_put(smp); - - ret = lws_write(wsi, (unsigned char *) msg, WEBMSG_LEN(msg->length), LWS_WRITE_BINARY); - if (ret < 0) { - warn("Failed lws_write() for connection %s", websocket_connection_name(c)); - return -1; - } - - if (lws_send_pipe_choked(wsi)) - break; } - free(buf); - - /* There are still samples in the queue */ - if (queue_available(&c->queue) > 0) - lws_callback_on_writable(wsi); + ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, buflen - LWS_PRE, LWS_WRITE_BINARY); + if (ret < 0) { + warn("Failed lws_write() for connection %s", websocket_connection_name(c)); + return -1; + } return 0; @@ -603,4 +601,4 @@ static struct plugin p = { }; REGISTER_PLUGIN(&p) -LIST_INIT_STATIC(&p.node.instances) \ No newline at end of file +LIST_INIT_STATIC(&p.node.instances)