diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 1d512eae9..287b62cbf 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -20,7 +20,7 @@ #include "node.h" #include "pool.h" -#include "queue.h" +#include "queue_signalled.h" #include "common.h" /* Forward declaration */ @@ -32,10 +32,7 @@ struct websocket { struct list destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */ struct pool pool; - struct queue queue; /**< For samples which are received from WebSockets a */ - - pthread_mutex_t mutex; /**< Mutex for signalling the availability of new samples in struct websocket::queue. */ - pthread_cond_t cond; /**< Condition variable for signalling the availability of new samples in struct websocket::queue. */ + struct queue_signalled queue; /**< For samples which are received from WebSockets a */ }; /* Internal datastructures */ @@ -84,4 +81,4 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_vtable::write */ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt); -/** @} */ \ No newline at end of file +/** @} */ diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 4907cbf3d..5c7ee48eb 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -245,16 +245,12 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi memcpy(msg2, msg, WEBMSG_LEN(msg->length)); - ret = queue_push(&w->queue, msg2); + ret = queue_signalled_push_many(&w->queue, (void **) msg2, 1); if (ret != 1) { warn("Queue overrun for connection %s", websocket_connection_name(c)); break; } - pthread_mutex_lock(&w->mutex); - pthread_cond_broadcast(&w->cond); - pthread_mutex_unlock(&w->mutex); - /* Next message */ msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length)); } @@ -278,15 +274,7 @@ int websocket_start(struct node *n) if (ret) return ret; - ret = queue_init(&w->queue, DEFAULT_QUEUELEN, &memtype_hugepage); - if (ret) - return ret; - - ret = pthread_mutex_init(&w->mutex, NULL); - if (ret) - return ret; - - ret = pthread_cond_init(&w->cond, NULL); + ret = queue_signalled_init(&w->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return ret; @@ -312,18 +300,10 @@ int websocket_stop(struct node *n) if (ret) return ret; - ret = queue_destroy(&w->queue); + ret = queue_signalled_destroy(&w->queue); if (ret) return ret; - ret = pthread_mutex_destroy(&w->mutex); - if (ret) - return ret; - - ret = pthread_cond_destroy(&w->cond); - if (ret) - return ret; - return 0; } @@ -345,11 +325,7 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) struct webmsg *msgs[cnt]; do { - pthread_mutex_lock(&w->mutex); - pthread_cond_wait(&w->cond, &w->mutex); - pthread_mutex_unlock(&w->mutex); - - got = queue_pull_many(&w->queue, (void **) msgs, cnt); + got = queue_signalled_pull_many(&w->queue, (void **) msgs, cnt); if (got < 0) return got; } while (got == 0); @@ -471,4 +447,4 @@ static struct plugin p = { } }; -REGISTER_PLUGIN(&p) \ No newline at end of file +REGISTER_PLUGIN(&p)