1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

[WIP] Use thread save queue instead of mutexes for data exchange

worker thread pushes received data in queue, main thread pulls data on request from queue.
This commit is contained in:
Marvin Klimke 2018-12-13 18:50:18 +01:00
parent 40d9bd5368
commit 22b42a8d88
2 changed files with 25 additions and 37 deletions

View file

@ -36,6 +36,7 @@
#include <villas/node.h>
#include <villas/list.h>
#include <villas/io.h>
#include <villas/queue.h>
#ifdef __cplusplus
extern "C" {
@ -58,9 +59,7 @@ struct rtp {
bool enable_rtcp;
char *recv_buf;
size_t recv_size;
pthread_mutex_t recv_mutex;
struct queue recv_queue;
};
/** @see node_type::print */

View file

@ -123,20 +123,9 @@ char * rtp_print(struct node *n)
static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg)
{
struct rtp *r = (struct rtp *) arg;
size_t n = mbuf_get_left(mb);
/* enter critical section */
pthread_mutex_lock(&r->recv_mutex);
/* adapt buffer space and read data */
r->recv_buf = realloc(r->recv_buf, n);
mbuf_read_mem(mb, (uint8_t *) r->recv_buf, n);
/* indicate number of bytes read */
r->recv_size = n;
/* leave critical section */
pthread_mutex_unlock(&r->recv_mutex);
if (queue_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1)
warn("Failed to push to queue");
/* header not yet used */
(void)hdr;
@ -147,12 +136,10 @@ int rtp_start(struct node *n)
int ret;
struct rtp *r = (struct rtp *) n->_vd;
/* Initialize Mutex */
ret = pthread_mutex_init(&r->recv_mutex, NULL);
/* Initialize Queue */
ret = queue_init(&r->recv_queue, 8, &memory_heap);
if (ret)
return ret;
r->recv_size = 0;
r->recv_buf = (char *)alloc(RTP_INITIAL_BUFFER_LEN);
/* Initialize IO */
ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET);
@ -227,26 +214,28 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
{
int ret;
struct rtp *r = (struct rtp *) n->_vd;
size_t bytes = r->recv_size;
/* new data available? */
if (bytes > 0) {
/* enter critical section */
pthread_mutex_lock(&r->recv_mutex);
/* read from buffer */
ret = io_sscan(&r->io, r->recv_buf, bytes, NULL, smps, cnt);
/* indicate data was consumed */
r->recv_size = 0;
/* leave critical section */
pthread_mutex_unlock(&r->recv_mutex);
size_t bytes;
char *buf;
struct mbuf *mb;
/* Get data from queue */
ret = queue_pull(&r->recv_queue, (void **) &mb);
if (ret <= 0) {
if (ret < 0)
warn("Received invalid packet from node: %s ret=%d", node_name(n), ret);
warn("Failed to pull from queue");
return ret;
}
/* Read from mbuf */
bytes = mbuf_get_left(mb);
buf = (char *) alloc(bytes);
mbuf_read_mem(mb, (uint8_t *) buf, bytes);
/* Unpack data */
ret = io_sscan(&r->io, buf, bytes, NULL, smps, cnt);
if (ret < 0)
warn("Received invalid packet from node %s: reason=%d", node_name(n), ret);
return ret;
}
@ -301,7 +290,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
mbuf_set_pos(mb, 12);
/* Send dataset */
ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb);
ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t) time(NULL), mb);
if (ret) {
error("Error from rtp_send, reason: %d", ret);
cnt = ret;