diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 5802a5261..fa5e82d8a 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -29,6 +29,8 @@ #pragma once +#include + #include #include @@ -55,6 +57,10 @@ struct rtp { struct io io; bool enable_rtcp; + + char *recv_buf; + size_t recv_size; + pthread_mutex_t recv_mutex; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 049ecb72e..2e4530451 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -80,7 +80,7 @@ int rtp_parse(struct node *n, json_t *cfg) if(!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); - /* Enable RTCP */ + /* Enable RTCP */ r->enable_rtcp = enable_rtcp; if(enable_rtcp) error("RTCP is not implemented yet."); @@ -122,19 +122,38 @@ char * rtp_print(struct node *n) static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { - (void)hdr; - (void)arg; - - printf("rtp: recv %zu bytes\n", mbuf_get_left(mb)); + struct rtp *r = (struct rtp *) arg; + size_t n = mbuf_get_left(mb); - /* placeholder */ + /* enter critical section */ + pthread_mutex_lock(&r->recv_mutex); + + /* adapt buffer space and read data */ + r->recv_buf = realloc(r->recv_buf, n); + mbuf_read_mem(mb, (uint8_t *) r->recv_buf, n); + + /* indicate number of bytes read */ + r->recv_size = n; + + /* leave critical section */ + pthread_mutex_unlock(&r->recv_mutex); + + /* header not yet used */ + (void)hdr; } int rtp_start(struct node *n) { int ret; struct rtp *r = (struct rtp *) n->_vd; - + + /* Initialize Mutex */ + ret = pthread_mutex_init(&r->recv_mutex, NULL); + if (ret) + return ret; + r->recv_size = 0; + r->recv_buf = (char *)alloc(RTP_INITIAL_BUFFER_LEN); + /* Initialize IO */ ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) @@ -146,7 +165,7 @@ int rtp_start(struct node *n) /* Initialize RTP socket */ uint16_t port = sa_port(&r->local) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL); + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, n->_vd); return ret; } @@ -193,6 +212,7 @@ int rtp_type_stop() /* Join worker thread */ re_cancel(); + pthread_cancel(re_pthread); /* @todo avoid using pthread_cancel */ ret = pthread_join(re_pthread, NULL); if (ret) { error("Error joining rtp node type pthread"); @@ -205,15 +225,29 @@ int rtp_type_stop() int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - /* + int ret; struct rtp *r = (struct rtp *) n->_vd; - int bytes; - char data[RTP_MAX_PACKET_LEN]; - */ + size_t bytes = r->recv_size; - /* TODO */ + /* new data available? */ + if (bytes > 0) { + /* enter critical section */ + pthread_mutex_lock(&r->recv_mutex); - return 0; + /* read from buffer */ + ret = io_sscan(&r->io, r->recv_buf, bytes, NULL, smps, cnt); + + /* indicate data was consumed */ + r->recv_size = 0; + + /* leave critical section */ + pthread_mutex_unlock(&r->recv_mutex); + + if (ret < 0) + warn("Received invalid packet from node: %s ret=%d", node_name(n), ret); + } + + return ret; } int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -224,7 +258,6 @@ int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel char *buf; char pad[] = " "; size_t buflen; - /* ssize_t bytes; */ size_t wbytes; buflen = RTP_INITIAL_BUFFER_LEN; @@ -255,7 +288,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); struct mbuf *mb = mbuf_alloc(buflen + 12); ret = mbuf_write_str(mb, pad); if (ret) { - error("Error writing pad to mbuf"); + error("Error writing padding to mbuf"); cnt = ret; goto out; }