1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

rtp: use signalled queue for thread synchronization

This commit is contained in:
Steffen Vogel 2019-01-07 15:22:38 +01:00
parent e6d5a82f29
commit 99cae8903d
2 changed files with 7 additions and 7 deletions

View file

@ -36,7 +36,7 @@
#include <villas/node.h>
#include <villas/list.h>
#include <villas/io.h>
#include <villas/queue.h>
#include <villas/queue_signalled.h>
#ifdef __cplusplus
extern "C" {
@ -61,7 +61,7 @@ struct rtp {
bool enable_rtcp;
struct queue recv_queue;
struct queue_signalled recv_queue;
};
/** @see node_type::print */

View file

@ -141,7 +141,7 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru
{
struct rtp *r = (struct rtp *) arg;
if (queue_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1)
if (queue_signalled_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1)
warning("Failed to push to queue");
/* source, header not yet used */
@ -163,7 +163,7 @@ int rtp_start(struct node *n)
struct rtp *r = (struct rtp *) n->_vd;
/* Initialize Queue */
ret = queue_init(&r->recv_queue, 8, &memory_heap);
ret = queue_signalled_init(&r->recv_queue, 8, &memory_heap, 0);
if (ret)
return ret;
@ -193,11 +193,11 @@ int rtp_stop(struct node *n)
/*mem_deref(r->rs);*/
ret = queue_close(&r->recv_queue);
ret = queue_signalled_close(&r->recv_queue);
if (ret)
warning("Problem closing queue");
ret = queue_destroy(&r->recv_queue);
ret = queue_signalled_destroy(&r->recv_queue);
if (ret)
warning("Problem destroying queue");
@ -257,7 +257,7 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
struct mbuf *mb;
/* Get data from queue */
ret = queue_pull(&r->recv_queue, (void **) &mb);
ret = queue_signalled_pull(&r->recv_queue, (void **) &mb);
if (ret <= 0) {
if (ret < 0)
warning("Failed to pull from queue");