diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 325019406..1d512eae9 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -34,6 +34,8 @@ struct websocket { struct pool pool; struct queue queue; /**< For samples which are received from WebSockets a */ + pthread_mutex_t mutex; /**< Mutex for signalling the availability of new samples in struct websocket::queue. */ + pthread_cond_t cond; /**< Condition variable for signalling the availability of new samples in struct websocket::queue. */ }; /* Internal datastructures */ diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 9ffbffe85..4907cbf3d 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -251,6 +251,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi break; } + pthread_mutex_lock(&w->mutex); + pthread_cond_broadcast(&w->cond); + pthread_mutex_unlock(&w->mutex); + /* Next message */ msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length)); } @@ -277,6 +281,14 @@ int websocket_start(struct node *n) ret = queue_init(&w->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return ret; + + ret = pthread_mutex_init(&w->mutex, NULL); + if (ret) + return ret; + + ret = pthread_cond_init(&w->cond, NULL); + if (ret) + return ret; /** @todo Connection to destinations via WebSocket client */ @@ -285,6 +297,7 @@ int websocket_start(struct node *n) int websocket_stop(struct node *n) { + int ret; struct websocket *w = n->_vd; for (size_t i = 0; i < list_length(&w->connections); i++) { @@ -295,8 +308,21 @@ int websocket_stop(struct node *n) lws_callback_on_writable(c->wsi); } - pool_destroy(&w->pool); - queue_destroy(&w->queue); + ret = pool_destroy(&w->pool); + if (ret) + return ret; + + ret = queue_destroy(&w->queue); + if (ret) + return ret; + + ret = pthread_mutex_destroy(&w->mutex); + if (ret) + return ret; + + ret = pthread_cond_destroy(&w->cond); + if (ret) + return ret; return 0; } @@ -316,10 +342,19 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) int got; struct websocket *w = n->_vd; - struct webmsg *msgs[cnt]; - got = queue_pull_many(&w->queue, (void **) msgs, cnt); + do { + pthread_mutex_lock(&w->mutex); + pthread_cond_wait(&w->cond, &w->mutex); + pthread_mutex_unlock(&w->mutex); + + got = queue_pull_many(&w->queue, (void **) msgs, cnt); + if (got < 0) + return got; + } while (got == 0); + + for (int i = 0; i < got; i++) { smps[i]->sequence = msgs[i]->sequence; smps[i]->length = msgs[i]->length;