1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

use pthread_cond_signal for efficient signalling in websocket node-type

This commit is contained in:
Steffen Vogel 2017-04-02 13:02:49 +02:00
parent 8bcb9401fe
commit a33dadf80a
2 changed files with 41 additions and 4 deletions

View file

@ -34,6 +34,8 @@ struct websocket {
struct pool pool;
struct queue queue; /**< For samples which are received from WebSockets a */
pthread_mutex_t mutex; /**< Mutex for signalling the availability of new samples in struct websocket::queue. */
pthread_cond_t cond; /**< Condition variable for signalling the availability of new samples in struct websocket::queue. */
};
/* Internal datastructures */

View file

@ -251,6 +251,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
break;
}
pthread_mutex_lock(&w->mutex);
pthread_cond_broadcast(&w->cond);
pthread_mutex_unlock(&w->mutex);
/* Next message */
msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
}
@ -277,6 +281,14 @@ int websocket_start(struct node *n)
ret = queue_init(&w->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
if (ret)
return ret;
ret = pthread_mutex_init(&w->mutex, NULL);
if (ret)
return ret;
ret = pthread_cond_init(&w->cond, NULL);
if (ret)
return ret;
/** @todo Connection to destinations via WebSocket client */
@ -285,6 +297,7 @@ int websocket_start(struct node *n)
int websocket_stop(struct node *n)
{
int ret;
struct websocket *w = n->_vd;
for (size_t i = 0; i < list_length(&w->connections); i++) {
@ -295,8 +308,21 @@ int websocket_stop(struct node *n)
lws_callback_on_writable(c->wsi);
}
pool_destroy(&w->pool);
queue_destroy(&w->queue);
ret = pool_destroy(&w->pool);
if (ret)
return ret;
ret = queue_destroy(&w->queue);
if (ret)
return ret;
ret = pthread_mutex_destroy(&w->mutex);
if (ret)
return ret;
ret = pthread_cond_destroy(&w->cond);
if (ret)
return ret;
return 0;
}
@ -316,10 +342,19 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
int got;
struct websocket *w = n->_vd;
struct webmsg *msgs[cnt];
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
do {
pthread_mutex_lock(&w->mutex);
pthread_cond_wait(&w->cond, &w->mutex);
pthread_mutex_unlock(&w->mutex);
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
if (got < 0)
return got;
} while (got == 0);
for (int i = 0; i < got; i++) {
smps[i]->sequence = msgs[i]->sequence;
smps[i]->length = msgs[i]->length;