From c905f242c5c4ca53ec6189b4dcc6560dce3a09c3 Mon Sep 17 00:00:00 2001 From: Marvin Klimke Date: Fri, 7 Dec 2018 06:37:48 +0100 Subject: [PATCH] add re_main in pthread thread is started in rtp_type_start and joined in rtp_type_stop --- lib/nodes/rtp.c | 52 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 2138954a5..049ecb72e 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include @@ -38,6 +39,8 @@ #include #include +pthread_t re_pthread; + int rtp_reverse(struct node *n) { struct rtp *r = (struct rtp *) n->_vd; @@ -121,7 +124,8 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru { (void)hdr; (void)arg; - (void)mb; + + printf("rtp: recv %zu bytes\n", mbuf_get_left(mb)); /* placeholder */ } @@ -149,28 +153,54 @@ int rtp_start(struct node *n) int rtp_stop(struct node *n) { - int ret; struct rtp *r = (struct rtp *) n->_vd; /*mem_deref(r->rs);*/ - ret = io_destroy(&r->io); - if (ret) - return ret; + return io_destroy(&r->io); +} - return 0; +static void * th_func(void *arg) +{ + re_main(NULL); + return NULL; } int rtp_type_start() { + int ret; + /* Initialize library */ - return libre_init(); + ret = libre_init(); + if (ret) { + error("Error initializing libre"); + return ret; + } + + /* Add worker thread */ + ret = pthread_create(&re_pthread, NULL, th_func, NULL); + if (ret) { + error("Error creating rtp node type pthread"); + return ret; + } + + return ret; } int rtp_type_stop() { + int ret; + + /* Join worker thread */ + re_cancel(); + ret = pthread_join(re_pthread, NULL); + if (ret) { + error("Error joining rtp node type pthread"); + return ret; + } + libre_close(); - return 0; + return ret; } int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -224,13 +254,13 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); /* Prepare mbuf */ struct mbuf *mb = mbuf_alloc(buflen + 12); ret = mbuf_write_str(mb, pad); - if(ret) { + if (ret) { error("Error writing pad to mbuf"); cnt = ret; goto out; } ret = mbuf_write_mem(mb, (uint8_t*)buf, buflen); - if(ret) { + if (ret) { error("Error writing data to mbuf"); cnt = ret; goto out; @@ -239,7 +269,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); /* Send dataset */ ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb); - if(ret) { + if (ret) { error("Error from rtp_send, reason: %d", ret); cnt = ret; }