diff --git a/src/pipe.c b/src/pipe.c index a42125c06..714b93824 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -49,10 +49,9 @@ struct dir { struct pool pool; pthread_t thread; bool enabled; + int limit; } sendd, recvv; -bool reverse = false; - struct node *node; pthread_t ptid; /**< Parent thread id */ @@ -92,16 +91,16 @@ static void usage() printf(" -d LVL set debug log level to LVL\n"); printf(" -x swap read / write endpoints\n"); printf(" -s only read data from stdin and send it to node\n"); - printf(" -r only read data from node and write it to stdout\n\n"); + printf(" -r only read data from node and write it to stdout\n"); + printf(" -L NUM terminate after NUM samples sent\n"); + printf(" -l NUM terminate after NUM samples received\n\n"); print_copyright(); - - exit(EXIT_FAILURE); } static void * send_loop(void *ctx) { - int ret; + int ret, cnt = 0; struct sample *smps[node->vectorize]; /* Initialize memory */ @@ -118,11 +117,14 @@ static void * send_loop(void *ctx) for (len = 0; len < node->vectorize; len++) { struct sample *s = smps[len]; int reason; + + if (sendd.limit > 0 && cnt >= sendd.limit) + break; retry: reason = sample_io_villas_fscan(stdin, s, NULL); if (reason < 0) { if (feof(stdin)) - goto exit; + goto leave; else { warn("Skipped invalid message message: reason=%d", reason); goto retry; @@ -130,20 +132,31 @@ retry: reason = sample_io_villas_fscan(stdin, s, NULL); } } - node_write(node, smps, len); + cnt += node_write(node, smps, len); + + if (sendd.limit > 0 && cnt >= sendd.limit) + goto leave2; + pthread_testcancel(); } - /* We reached EOF on stdin here. Lets kill the process */ -exit: info("Reached end-of-file. Terminating..."); +leave2: info("Reached send limit. Terminating..."); pthread_kill(ptid, SIGINT); return NULL; + + /* We reached EOF on stdin here. Lets kill the process */ +leave: if (recvv.limit < 0) { + info("Reached end-of-file. Terminating..."); + pthread_kill(ptid, SIGINT); + } + + return NULL; } static void * recv_loop(void *ctx) { - int ret; + int ret, cnt = 0; struct sample *smps[node->vectorize]; /* Initialize memory */ @@ -162,6 +175,7 @@ static void * recv_loop(void *ctx) for (;;) { int recv = node_read(node, smps, node->vectorize); struct timespec now = time_now(); + for (int i = 0; i < recv; i++) { struct sample *s = smps[i]; @@ -171,24 +185,36 @@ static void * recv_loop(void *ctx) sample_io_villas_fprint(stdout, s, SAMPLE_IO_ALL); fflush(stdout); } + + cnt += recv; + if (recvv.limit > 0 && cnt >= recvv.limit) + goto leave; + pthread_testcancel(); } +leave: info("Reached receive limit. Terminating..."); + pthread_kill(ptid, SIGINT); + return NULL; + return NULL; } int main(int argc, char *argv[]) { int ret, level = V; - char c; + + bool reverse = false; + + sendd = recvv = (struct dir) { + .enabled = true, + .limit = -1 + }; ptid = pthread_self(); - - /* Default values */ - sendd.enabled = true; - recvv.enabled = true; - - while ((c = getopt(argc, argv, "hxrsd:")) != -1) { + + char c, *endptr; + while ((c = getopt(argc, argv, "hxrsd:l:L:")) != -1) { switch (c) { case 'x': reverse = true; @@ -202,6 +228,12 @@ int main(int argc, char *argv[]) case 'd': level = strtoul(optarg, &endptr, 10); goto check; + case 'l': + recvv.limit = strtoul(optarg, &endptr, 10); + goto check; + case 'L': + sendd.limit = strtoul(optarg, &endptr, 10); + goto check; case 'h': case '?': usage();