1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add re_main in pthread

thread is started in rtp_type_start and joined in rtp_type_stop
This commit is contained in:
Marvin Klimke 2018-12-07 06:37:48 +01:00
parent b6161e06b1
commit c905f242c5

View file

@ -21,6 +21,7 @@
*********************************************************************************/
#include <inttypes.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
@ -38,6 +39,8 @@
#include <villas/utils.h>
#include <villas/format_type.h>
pthread_t re_pthread;
int rtp_reverse(struct node *n)
{
struct rtp *r = (struct rtp *) n->_vd;
@ -121,7 +124,8 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru
{
(void)hdr;
(void)arg;
(void)mb;
printf("rtp: recv %zu bytes\n", mbuf_get_left(mb));
/* placeholder */
}
@ -149,28 +153,54 @@ int rtp_start(struct node *n)
int rtp_stop(struct node *n)
{
int ret;
struct rtp *r = (struct rtp *) n->_vd;
/*mem_deref(r->rs);*/
ret = io_destroy(&r->io);
if (ret)
return ret;
return io_destroy(&r->io);
}
return 0;
static void * th_func(void *arg)
{
re_main(NULL);
return NULL;
}
int rtp_type_start()
{
int ret;
/* Initialize library */
return libre_init();
ret = libre_init();
if (ret) {
error("Error initializing libre");
return ret;
}
/* Add worker thread */
ret = pthread_create(&re_pthread, NULL, th_func, NULL);
if (ret) {
error("Error creating rtp node type pthread");
return ret;
}
return ret;
}
int rtp_type_stop()
{
int ret;
/* Join worker thread */
re_cancel();
ret = pthread_join(re_pthread, NULL);
if (ret) {
error("Error joining rtp node type pthread");
return ret;
}
libre_close();
return 0;
return ret;
}
int rtp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
@ -224,13 +254,13 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
/* Prepare mbuf */
struct mbuf *mb = mbuf_alloc(buflen + 12);
ret = mbuf_write_str(mb, pad);
if(ret) {
if (ret) {
error("Error writing pad to mbuf");
cnt = ret;
goto out;
}
ret = mbuf_write_mem(mb, (uint8_t*)buf, buflen);
if(ret) {
if (ret) {
error("Error writing data to mbuf");
cnt = ret;
goto out;
@ -239,7 +269,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
/* Send dataset */
ret = rtp_send(r->rs, &r->remote, false, false, 21, (uint32_t)time(NULL), mb);
if(ret) {
if (ret) {
error("Error from rtp_send, reason: %d", ret);
cnt = ret;
}