diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index fa5e82d8a..5bdf225d0 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -36,6 +36,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { @@ -58,9 +59,7 @@ struct rtp { bool enable_rtcp; - char *recv_buf; - size_t recv_size; - pthread_mutex_t recv_mutex; + struct queue recv_queue; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 2e4530451..38d76c2d2 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -123,20 +123,9 @@ char * rtp_print(struct node *n) static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { struct rtp *r = (struct rtp *) arg; - size_t n = mbuf_get_left(mb); - /* enter critical section */ - pthread_mutex_lock(&r->recv_mutex); - - /* adapt buffer space and read data */ - r->recv_buf = realloc(r->recv_buf, n); - mbuf_read_mem(mb, (uint8_t *) r->recv_buf, n); - - /* indicate number of bytes read */ - r->recv_size = n; - - /* leave critical section */ - pthread_mutex_unlock(&r->recv_mutex); + if (queue_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1) + warn("Failed to push to queue"); /* header not yet used */ (void)hdr; @@ -147,12 +136,10 @@ int rtp_start(struct node *n) int ret; struct rtp *r = (struct rtp *) n->_vd; - /* Initialize Mutex */ - ret = pthread_mutex_init(&r->recv_mutex, NULL); + /* Initialize Queue */ + ret = queue_init(&r->recv_queue, 8, &memory_heap); if (ret) return ret; - r->recv_size = 0; - r->recv_buf = (char *)alloc(RTP_INITIAL_BUFFER_LEN); /* Initialize IO */ ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); @@ -227,26 +214,28 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele { int ret; struct rtp *r = (struct rtp *) n->_vd; - size_t bytes = r->recv_size; - - /* new data available? */ - if (bytes > 0) { - /* enter critical section */ - pthread_mutex_lock(&r->recv_mutex); - - /* read from buffer */ - ret = io_sscan(&r->io, r->recv_buf, bytes, NULL, smps, cnt); - - /* indicate data was consumed */ - r->recv_size = 0; - - /* leave critical section */ - pthread_mutex_unlock(&r->recv_mutex); + size_t bytes; + char *buf; + struct mbuf *mb; + /* Get data from queue */ + ret = queue_pull(&r->recv_queue, (void **) &mb); + if (ret <= 0) { if (ret < 0) - warn("Received invalid packet from node: %s ret=%d", node_name(n), ret); + warn("Failed to pull from queue"); + return ret; } + /* Read from mbuf */ + bytes = mbuf_get_left(mb); + buf = (char *) alloc(bytes); + mbuf_read_mem(mb, (uint8_t *) buf, bytes); + + /* Unpack data */ + ret = io_sscan(&r->io, buf, bytes, NULL, smps, cnt); + if (ret < 0) + warn("Received invalid packet from node %s: reason=%d", node_name(n), ret); + return ret; } @@ -301,7 +290,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); mbuf_set_pos(mb, 12); /* Send dataset */ - ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb); + ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t) time(NULL), mb); if (ret) { error("Error from rtp_send, reason: %d", ret); cnt = ret;