diff --git a/src/pipe.c b/src/pipe.c index 32b426d6b..aa6eabdd0 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -17,69 +17,88 @@ #include #include -#include "config.h" -#include "cfg.h" -#include "utils.h" -#include "node.h" -#include "msg.h" -#include "timing.h" -#include "pool.h" +#include +#include +#include +#include +#include +#include +#include +#include "config.h" struct list nodes; /**< List of all nodes */ struct settings settings; /**< The global configuration */ -struct pool recv_pool, send_pool; -pthread_t recv_thread, send_thread; +struct dir { + struct pool pool; + pthread_t thread; + bool enabled; + bool started; +} sendd, recvv; + +bool reverse = false; struct node *node; +pthread_t ptid; /**< Parent thread id */ + static void quit(int signal, siginfo_t *sinfo, void *ctx) { - pthread_cancel(recv_thread); - pthread_cancel(send_thread); + if (recvv.started) { + pthread_cancel(recvv.thread); + pthread_join(recvv.thread, NULL); + } - pthread_join(recv_thread, NULL); - pthread_join(send_thread, NULL); - + if (sendd.started) { + pthread_cancel(sendd.thread); + pthread_join(sendd.thread, NULL); + } + + pool_destroy(&recvv.pool); + pool_destroy(&sendd.pool); + node_stop(node); node_deinit(node->_vt); - - pool_destroy(&recv_pool); - pool_destroy(&send_pool); - + list_destroy(&nodes, (dtor_cb_t) node_destroy, false); - + info(GRN("Goodbye!")); exit(EXIT_SUCCESS); } static void usage(char *name) { - printf("Usage: %s CONFIG [-r] NODE\n", name); + printf("Usage: %s CONFIG NODE [OPTIONS]\n", name); printf(" CONFIG path to a configuration file\n"); printf(" NODE the name of the node to which samples are sent and received from\n"); - printf(" -d LVL set debug log level to LVL\n"); - printf(" -x swap read / write endpoints\n"); - printf(" -s only read data from stdin and send it to node\n"); - printf(" -r only read data from node and write it to stdout\n\n"); + printf(" OPTIONS are:\n"); + printf(" -d LVL set debug log level to LVL\n"); + printf(" -x swap read / write endpoints\n"); + printf(" -s only read data from stdin and send it to node\n"); + printf(" -r only read data from node and write it to stdout\n\n"); print_copyright(); exit(EXIT_FAILURE); } -void * send_loop(void *ctx) +static void * send_loop(void *ctx) { int ret; struct sample *smps[node->vectorize]; + + if (!sendd.enabled) + return NULL; + + sendd.started = true; /* Initialize memory */ - ret = pool_init_mmap(&send_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); + ret = pool_init_mmap(&sendd.pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = pool_get_many(&send_pool, (void **) smps, node->vectorize); + ret = pool_get_many(&sendd.pool, (void **) smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); @@ -91,7 +110,7 @@ void * send_loop(void *ctx) retry: reason = sample_fscan(stdin, s, NULL); if (reason < 0) { if (feof(stdin)) - return NULL; + goto killme; else { warn("Skipped invalid message message: reason=%d", reason); goto retry; @@ -102,25 +121,33 @@ retry: reason = sample_fscan(stdin, s, NULL); node_write(node, smps, node->vectorize); } +killme: pthread_kill(ptid, SIGINT); + return NULL; } -void * recv_loop(void *ctx) +static void * recv_loop(void *ctx) { int ret; struct sample *smps[node->vectorize]; + if (!recvv.enabled) + return NULL; + + recvv.started = true; + /* Initialize memory */ - ret = pool_init_mmap(&recv_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); + ret = pool_init_mmap(&recvv.pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = pool_get_many(&recv_pool, (void **) smps, node->vectorize); + ret = pool_get_many(&recvv.pool, (void **) smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret); /* Print header */ fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); + fflush(stdout); for (;;) { int recv = node_read(node, smps, node->vectorize); @@ -137,25 +164,32 @@ void * recv_loop(void *ctx) int main(int argc, char *argv[]) { - bool send = true, recv = true, reverse = false; - + int ret; + char c; + + config_t config; + + ptid = pthread_self(); + log_init(); + /* Parse command line arguments */ if (argc < 3) usage(argv[0]); - - log_init(); - - char c; + + /* Default values */ + sendd.enabled = true; + recvv.enabled = true; + while ((c = getopt(argc-2, argv+2, "hxrsd:")) != -1) { switch (c) { case 'x': reverse = true; break; case 's': - recv = false; + recvv.enabled = false; // send only break; case 'r': - send = false; + sendd.enabled = false; // receive only break; case 'd': log_setlevel(atoi(optarg), -1); @@ -177,13 +211,16 @@ int main(int argc, char *argv[]) sigaction(SIGINT, &sa_quit, NULL); /* Initialize log, configuration.. */ - config_t config; - - /* Create lists */ list_init(&nodes); - config_init(&config); - config_parse(argv[1], &config, &settings, &nodes, NULL); + info("Parsing configuration"); + { INDENT + config_init(&config); + config_parse(argv[1], &config, &settings, &nodes, NULL); + } + + info("Initialize real-time system"); + rt_init(settings.affinity, settings.priority); /* Initialize node */ node = list_lookup(&nodes, argv[2]); @@ -193,16 +230,18 @@ int main(int argc, char *argv[]) if (reverse) node_reverse(node); - node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config)); - - node_start(node); + ret = node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config)); + if (ret) + error("Failed to intialize node: %s", node_name(node)); + + ret = node_start(node); + if (ret) + error("Failed to start node: %s", node_name(node)); /* Start threads */ - if (recv) - pthread_create(&recv_thread, NULL, recv_loop, NULL); - if (send) - pthread_create(&send_thread, NULL, send_loop, NULL); - + pthread_create(&recvv.thread, NULL, recv_loop, NULL); + pthread_create(&sendd.thread, NULL, send_loop, NULL); + for (;;) pause();