From 257568895147e3a554cdaa9314212d46cdae0453 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 7 Dec 2018 15:15:24 +0100 Subject: [PATCH] [WIP] add pthread syncronization to re_main thread rtp receive handler writes data to mutex protected memory and indicates the amount of data written. main thread polls on the thread and gets the data respecting the mutex. --- include/villas/nodes/rtp.h | 6 ++++ lib/nodes/rtp.c | 65 ++++++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 5802a5261..fa5e82d8a 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -29,6 +29,8 @@ #pragma once +#include + #include #include @@ -55,6 +57,10 @@ struct rtp { struct io io; bool enable_rtcp; + + char *recv_buf; + size_t recv_size; + pthread_mutex_t recv_mutex; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 049ecb72e..2e4530451 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -80,7 +80,7 @@ int rtp_parse(struct node *n, json_t *cfg) if(!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); - /* Enable RTCP */ + /* Enable RTCP */ r->enable_rtcp = enable_rtcp; if(enable_rtcp) error("RTCP is not implemented yet."); @@ -122,19 +122,38 @@ char * rtp_print(struct node *n) static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { - (void)hdr; - (void)arg; - - printf("rtp: recv %zu bytes\n", mbuf_get_left(mb)); + struct rtp *r = (struct rtp *) arg; + size_t n = mbuf_get_left(mb); - /* placeholder */ + /* enter critical section */ + pthread_mutex_lock(&r->recv_mutex); + + /* adapt buffer space and read data */ + r->recv_buf = realloc(r->recv_buf, n); + mbuf_read_mem(mb, (uint8_t *) r->recv_buf, n); + + /* indicate number of bytes read */ + r->recv_size = n; + + /* leave critical section */ + pthread_mutex_unlock(&r->recv_mutex); + + /* header not yet used */ + (void)hdr; } int rtp_start(struct node *n) { int ret; struct rtp *r = (struct rtp *) n->_vd; - + + /* Initialize Mutex */ + ret = pthread_mutex_init(&r->recv_mutex, NULL); + if (ret) + return ret; + r->recv_size = 0; + r->recv_buf = (char *)alloc(RTP_INITIAL_BUFFER_LEN); + /* Initialize IO */ ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) @@ -146,7 +165,7 @@ int rtp_start(struct node *n) /* Initialize RTP socket */ uint16_t port = sa_port(&r->local) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL); + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, n->_vd); return ret; } @@ -193,6 +212,7 @@ int rtp_type_stop() /* Join worker thread */ re_cancel(); + pthread_cancel(re_pthread); /* @todo avoid using pthread_cancel */ ret = pthread_join(re_pthread, NULL); if (ret) { error("Error joining rtp node type pthread"); @@ -205,15 +225,29 @@ int rtp_type_stop() int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - /* + int ret; struct rtp *r = (struct rtp *) n->_vd; - int bytes; - char data[RTP_MAX_PACKET_LEN]; - */ + size_t bytes = r->recv_size; - /* TODO */ + /* new data available? */ + if (bytes > 0) { + /* enter critical section */ + pthread_mutex_lock(&r->recv_mutex); - return 0; + /* read from buffer */ + ret = io_sscan(&r->io, r->recv_buf, bytes, NULL, smps, cnt); + + /* indicate data was consumed */ + r->recv_size = 0; + + /* leave critical section */ + pthread_mutex_unlock(&r->recv_mutex); + + if (ret < 0) + warn("Received invalid packet from node: %s ret=%d", node_name(n), ret); + } + + return ret; } int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -224,7 +258,6 @@ int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel char *buf; char pad[] = " "; size_t buflen; - /* ssize_t bytes; */ size_t wbytes; buflen = RTP_INITIAL_BUFFER_LEN; @@ -255,7 +288,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); struct mbuf *mb = mbuf_alloc(buflen + 12); ret = mbuf_write_str(mb, pad); if (ret) { - error("Error writing pad to mbuf"); + error("Error writing padding to mbuf"); cnt = ret; goto out; }