diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index cae564a91..ef3c8f2a6 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -36,7 +36,7 @@ #include #include #include -#include +#include #ifdef __cplusplus extern "C" { @@ -61,7 +61,7 @@ struct rtp { bool enable_rtcp; - struct queue recv_queue; + struct queue_signalled recv_queue; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index a6a52daeb..7b78a1580 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -141,7 +141,7 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru { struct rtp *r = (struct rtp *) arg; - if (queue_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1) + if (queue_signalled_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1) warning("Failed to push to queue"); /* source, header not yet used */ @@ -163,7 +163,7 @@ int rtp_start(struct node *n) struct rtp *r = (struct rtp *) n->_vd; /* Initialize Queue */ - ret = queue_init(&r->recv_queue, 8, &memory_heap); + ret = queue_signalled_init(&r->recv_queue, 8, &memory_heap, 0); if (ret) return ret; @@ -193,11 +193,11 @@ int rtp_stop(struct node *n) /*mem_deref(r->rs);*/ - ret = queue_close(&r->recv_queue); + ret = queue_signalled_close(&r->recv_queue); if (ret) warning("Problem closing queue"); - ret = queue_destroy(&r->recv_queue); + ret = queue_signalled_destroy(&r->recv_queue); if (ret) warning("Problem destroying queue"); @@ -257,7 +257,7 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele struct mbuf *mb; /* Get data from queue */ - ret = queue_pull(&r->recv_queue, (void **) &mb); + ret = queue_signalled_pull(&r->recv_queue, (void **) &mb); if (ret <= 0) { if (ret < 0) warning("Failed to pull from queue");