diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index ac5c199d6..f21e74758 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -60,6 +60,8 @@ struct rtp { struct format_type *format; struct io io; + struct mbuf *mb; + double rate; /**< Sample rate of source */ struct { diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 4f261fc41..7a363108b 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -325,6 +325,15 @@ int rtp_start(struct node *n) if (ret) return ret; + /* Initialize memory buffer for sending */ + r->mb = mbuf_alloc(RTP_INITIAL_BUFFER_LEN + 12); + if (!r->mb) + return -1; + + ret = mbuf_fill(r->mb, 0, 12); + if (ret) + return -1; + ret = io_check(&r->io); if (ret) return ret; @@ -390,6 +399,8 @@ int rtp_stop(struct node *n) if (ret) warning("Problem destroying queue"); + mem_deref(r->mb); + return io_destroy(&r->io); } @@ -500,71 +511,34 @@ int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel int ret; struct rtp *r = (struct rtp *) n->_vd; - char *buf; - char pad[] = " "; - size_t buflen; size_t wbytes; + size_t avail; - buflen = RTP_INITIAL_BUFFER_LEN; - buf = alloc(buflen); - if (!buf) { - warning("Error allocating buffer space"); - cnt = -1; - goto out1; - } + uint32_t ts = (uint32_t) time(NULL); -retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); - if (cnt < 0) { - warning("Error from io_sprint, reason: %d", cnt); - goto out1; - } +retry: mbuf_set_pos(r->mb, 12); + avail = mbuf_get_space(r->mb); + cnt = io_sprint(&r->io, (char *) r->mb->buf, avail, &wbytes, smps, cnt); + if (cnt < 0) + return -1; - if (wbytes <= 0) { - warning("Error written bytes = %ld <= 0", wbytes); - cnt = -1; - goto out1; - } + if (wbytes > avail) { + ret = mbuf_resize(r->mb, wbytes + 12); + if (!ret) + return -1; - if (wbytes > buflen) { - buflen = wbytes; - buf = realloc(buf, buflen); goto retry; } - - /* Prepare mbuf */ - struct mbuf *mb = mbuf_alloc(buflen + 12); - if (!mb) { - warning("Failed to allocate memory"); - cnt = -1; - goto out2; - } - - ret = mbuf_write_str(mb, pad); - if (ret) { - warning("Error writing padding to mbuf"); - cnt = ret; - goto out2; - } - - ret = mbuf_write_mem(mb, (uint8_t*)buf, buflen); - if (ret) { - warning("Error writing data to mbuf"); - cnt = ret; - goto out2; - } - - mbuf_set_pos(mb, 12); + else + mbuf_set_end(r->mb, r->mb->pos + wbytes); /* Send dataset */ - ret = rtp_send(r->rs, &r->out.saddr_rtp, false, false, 21, (uint32_t) time(NULL), mb); + ret = rtp_send(r->rs, &r->out.saddr_rtp, false, false, 21, ts, r->mb); if (ret) { warning("Error from rtp_send, reason: %d", ret); cnt = ret; } -out2: mem_deref(mb); -out1: free(buf); - return cnt; }