diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index cb6d3f10e..172e43030 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -257,18 +257,17 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi struct timespec ts_recv = time_now(); struct node *n = c->node; + int avail, enqueued; struct websocket *w = (struct websocket *) n->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); if (!smps) warn("Failed to allocate memory for connection: %s", websocket_connection_name(c)); - ret = sample_alloc_many(&w->pool, smps, cnt); - if (ret != cnt) { + avail = sample_alloc_many(&w->pool, smps, cnt); + if (avail < cnt) warn("Pool underrun for connection: %s", websocket_connection_name(c)); - break; - } - recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, 0); + recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, avail, 0); if (recvd < 0) { warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); break; @@ -282,10 +281,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi smps[i]->flags |= SAMPLE_HAS_RECEIVED; } - ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd); - if (ret != recvd) + enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd); + if (enqueued < recvd) warn("Queue overrun for connection: %s", websocket_connection_name(c)); + /* Release unused samples back to pool */ + if (enqueued < avail) + sample_put_many(&smps[enqueued], avail - enqueued); + if (c->state == STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1;