1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

[WIP] further work on write function of RTP node

add proper warning and error outputs
fix rtp_write to write data to interface (not verified)
cleanup of rtp socket outstanding
This commit is contained in:
Marvin Klimke 2018-12-01 12:31:12 +01:00
parent 2ea4b65b58
commit b6161e06b1

View file

@ -27,6 +27,7 @@
#include <re/re_types.h>
#include <re/re_main.h>
#include <re/re_mbuf.h>
#include <re/re_mem.h>
#include <re/re_rtp.h>
#include <re/re_sys.h>
#undef ALIGN_MASK
@ -139,9 +140,6 @@ int rtp_start(struct node *n)
if (ret)
return ret;
/* Initialize random number generator */
rand_init();
/* Initialize RTP socket */
uint16_t port = sa_port(&r->local) & ~1;
ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL);
@ -151,21 +149,27 @@ int rtp_start(struct node *n)
int rtp_stop(struct node *n)
{
/*
int ret;
struct rtp *r = (struct rtp *) n->_vd;
*/
/* TODO */
re_cancel();
/*mem_deref(r->rs);*/
ret = io_destroy(&r->io);
if (ret)
return ret;
return 0;
}
int rtp_type_start()
{
/* Initialize library */
return libre_init();
}
int rtp_type_stop()
{
/* TODO */
libre_close();
return 0;
}
@ -179,7 +183,7 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
/* TODO */
return -1;
return 0;
}
int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
@ -188,22 +192,28 @@ int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
struct rtp *r = (struct rtp *) n->_vd;
char *buf;
char pad[12] = { 0 };
char pad[] = " ";
size_t buflen;
/* ssize_t bytes; */
size_t wbytes;
buflen = RTP_INITIAL_BUFFER_LEN;
buf = alloc(buflen);
if (!buf)
if (!buf) {
error("Error allocating buffer space");
return -1;
}
retry: ret = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
if (ret < 0)
retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
if (cnt < 0) {
error("Error from io_sprint, reason: %d", cnt);
goto out;
}
if (wbytes <= 0)
if (wbytes <= 0) {
error("Error written bytes = %ld <= 0", wbytes);
goto out;
}
if (wbytes > buflen) {
buflen = wbytes;
@ -215,31 +225,35 @@ retry: ret = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
struct mbuf *mb = mbuf_alloc(buflen + 12);
ret = mbuf_write_str(mb, pad);
if(ret) {
error("Error writing to mbuf");
return ret;
error("Error writing pad to mbuf");
cnt = ret;
goto out;
}
ret = mbuf_write_str(mb, buf);
ret = mbuf_write_mem(mb, (uint8_t*)buf, buflen);
if(ret) {
error("Error writing to mbuf");
return ret;
error("Error writing data to mbuf");
cnt = ret;
goto out;
}
mbuf_set_pos(mb, 12);
/* Send dataset */
ret = rtp_send(r->rs, &r->remote, false, false, 61, (uint32_t)time(NULL), mb);
if(ret)
return ret;
ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb);
if(ret) {
error("Error from rtp_send, reason: %d", ret);
cnt = ret;
}
out: free(buf);
return ret;
return cnt;
}
int rtp_fd(struct node *n)
{
/* struct rtp *r = (struct rtp *) n->_vd; */
error("No acces to file descriptor.");
error("No access to file descriptor.");
return -1;
}
@ -251,6 +265,7 @@ static struct plugin p = {
.node = {
.vectorize = 0,
.size = sizeof(struct rtp),
.type.start = rtp_type_start,
.type.stop = rtp_type_stop,
.reverse = rtp_reverse,
.parse = rtp_parse,