diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index b6da8103f..7de535f9a 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -54,17 +54,36 @@ using namespace villas::node; class Direction { public: - Direction(struct io *i) : + Direction(struct node *n, struct io *i, bool en = true, int lim = -1) : + node(n), io(i), - enabled(true), - limit(-1) - { } + enabled(en), + limit(lim) + { + pool.state = STATE_DESTROYED; + pool.queue.state = STATE_DESTROYED; + + /* Initialize memory */ + + + /* Initialize memory */ + unsigned pool_size = node_type(node)->pool_size ? node_type(node)->pool_size : LOG2_CEIL(node->out.vectorize); + + int ret = pool_init(&pool, pool_size, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), node_memory_type(node, &memory_hugepage)); + if (ret < 0) + throw RuntimeError("Failed to allocate memory for pool."); + } Direction(const Direction &c) { io = c.io; } + ~Direction() + { + pool_destroy(&pool); + } + struct pool pool; struct node *node; struct io *io; @@ -123,14 +142,6 @@ static void * send_loop(void *ctx) struct sample *smps[dirs->send.node->out.vectorize]; - /* Initialize memory */ - unsigned pool_size = node_type(dirs->send.node)->pool_size ? node_type(dirs->send.node)->pool_size : LOG2_CEIL(dirs->send.node->out.vectorize); - - ret = pool_init(&dirs->send.pool, pool_size, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), node_memory_type(dirs->send.node, &memory_hugepage)); - - if (ret < 0) - throw new RuntimeError("Failed to allocate memory for receive pool."); - while (!io_eof(dirs->send.io)) { allocated = sample_alloc_many(&dirs->send.pool, smps, dirs->send.node->out.vectorize); if (ret < 0) @@ -192,18 +203,10 @@ static void * recv_loop(void *ctx) Directions *dirs = static_cast(ctx); Logger logger = logging.get("pipe"); - int recv, ret, cnt = 0, allocated = 0; + int recv, cnt = 0, allocated = 0; unsigned release; struct sample *smps[dirs->recv.node->in.vectorize]; - /* Initialize memory */ - unsigned pool_size = node_type(dirs->recv.node)->pool_size ? node_type(dirs->recv.node)->pool_size : LOG2_CEIL(dirs->recv.node->in.vectorize); - - ret = pool_init(&dirs->recv.pool, pool_size, SAMPLE_LENGTH(list_length(&dirs->recv.node->signals)), node_memory_type(dirs->recv.node, &memory_hugepage)); - - if (ret < 0) - throw new RuntimeError("Failed to allocate memory for receive pool."); - for (;;) { allocated = sample_alloc_many(&dirs->recv.pool, smps, dirs->recv.node->in.vectorize); if (allocated < 0) @@ -243,16 +246,14 @@ int main(int argc, char *argv[]) struct node *node; static struct io io = { .state = STATE_DESTROYED }; - Directions dirs = { - .send = Direction(&io), - .recv = Direction(&io) - }; - SuperNode sn; /**< The global configuration */ Logger logger = logging.get("pipe"); json_t *cfg_cli = json_object(); + bool enable_send = true, enable_recv = true; + int limit_send = -1, limit_recv = -1; + int c; char *endptr; while ((c = getopt(argc, argv, "Vhxrsd:l:L:t:f:o:")) != -1) { @@ -270,19 +271,19 @@ int main(int argc, char *argv[]) break; case 's': - dirs.recv.enabled = false; // send only + enable_recv = false; // send only break; case 'r': - dirs.send.enabled = false; // receive only + enable_send = false; // receive only break; case 'l': - dirs.recv.limit = strtoul(optarg, &endptr, 10); + limit_recv = strtoul(optarg, &endptr, 10); goto check; case 'L': - dirs.send.limit = strtoul(optarg, &endptr, 10); + limit_send = strtoul(optarg, &endptr, 10); goto check; case 't': @@ -316,7 +317,7 @@ check: if (optarg == endptr) exit(EXIT_FAILURE); } - info("Logging level: %d", logging.getLevel()); + logger->info("Logging level: {}", logging.getLevelName()); char *uri = argv[optind]; char *nodestr = argv[optind+1]; @@ -385,6 +386,11 @@ check: if (optarg == endptr) throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret); /* Start threads */ + Directions dirs = { + .send = Direction(node, &io, enable_send, limit_send), + .recv = Direction(node, &io, enable_recv, limit_recv) + }; + if (dirs.recv.enabled) { dirs.recv.node = node; pthread_create(&dirs.recv.thread, nullptr, recv_loop, &dirs); @@ -416,19 +422,7 @@ check: if (optarg == endptr) ret = node_type_stop(node->_vt); if (ret) - throw new RuntimeError("Failed to stop node type {}: reason={}", node_type_name(node->_vt), ret); - - if (dirs.recv.enabled) { - ret = pool_destroy(&dirs.recv.pool); - if (ret) - throw new RuntimeError("Failed to destroy pool"); - } - - if (dirs.send.enabled) { - ret = pool_destroy(&dirs.send.pool); - if (ret) - throw new RuntimeError("Failed to destroy pool"); - } + throw RuntimeError("Failed to stop node type {}: reason={}", node_type_name(node->_vt), ret); ret = io_close(&io); if (ret)