From a1e43c7f0061ae93d2c17d43abee4a15c1196f9d Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 16 Nov 2018 16:07:47 +0100 Subject: [PATCH 01/16] add plugin frame for rtp node using nanomsg as a template --- include/villas/nodes/rtp.h | 80 +++++++++++++++++++ lib/nodes/rtp.c | 153 +++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 include/villas/nodes/rtp.h create mode 100644 lib/nodes/rtp.c diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h new file mode 100644 index 000000000..2baa4a164 --- /dev/null +++ b/include/villas/nodes/rtp.h @@ -0,0 +1,80 @@ +/** Node type: rtp + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup rtp rtp node type + * @ingroup node + * @{ + */ + +#pragma once + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** The maximum length of a packet which contains stuct rtp. */ +#define RTP_MAX_PACKET_LEN 1500 + +/* Forward declarations */ +struct format_type; + +struct rtp { + /* + struct { + int socket; + struct list endpoints; + } in, out; + + struct format_type *format; + struct io io; + */ +}; + +/** @see node_type::print */ +char * rtp_print(struct node *n); + +/** @see node_type::parse */ +int rtp_parse(struct node *n, json_t *cfg); + +/** @see node_type::open */ +int rtp_start(struct node *n); + +/** @see node_type::close */ +int rtp_stop(struct node *n); + +/** @see node_type::read */ +int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @see node_type::write */ +int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); + +#ifdef __cplusplus +} +#endif + +/** @} */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c new file mode 100644 index 000000000..ee5b362d4 --- /dev/null +++ b/lib/nodes/rtp.c @@ -0,0 +1,153 @@ +/** Node type: rtp + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/* TODO: individual includes instead of wrapper include */ +#include +#include + +#include +#include +#include +#include + +int rtp_reverse(struct node *n) +{ + struct rtp *m = (struct rtp *) n->_vd; + + /* TODO */ + + return 0; +} + +int rtp_parse(struct node *n, json_t *cfg) +{ + int ret; + struct rtp *m = (struct rtp *) n->_vd; + + const char *format = "villas.binary"; + + json_error_t err; + + /* TODO ret = json_unpack_ex(...); */ + if (ret) + jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + + return 0; +} + +char * rtp_print(struct node *n) +{ + struct rtp *m = (struct rtp *) n->_vd; + + char *buf = NULL; + + /* TODO */ + + return buf; +} + +int rtp_start(struct node *n) +{ + int ret; + struct rtp *m = (struct rtp *) n->_vd; + + /* TODO */ + + return 0; +} + +int rtp_stop(struct node *n) +{ + int ret; + struct rtp *m = (struct rtp *) n->_vd; + + /* TODO */ + + return 0; +} + +int rtp_type_stop() +{ + /* TODO */ + + return -1; +} + +int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + struct rtp *m = (struct rtp *) n->_vd; + int bytes; + char data[RTP_MAX_PACKET_LEN]; + + /* TODO */ + + return -1; +} + +int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int ret; + struct rtp *m = (struct rtp *) n->_vd; + + size_t wbytes; + + char data[RTP_MAX_PACKET_LEN]; + + /* TODO */ + + return cnt; +} + +int rtp_fd(struct node *n) +{ + int ret; + struct rtp *m = (struct rtp *) n->_vd; + + int fd; + size_t len = sizeof(fd); + + /* TODO */ + + return fd; +} + +static struct plugin p = { + .name = "rtp", + .description = "real-time transport protocol (libre)", + .type = PLUGIN_TYPE_NODE, + .node = { + .vectorize = 0, + .size = sizeof(struct rtp), + .type.stop = rtp_type_stop, + .reverse = rtp_reverse, + .parse = rtp_parse, + .print = rtp_print, + .start = rtp_start, + .stop = rtp_stop, + .read = rtp_read, + .write = rtp_write, + .fd = rtp_fd + } +}; + +REGISTER_PLUGIN(&p) +LIST_INIT_STATIC(&p.node.instances) From de34ad2cda8aca74c4160a7dec411b1f3ad27e3f Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 16 Nov 2018 16:08:55 +0100 Subject: [PATCH 02/16] add rtp node to CMakeLists adapted from libnanomsg dependencies --- CMakeLists.txt | 4 ++++ lib/nodes/CMakeLists.txt | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 33e716cbc..b6487f34e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,6 +109,10 @@ pkg_check_modules(NANOMSG IMPORTED_TARGET nanomsg) if(NOT NANOMSG_FOUND) pkg_check_modules(NANOMSG IMPORTED_TARGET libnanomsg>=1.0.0) endif() +pkg_check_modules(RE IMPORTED_TARGET re) +if(NOT RE_FOUND) + pkg_check_modules(RE IMPORTED_TARGET libre>=1.0.0) +endif() # Build options option(WITH_HOOKS "Build with support for processing hook plugins" ON) diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 3a6cc92bd..4b14ea9df 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -125,6 +125,13 @@ if(IBVERBS_FOUND AND RDMACM_FOUND) list(APPEND LIBRARIES ${IBVERBS_LIBRARIES} ${RDMACM_LIBRARIES}) endif() +# Enable RTP node type when libre is available +if(RE_FOUND AND WITH_IO) + list(APPEND NODE_SRC rtp.c) + list(APPEND INCLUDE_DIRS ${RE_INCLUDE_DIRS}) + list(APPEND LIBRARIES PkgConfig::RE) +endif() + add_library(nodes STATIC ${NODE_SRC}) target_include_directories(nodes PUBLIC ${INCLUDE_DIRS}) target_link_libraries(nodes LINK_PRIVATE ${LIBRARIES}) From 0e7793598b5ac865e0a1f6ce8e6f13cd38e06166 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Wed, 21 Nov 2018 18:21:12 +0100 Subject: [PATCH 03/16] correct libre version --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b6487f34e..3410ffa1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,7 +111,7 @@ if(NOT NANOMSG_FOUND) endif() pkg_check_modules(RE IMPORTED_TARGET re) if(NOT RE_FOUND) - pkg_check_modules(RE IMPORTED_TARGET libre>=1.0.0) + pkg_check_modules(RE IMPORTED_TARGET libre>=0.5.9) endif() # Build options From 162fafbb032b419367ad4f0258b0c2e9ef09087b Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Wed, 21 Nov 2018 18:21:29 +0100 Subject: [PATCH 04/16] prevent "unused" warnings --- lib/nodes/rtp.c | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index ee5b362d4..c979235a9 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -20,10 +20,11 @@ * along with this program. If not, see . *********************************************************************************/ -/* TODO: individual includes instead of wrapper include */ -#include +#include #include +#include + #include #include #include @@ -31,19 +32,19 @@ int rtp_reverse(struct node *n) { - struct rtp *m = (struct rtp *) n->_vd; + /* struct rtp *m = (struct rtp *) n->_vd; */ /* TODO */ - return 0; + return -1; } int rtp_parse(struct node *n, json_t *cfg) { - int ret; - struct rtp *m = (struct rtp *) n->_vd; + int ret = 0; + /* struct rtp *m = (struct rtp *) n->_vd; */ - const char *format = "villas.binary"; + /* const char *format = "villas.binary"; */ json_error_t err; @@ -51,14 +52,15 @@ int rtp_parse(struct node *n, json_t *cfg) if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - return 0; + return -1; } char * rtp_print(struct node *n) { - struct rtp *m = (struct rtp *) n->_vd; - + /* struct rtp *m = (struct rtp *) n->_vd; */ + char *buf = NULL; + /* TODO */ @@ -67,22 +69,27 @@ char * rtp_print(struct node *n) int rtp_start(struct node *n) { + /* int ret; struct rtp *m = (struct rtp *) n->_vd; + */ /* TODO */ - return 0; + return -1; } int rtp_stop(struct node *n) { + /* int ret; struct rtp *m = (struct rtp *) n->_vd; + */ /* TODO */ + re_cancel(); - return 0; + return -1; } int rtp_type_stop() @@ -94,9 +101,11 @@ int rtp_type_stop() int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { + /* struct rtp *m = (struct rtp *) n->_vd; int bytes; char data[RTP_MAX_PACKET_LEN]; + */ /* TODO */ @@ -105,25 +114,23 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - int ret; - struct rtp *m = (struct rtp *) n->_vd; + /* struct rtp *m = (struct rtp *) n->_vd; */ - size_t wbytes; + /* size_t wbytes; */ - char data[RTP_MAX_PACKET_LEN]; + /* char data[RTP_MAX_PACKET_LEN]; */ /* TODO */ + rtp_send(NULL, NULL, 0, 0, 0, 0, NULL); return cnt; } int rtp_fd(struct node *n) { - int ret; - struct rtp *m = (struct rtp *) n->_vd; + /* struct rtp *m = (struct rtp *) n->_vd; */ int fd; - size_t len = sizeof(fd); /* TODO */ From dc81c8fd8c1a15c68f431b85ec2b3f67519dc26c Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Thu, 22 Nov 2018 07:18:27 +0100 Subject: [PATCH 05/16] first version of rtp node parse method use individual libre includes to avoid naming conflicts. use libre's struct sa to store socket addresses. --- include/villas/nodes/rtp.h | 7 ++++++- lib/nodes/rtp.c | 41 ++++++++++++++++++++++++++++++++------ 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 2baa4a164..a73994056 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -29,6 +29,8 @@ #pragma once +#include + #include #include #include @@ -49,10 +51,13 @@ struct rtp { int socket; struct list endpoints; } in, out; + */ + + struct sa local; /**< Local address of the socket */ + struct sa remote; /**< Remote address of the socket */ struct format_type *format; struct io io; - */ }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index c979235a9..2576e511f 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -23,7 +23,10 @@ #include #include -#include +#include +#include +#include +#undef ALIGN_MASK #include #include @@ -42,17 +45,43 @@ int rtp_reverse(struct node *n) int rtp_parse(struct node *n, json_t *cfg) { int ret = 0; - /* struct rtp *m = (struct rtp *) n->_vd; */ + struct rtp *sr = (struct rtp *) n->_vd; - /* const char *format = "villas.binary"; */ + const char *local, *remote; + const char *format = "villas.binary"; json_error_t err; - /* TODO ret = json_unpack_ex(...); */ + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: { s: s }, s: { s: s } }", + "format", &format, + "out", + "address", &remote, + "in", + "address", &local + ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - return -1; + /* Format */ + sr->format = format_type_lookup(format); + if(!sr->format) + error("Invalid format '%s' for node %s", format, node_name(n)); + + ret = sa_decode(&sr->remote, remote, strlen(remote)); + if (ret) { + error("Failed to resolve remote address '%s' of node %s: %s", + remote, node_name(n), strerror(ret)); + } + + ret = sa_decode(&sr->local, local, strlen(local)); + if (ret) { + error("Failed to resolve local address '%s' of node %s: %s", + local, node_name(n), strerror(ret)); + } + + info("### MKL ### rtp_parse success\n"); + + return ret; } char * rtp_print(struct node *n) @@ -130,7 +159,7 @@ int rtp_fd(struct node *n) { /* struct rtp *m = (struct rtp *) n->_vd; */ - int fd; + int fd = -1; /* TODO */ From 29a24d354e93a1a95616c96a00edbd785619d695 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Thu, 22 Nov 2018 17:53:07 +0100 Subject: [PATCH 06/16] implement rtp_print and rtp_reverse also begin of rtp_start add libre rtp socket and flag for rtcp in struct rtp --- include/villas/nodes/rtp.h | 9 ++--- lib/nodes/rtp.c | 71 ++++++++++++++++++++++++++------------ 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index a73994056..1f6fc1591 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -46,18 +46,15 @@ extern "C" { struct format_type; struct rtp { - /* - struct { - int socket; - struct list endpoints; - } in, out; - */ + struct rtp_sock *rs; /**< RTP socket */ struct sa local; /**< Local address of the socket */ struct sa remote; /**< Remote address of the socket */ struct format_type *format; struct io io; + + bool enable_rtcp; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 2576e511f..c6520b293 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -30,30 +30,36 @@ #include #include +#include #include #include int rtp_reverse(struct node *n) { - /* struct rtp *m = (struct rtp *) n->_vd; */ + struct rtp *r = (struct rtp *) n->_vd; + struct sa tmp; - /* TODO */ + tmp = r->local; + r->local = r->remote; + r->remote = tmp; - return -1; + return 0; } int rtp_parse(struct node *n, json_t *cfg) { int ret = 0; - struct rtp *sr = (struct rtp *) n->_vd; + struct rtp *r = (struct rtp *) n->_vd; const char *local, *remote; const char *format = "villas.binary"; + bool enable_rtcp = false; json_error_t err; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: { s: s }, s: { s: s } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s: { s: s }, s: { s: s } }", "format", &format, + "enable_rtcp", &enable_rtcp, "out", "address", &remote, "in", @@ -63,49 +69,70 @@ int rtp_parse(struct node *n, json_t *cfg) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); /* Format */ - sr->format = format_type_lookup(format); - if(!sr->format) + r->format = format_type_lookup(format); + if(!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); - ret = sa_decode(&sr->remote, remote, strlen(remote)); + /* Enable RTCP */ + r->enable_rtcp = enable_rtcp; + if(enable_rtcp) + error("RTCP is not implemented yet."); + + /* Remote address */ + ret = sa_decode(&r->remote, remote, strlen(remote)); if (ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), strerror(ret)); } - ret = sa_decode(&sr->local, local, strlen(local)); + /* Local address */ + ret = sa_decode(&r->local, local, strlen(local)); if (ret) { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), strerror(ret)); } - info("### MKL ### rtp_parse success\n"); + /** @todo parse * in addresses */ return ret; } char * rtp_print(struct node *n) { - /* struct rtp *m = (struct rtp *) n->_vd; */ - - char *buf = NULL; - + struct rtp *r = (struct rtp *) n->_vd; + char *buf; - /* TODO */ + char *local = socket_print_addr((struct sockaddr *) &r->local.u); + char *remote = socket_print_addr((struct sockaddr *) &r->remote.u); + + buf = strf("format=%s, in.address=%s, out.address=%s", format_type_name(r->format), local, remote); + + free(local); + free(remote); return buf; } int rtp_start(struct node *n) { - /* int ret; - struct rtp *m = (struct rtp *) n->_vd; - */ + struct rtp *r = (struct rtp *) n->_vd; + + /* Initialize IO */ + ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + if (ret) + return ret; + + ret = io_check(&r->io); + if (ret) + return ret; + + uint16_t port = sa_port(&r->local) & ~1; + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, NULL, NULL, NULL); /* TODO */ - return -1; + return ret; } int rtp_stop(struct node *n) @@ -159,11 +186,9 @@ int rtp_fd(struct node *n) { /* struct rtp *m = (struct rtp *) n->_vd; */ - int fd = -1; + error("No acces to file descriptor."); - /* TODO */ - - return fd; + return -1; } static struct plugin p = { From 5d9ad4e9e0eb9e6a8f52c05f8c80fbb9e44286f8 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Wed, 28 Nov 2018 06:11:13 +0100 Subject: [PATCH 07/16] add placeholder rtp receive handler --- lib/nodes/rtp.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index c6520b293..44a03deef 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -113,6 +113,15 @@ char * rtp_print(struct node *n) return buf; } +static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) +{ + (void)hdr; + (void)arg; + (void)mb; + + /* placeholder */ +} + int rtp_start(struct node *n) { int ret; @@ -128,7 +137,7 @@ int rtp_start(struct node *n) return ret; uint16_t port = sa_port(&r->local) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, NULL, NULL, NULL); + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL); /* TODO */ From 2ea4b65b58932e48cdbed3af90462a4d9d34083a Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Wed, 28 Nov 2018 18:12:06 +0100 Subject: [PATCH 08/16] [WIP] first approach to rtp_write function. --- include/villas/nodes/rtp.h | 2 +- lib/nodes/rtp.c | 72 ++++++++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 1f6fc1591..5802a5261 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -40,7 +40,7 @@ extern "C" { #endif /** The maximum length of a packet which contains stuct rtp. */ -#define RTP_MAX_PACKET_LEN 1500 +#define RTP_INITIAL_BUFFER_LEN 1500 /* Forward declarations */ struct format_type; diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 44a03deef..9e94a9709 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -22,15 +22,18 @@ #include #include +#include #include #include +#include #include +#include #undef ALIGN_MASK #include -#include #include +#include #include #include @@ -136,11 +139,13 @@ int rtp_start(struct node *n) if (ret) return ret; + /* Initialize random number generator */ + rand_init(); + + /* Initialize RTP socket */ uint16_t port = sa_port(&r->local) & ~1; ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL); - /* TODO */ - return ret; } @@ -148,26 +153,26 @@ int rtp_stop(struct node *n) { /* int ret; - struct rtp *m = (struct rtp *) n->_vd; + struct rtp *r = (struct rtp *) n->_vd; */ /* TODO */ re_cancel(); - return -1; + return 0; } int rtp_type_stop() { /* TODO */ - return -1; + return 0; } int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { /* - struct rtp *m = (struct rtp *) n->_vd; + struct rtp *r = (struct rtp *) n->_vd; int bytes; char data[RTP_MAX_PACKET_LEN]; */ @@ -179,21 +184,60 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - /* struct rtp *m = (struct rtp *) n->_vd; */ + int ret; + struct rtp *r = (struct rtp *) n->_vd; - /* size_t wbytes; */ + char *buf; + char pad[12] = { 0 }; + size_t buflen; + /* ssize_t bytes; */ + size_t wbytes; - /* char data[RTP_MAX_PACKET_LEN]; */ + buflen = RTP_INITIAL_BUFFER_LEN; + buf = alloc(buflen); + if (!buf) + return -1; - /* TODO */ - rtp_send(NULL, NULL, 0, 0, 0, 0, NULL); +retry: ret = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); + if (ret < 0) + goto out; - return cnt; + if (wbytes <= 0) + goto out; + + if (wbytes > buflen) { + buflen = wbytes; + buf = realloc(buf, buflen); + goto retry; + } + + /* Prepare mbuf */ + struct mbuf *mb = mbuf_alloc(buflen + 12); + ret = mbuf_write_str(mb, pad); + if(ret) { + error("Error writing to mbuf"); + return ret; + } + ret = mbuf_write_str(mb, buf); + if(ret) { + error("Error writing to mbuf"); + return ret; + } + mbuf_set_pos(mb, 12); + + /* Send dataset */ + ret = rtp_send(r->rs, &r->remote, false, false, 61, (uint32_t)time(NULL), mb); + if(ret) + return ret; + +out: free(buf); + + return ret; } int rtp_fd(struct node *n) { - /* struct rtp *m = (struct rtp *) n->_vd; */ + /* struct rtp *r = (struct rtp *) n->_vd; */ error("No acces to file descriptor."); From b6161e06b168ca41a35c08db6a26ea732649c20f Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Sat, 1 Dec 2018 12:31:12 +0100 Subject: [PATCH 09/16] [WIP] further work on write function of RTP node add proper warning and error outputs fix rtp_write to write data to interface (not verified) cleanup of rtp socket outstanding --- lib/nodes/rtp.c | 65 ++++++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 9e94a9709..2138954a5 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #undef ALIGN_MASK @@ -139,9 +140,6 @@ int rtp_start(struct node *n) if (ret) return ret; - /* Initialize random number generator */ - rand_init(); - /* Initialize RTP socket */ uint16_t port = sa_port(&r->local) & ~1; ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL); @@ -151,21 +149,27 @@ int rtp_start(struct node *n) int rtp_stop(struct node *n) { - /* int ret; struct rtp *r = (struct rtp *) n->_vd; - */ - /* TODO */ - re_cancel(); + /*mem_deref(r->rs);*/ + + ret = io_destroy(&r->io); + if (ret) + return ret; return 0; } +int rtp_type_start() +{ + /* Initialize library */ + return libre_init(); +} + int rtp_type_stop() { - /* TODO */ - + libre_close(); return 0; } @@ -179,7 +183,7 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele /* TODO */ - return -1; + return 0; } int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -188,22 +192,28 @@ int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel struct rtp *r = (struct rtp *) n->_vd; char *buf; - char pad[12] = { 0 }; + char pad[] = " "; size_t buflen; /* ssize_t bytes; */ size_t wbytes; buflen = RTP_INITIAL_BUFFER_LEN; buf = alloc(buflen); - if (!buf) + if (!buf) { + error("Error allocating buffer space"); return -1; + } -retry: ret = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); - if (ret < 0) +retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); + if (cnt < 0) { + error("Error from io_sprint, reason: %d", cnt); goto out; + } - if (wbytes <= 0) + if (wbytes <= 0) { + error("Error written bytes = %ld <= 0", wbytes); goto out; + } if (wbytes > buflen) { buflen = wbytes; @@ -215,31 +225,35 @@ retry: ret = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); struct mbuf *mb = mbuf_alloc(buflen + 12); ret = mbuf_write_str(mb, pad); if(ret) { - error("Error writing to mbuf"); - return ret; + error("Error writing pad to mbuf"); + cnt = ret; + goto out; } - ret = mbuf_write_str(mb, buf); + ret = mbuf_write_mem(mb, (uint8_t*)buf, buflen); if(ret) { - error("Error writing to mbuf"); - return ret; + error("Error writing data to mbuf"); + cnt = ret; + goto out; } mbuf_set_pos(mb, 12); /* Send dataset */ - ret = rtp_send(r->rs, &r->remote, false, false, 61, (uint32_t)time(NULL), mb); - if(ret) - return ret; + ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb); + if(ret) { + error("Error from rtp_send, reason: %d", ret); + cnt = ret; + } out: free(buf); - return ret; + return cnt; } int rtp_fd(struct node *n) { /* struct rtp *r = (struct rtp *) n->_vd; */ - error("No acces to file descriptor."); + error("No access to file descriptor."); return -1; } @@ -251,6 +265,7 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct rtp), + .type.start = rtp_type_start, .type.stop = rtp_type_stop, .reverse = rtp_reverse, .parse = rtp_parse, From c905f242c5c4ca53ec6189b4dcc6560dce3a09c3 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 7 Dec 2018 06:37:48 +0100 Subject: [PATCH 10/16] add re_main in pthread thread is started in rtp_type_start and joined in rtp_type_stop --- lib/nodes/rtp.c | 52 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 2138954a5..049ecb72e 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include @@ -38,6 +39,8 @@ #include #include +pthread_t re_pthread; + int rtp_reverse(struct node *n) { struct rtp *r = (struct rtp *) n->_vd; @@ -121,7 +124,8 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru { (void)hdr; (void)arg; - (void)mb; + + printf("rtp: recv %zu bytes\n", mbuf_get_left(mb)); /* placeholder */ } @@ -149,28 +153,54 @@ int rtp_start(struct node *n) int rtp_stop(struct node *n) { - int ret; struct rtp *r = (struct rtp *) n->_vd; /*mem_deref(r->rs);*/ - ret = io_destroy(&r->io); - if (ret) - return ret; + return io_destroy(&r->io); +} - return 0; +static void * th_func(void *arg) +{ + re_main(NULL); + return NULL; } int rtp_type_start() { + int ret; + /* Initialize library */ - return libre_init(); + ret = libre_init(); + if (ret) { + error("Error initializing libre"); + return ret; + } + + /* Add worker thread */ + ret = pthread_create(&re_pthread, NULL, th_func, NULL); + if (ret) { + error("Error creating rtp node type pthread"); + return ret; + } + + return ret; } int rtp_type_stop() { + int ret; + + /* Join worker thread */ + re_cancel(); + ret = pthread_join(re_pthread, NULL); + if (ret) { + error("Error joining rtp node type pthread"); + return ret; + } + libre_close(); - return 0; + return ret; } int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -224,13 +254,13 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); /* Prepare mbuf */ struct mbuf *mb = mbuf_alloc(buflen + 12); ret = mbuf_write_str(mb, pad); - if(ret) { + if (ret) { error("Error writing pad to mbuf"); cnt = ret; goto out; } ret = mbuf_write_mem(mb, (uint8_t*)buf, buflen); - if(ret) { + if (ret) { error("Error writing data to mbuf"); cnt = ret; goto out; @@ -239,7 +269,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); /* Send dataset */ ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb); - if(ret) { + if (ret) { error("Error from rtp_send, reason: %d", ret); cnt = ret; } From 257568895147e3a554cdaa9314212d46cdae0453 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 7 Dec 2018 15:15:24 +0100 Subject: [PATCH 11/16] [WIP] add pthread syncronization to re_main thread rtp receive handler writes data to mutex protected memory and indicates the amount of data written. main thread polls on the thread and gets the data respecting the mutex. --- include/villas/nodes/rtp.h | 6 ++++ lib/nodes/rtp.c | 65 ++++++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 5802a5261..fa5e82d8a 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -29,6 +29,8 @@ #pragma once +#include + #include #include @@ -55,6 +57,10 @@ struct rtp { struct io io; bool enable_rtcp; + + char *recv_buf; + size_t recv_size; + pthread_mutex_t recv_mutex; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 049ecb72e..2e4530451 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -80,7 +80,7 @@ int rtp_parse(struct node *n, json_t *cfg) if(!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); - /* Enable RTCP */ + /* Enable RTCP */ r->enable_rtcp = enable_rtcp; if(enable_rtcp) error("RTCP is not implemented yet."); @@ -122,19 +122,38 @@ char * rtp_print(struct node *n) static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { - (void)hdr; - (void)arg; - - printf("rtp: recv %zu bytes\n", mbuf_get_left(mb)); + struct rtp *r = (struct rtp *) arg; + size_t n = mbuf_get_left(mb); - /* placeholder */ + /* enter critical section */ + pthread_mutex_lock(&r->recv_mutex); + + /* adapt buffer space and read data */ + r->recv_buf = realloc(r->recv_buf, n); + mbuf_read_mem(mb, (uint8_t *) r->recv_buf, n); + + /* indicate number of bytes read */ + r->recv_size = n; + + /* leave critical section */ + pthread_mutex_unlock(&r->recv_mutex); + + /* header not yet used */ + (void)hdr; } int rtp_start(struct node *n) { int ret; struct rtp *r = (struct rtp *) n->_vd; - + + /* Initialize Mutex */ + ret = pthread_mutex_init(&r->recv_mutex, NULL); + if (ret) + return ret; + r->recv_size = 0; + r->recv_buf = (char *)alloc(RTP_INITIAL_BUFFER_LEN); + /* Initialize IO */ ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); if (ret) @@ -146,7 +165,7 @@ int rtp_start(struct node *n) /* Initialize RTP socket */ uint16_t port = sa_port(&r->local) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, NULL); + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, n->_vd); return ret; } @@ -193,6 +212,7 @@ int rtp_type_stop() /* Join worker thread */ re_cancel(); + pthread_cancel(re_pthread); /* @todo avoid using pthread_cancel */ ret = pthread_join(re_pthread, NULL); if (ret) { error("Error joining rtp node type pthread"); @@ -205,15 +225,29 @@ int rtp_type_stop() int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - /* + int ret; struct rtp *r = (struct rtp *) n->_vd; - int bytes; - char data[RTP_MAX_PACKET_LEN]; - */ + size_t bytes = r->recv_size; - /* TODO */ + /* new data available? */ + if (bytes > 0) { + /* enter critical section */ + pthread_mutex_lock(&r->recv_mutex); - return 0; + /* read from buffer */ + ret = io_sscan(&r->io, r->recv_buf, bytes, NULL, smps, cnt); + + /* indicate data was consumed */ + r->recv_size = 0; + + /* leave critical section */ + pthread_mutex_unlock(&r->recv_mutex); + + if (ret < 0) + warn("Received invalid packet from node: %s ret=%d", node_name(n), ret); + } + + return ret; } int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -224,7 +258,6 @@ int rtp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel char *buf; char pad[] = " "; size_t buflen; - /* ssize_t bytes; */ size_t wbytes; buflen = RTP_INITIAL_BUFFER_LEN; @@ -255,7 +288,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); struct mbuf *mb = mbuf_alloc(buflen + 12); ret = mbuf_write_str(mb, pad); if (ret) { - error("Error writing pad to mbuf"); + error("Error writing padding to mbuf"); cnt = ret; goto out; } From 40d9bd5368eb59c6c32ff2df98d5db78f16398a3 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 7 Dec 2018 15:16:11 +0100 Subject: [PATCH 12/16] [WIP] add integration test script for rtp node aparently not yet working. --- tests/integration/pipe-loopback-rtp.sh | 77 ++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100755 tests/integration/pipe-loopback-rtp.sh diff --git a/tests/integration/pipe-loopback-rtp.sh b/tests/integration/pipe-loopback-rtp.sh new file mode 100755 index 000000000..405c04c9f --- /dev/null +++ b/tests/integration/pipe-loopback-rtp.sh @@ -0,0 +1,77 @@ +#!/bin/bash +# +# Integration loopback test for villas-pipe. +# +# @author Steffen Vogel +# @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +SCRIPT=$(realpath $0) +SCRIPTPATH=$(dirname ${SCRIPT}) +source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh + +CONFIG_FILE=$(mktemp) +INPUT_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) + +NUM_SAMPLES=${NUM_SAMPLES:-100} + +# Generate test data +villas-signal mixed -v 5 -l ${NUM_SAMPLES} -n > ${INPUT_FILE} + +FORMAT="villas.binary" +VECTORIZE="1" + +cat > ${CONFIG_FILE} << EOF +{ + "nodes" : { + "node1" : { + "type" : "rtp", + + "format" : "${FORMAT}", + "vectorize" : ${VECTORIZE}, + + "in" : { + "address" : "127.0.0.1:12000", + + "signals" : [ + { "type" : "float" }, + { "type" : "float" }, + { "type" : "float" }, + { "type" : "float" }, + { "type" : "float" } + ] + }, + "out" : { + "address" : "127.0.0.1:12000" + } + } + } +} +EOF + +villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE} node1 > ${OUTPUT_FILE} < ${INPUT_FILE} + +# Compare data +villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE} +RC=$? + +rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} + +exit $RC From 22b42a8d8838aedb1cf00916d439b93cd502c6dc Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Thu, 13 Dec 2018 18:50:18 +0100 Subject: [PATCH 13/16] [WIP] Use thread save queue instead of mutexes for data exchange worker thread pushes received data in queue, main thread pulls data on request from queue. --- include/villas/nodes/rtp.h | 5 ++-- lib/nodes/rtp.c | 57 +++++++++++++++----------------------- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index fa5e82d8a..5bdf225d0 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -36,6 +36,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { @@ -58,9 +59,7 @@ struct rtp { bool enable_rtcp; - char *recv_buf; - size_t recv_size; - pthread_mutex_t recv_mutex; + struct queue recv_queue; }; /** @see node_type::print */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 2e4530451..38d76c2d2 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -123,20 +123,9 @@ char * rtp_print(struct node *n) static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { struct rtp *r = (struct rtp *) arg; - size_t n = mbuf_get_left(mb); - /* enter critical section */ - pthread_mutex_lock(&r->recv_mutex); - - /* adapt buffer space and read data */ - r->recv_buf = realloc(r->recv_buf, n); - mbuf_read_mem(mb, (uint8_t *) r->recv_buf, n); - - /* indicate number of bytes read */ - r->recv_size = n; - - /* leave critical section */ - pthread_mutex_unlock(&r->recv_mutex); + if (queue_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1) + warn("Failed to push to queue"); /* header not yet used */ (void)hdr; @@ -147,12 +136,10 @@ int rtp_start(struct node *n) int ret; struct rtp *r = (struct rtp *) n->_vd; - /* Initialize Mutex */ - ret = pthread_mutex_init(&r->recv_mutex, NULL); + /* Initialize Queue */ + ret = queue_init(&r->recv_queue, 8, &memory_heap); if (ret) return ret; - r->recv_size = 0; - r->recv_buf = (char *)alloc(RTP_INITIAL_BUFFER_LEN); /* Initialize IO */ ret = io_init(&r->io, r->format, &n->signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); @@ -227,26 +214,28 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele { int ret; struct rtp *r = (struct rtp *) n->_vd; - size_t bytes = r->recv_size; - - /* new data available? */ - if (bytes > 0) { - /* enter critical section */ - pthread_mutex_lock(&r->recv_mutex); - - /* read from buffer */ - ret = io_sscan(&r->io, r->recv_buf, bytes, NULL, smps, cnt); - - /* indicate data was consumed */ - r->recv_size = 0; - - /* leave critical section */ - pthread_mutex_unlock(&r->recv_mutex); + size_t bytes; + char *buf; + struct mbuf *mb; + /* Get data from queue */ + ret = queue_pull(&r->recv_queue, (void **) &mb); + if (ret <= 0) { if (ret < 0) - warn("Received invalid packet from node: %s ret=%d", node_name(n), ret); + warn("Failed to pull from queue"); + return ret; } + /* Read from mbuf */ + bytes = mbuf_get_left(mb); + buf = (char *) alloc(bytes); + mbuf_read_mem(mb, (uint8_t *) buf, bytes); + + /* Unpack data */ + ret = io_sscan(&r->io, buf, bytes, NULL, smps, cnt); + if (ret < 0) + warn("Received invalid packet from node %s: reason=%d", node_name(n), ret); + return ret; } @@ -301,7 +290,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); mbuf_set_pos(mb, 12); /* Send dataset */ - ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb); + ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t) time(NULL), mb); if (ret) { error("Error from rtp_send, reason: %d", ret); cnt = ret; From 7e38cb9eb03d3bd973fb19455512107ebf94f294 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Sun, 16 Dec 2018 11:47:33 +0100 Subject: [PATCH 14/16] add free and mem_deref to prevent memory leaks --- lib/nodes/rtp.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 38d76c2d2..5e448f74a 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -127,8 +127,9 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru if (queue_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1) warn("Failed to push to queue"); - /* header not yet used */ - (void)hdr; + /* source, header not yet used */ + (void) src; + (void) hdr; } int rtp_start(struct node *n) @@ -236,6 +237,7 @@ int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele if (ret < 0) warn("Received invalid packet from node %s: reason=%d", node_name(n), ret); + free(buf); return ret; } @@ -297,6 +299,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); } out: free(buf); + mem_deref(mb); return cnt; } From 26cdc6e58adccbef47c4af8cee71877ba651a027 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Wed, 19 Dec 2018 18:40:53 +0100 Subject: [PATCH 15/16] add proper queue destruction --- lib/nodes/rtp.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 5e448f74a..5dd0cad46 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -160,10 +160,19 @@ int rtp_start(struct node *n) int rtp_stop(struct node *n) { + int ret; struct rtp *r = (struct rtp *) n->_vd; /*mem_deref(r->rs);*/ + ret = queue_close(&r->recv_queue); + if (ret) + warn("Problem closing queue"); + + ret = queue_destroy(&r->recv_queue); + if (ret) + warn("Problem destroying queue"); + return io_destroy(&r->io); } From 434e1e5c1dc91bba5fb9f3b59a097249049a1d64 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Thu, 20 Dec 2018 08:25:13 +0100 Subject: [PATCH 16/16] [WIP] Prepare for RTCP sessions add second pair of local/remote addresses add receive handler for RTCP packets --- include/villas/nodes/rtp.h | 6 +++-- lib/nodes/rtp.c | 52 +++++++++++++++++++++++++++++--------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 5bdf225d0..cae564a91 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -51,8 +51,10 @@ struct format_type; struct rtp { struct rtp_sock *rs; /**< RTP socket */ - struct sa local; /**< Local address of the socket */ - struct sa remote; /**< Remote address of the socket */ + struct sa local_rtp; /**< Local address of the RTP socket */ + struct sa local_rtcp; /**< Local address of the RTCP socket */ + struct sa remote_rtp; /**< Remote address of the RTP socket */ + struct sa remote_rtcp; /**< Remote address of the RTCP socket */ struct format_type *format; struct io io; diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 5dd0cad46..9ea7d0c63 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -46,9 +46,13 @@ int rtp_reverse(struct node *n) struct rtp *r = (struct rtp *) n->_vd; struct sa tmp; - tmp = r->local; - r->local = r->remote; - r->remote = tmp; + tmp = r->local_rtp; + r->local_rtp = r->remote_rtp; + r->remote_rtp = tmp; + + tmp = r->local_rtcp; + r->local_rtcp = r->remote_rtcp; + r->remote_rtcp = tmp; return 0; } @@ -61,6 +65,7 @@ int rtp_parse(struct node *n, json_t *cfg) const char *local, *remote; const char *format = "villas.binary"; bool enable_rtcp = false; + uint16_t port; json_error_t err; @@ -83,22 +88,34 @@ int rtp_parse(struct node *n, json_t *cfg) /* Enable RTCP */ r->enable_rtcp = enable_rtcp; if(enable_rtcp) - error("RTCP is not implemented yet."); + warn("RTCP is not implemented yet"); /* Remote address */ - ret = sa_decode(&r->remote, remote, strlen(remote)); + ret = sa_decode(&r->remote_rtp, remote, strlen(remote)); if (ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), strerror(ret)); } + /* Assign even port number to RTP socket, next odd number to RTCP socket */ + port = sa_port(&r->remote_rtp) & ~1; + sa_set_sa(&r->remote_rtcp, &r->remote_rtp.u.sa); + sa_set_port(&r->remote_rtp, port); + sa_set_port(&r->remote_rtcp, port+1); + /* Local address */ - ret = sa_decode(&r->local, local, strlen(local)); + ret = sa_decode(&r->local_rtp, local, strlen(local)); if (ret) { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), strerror(ret)); } + /* Assign even port number to RTP socket, next odd number to RTCP socket */ + port = sa_port(&r->local_rtp) & ~1; + sa_set_sa(&r->local_rtcp, &r->local_rtp.u.sa); + sa_set_port(&r->local_rtp, port); + sa_set_port(&r->local_rtcp, port+1); + /** @todo parse * in addresses */ return ret; @@ -109,8 +126,8 @@ char * rtp_print(struct node *n) struct rtp *r = (struct rtp *) n->_vd; char *buf; - char *local = socket_print_addr((struct sockaddr *) &r->local.u); - char *remote = socket_print_addr((struct sockaddr *) &r->remote.u); + char *local = socket_print_addr((struct sockaddr *) &r->local_rtp.u); + char *remote = socket_print_addr((struct sockaddr *) &r->remote_rtp.u); buf = strf("format=%s, in.address=%s, out.address=%s", format_type_name(r->format), local, remote); @@ -132,6 +149,14 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru (void) hdr; } +static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg) +{ + (void)src; + (void)arg; + + printf("rtcp: recv %s\n", rtcp_type_name(msg->hdr.pt)); +} + int rtp_start(struct node *n) { int ret; @@ -152,8 +177,11 @@ int rtp_start(struct node *n) return ret; /* Initialize RTP socket */ - uint16_t port = sa_port(&r->local) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local, port, port+1, r->enable_rtcp, rtp_handler, NULL, n->_vd); + uint16_t port = sa_port(&r->local_rtp) & ~1; + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->enable_rtcp, rtp_handler, rtcp_handler, n->_vd); + + /* Start RTCP session */ + rtcp_start(r->rs, node_name(n), &r->remote_rtcp); return ret; } @@ -301,7 +329,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); mbuf_set_pos(mb, 12); /* Send dataset */ - ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t) time(NULL), mb); + ret = rtp_send(r->rs, &r->remote_rtp, false, false, 21, (uint32_t) time(NULL), mb); if (ret) { error("Error from rtp_send, reason: %d", ret); cnt = ret; @@ -338,7 +366,7 @@ static struct plugin p = { .stop = rtp_stop, .read = rtp_read, .write = rtp_write, - .fd = rtp_fd + .fd = rtp_fd } };