From c4f5e6baedb68bc04933a6e14b5d8805e7c8cb2c Mon Sep 17 00:00:00 2001 From: Snaipe Date: Tue, 12 Jan 2016 00:17:03 +0100 Subject: [PATCH] Implemented simple server mode --- CMakeLists.txt | 1 + include/criterion/options.h | 7 ++++ src/core/abort.c | 1 + src/core/client.c | 66 ++++++++++++++++++++++++++++++++---- src/core/client.h | 11 +++++- src/core/runner.c | 65 ++++++++++++++++++++++++++++++----- src/core/test.c | 1 + src/core/theories.c | 5 +++ src/core/worker.c | 18 ---------- src/entry/params.c | 4 ++- src/io/event.c | 2 ++ src/protocol/criterion.pb.c | 7 ++-- src/protocol/criterion.pb.h | 7 ++-- src/protocol/criterion.proto | 1 + src/protocol/messages.c | 11 ++++-- src/protocol/messages.h | 1 + src/protocol/protocol.c | 2 ++ src/protocol/protocol.h | 20 ++++++++--- 18 files changed, 181 insertions(+), 49 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e226e4f..0f4a5d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ set(GettextTranslate_ALL 1) set(GettextTranslate_GMO_BINARY 1) add_definitions(-DCRITERION_BUILDING_DLL=1) +add_definitions(-DPB_ENABLE_MALLOC=1) set(CMAKE_C_FLAGS_DEFAULT "${CMAKE_C_FLAGS}") set(CMAKE_CXX_FLAGS_DEFAULT "${CMAKE_CXX_FLAGS}") diff --git a/include/criterion/options.h b/include/criterion/options.h index fbe9014..7bdba9d 100644 --- a/include/criterion/options.h +++ b/include/criterion/options.h @@ -105,6 +105,13 @@ struct criterion_options { * default: true */ bool measure_time; + + /** + * Whether criterion should wait for incoming connections in server mode + * + * default: false + */ + bool wait_for_clients; }; CR_BEGIN_C_API diff --git a/src/core/abort.c b/src/core/abort.c index 00a31a3..2832087 100644 --- a/src/core/abort.c +++ b/src/core/abort.c @@ -50,6 +50,7 @@ void criterion_test_die(const char *msg, ...) { .name = (char *) criterion_current_test->name, .message = formatted_msg, ); + criterion_message_set_id(abort_msg); cr_send_to_runner(&abort_msg); free(formatted_msg); diff --git a/src/core/client.c b/src/core/client.c index afd7de8..654b4fb 100644 --- a/src/core/client.c +++ b/src/core/client.c @@ -37,6 +37,7 @@ static void nothing(void) {}; KHASH_MAP_INIT_INT(ht_client, struct client_ctx) +KHASH_MAP_INIT_STR(ht_extern, struct client_ctx) static enum client_state phase_to_state[] = { [criterion_protocol_phase_kind_SETUP] = CS_SETUP, @@ -77,16 +78,30 @@ static void get_message_id(char *out, size_t n, const criterion_protocol_msg *ms switch (msg->which_id) { case criterion_protocol_msg_pid_tag: snprintf(out, n, "[PID %" PRId64 "]", msg->id.pid); return; + case criterion_protocol_msg_uid_tag: + snprintf(out, n, "[external \"%s\"]", msg->id.uid); return; default: break; } } -void init_server_context(struct server_ctx *sctx) { +void init_server_context(struct server_ctx *sctx, struct criterion_global_stats *gstats) { sctx->subprocesses = kh_init(ht_client); + sctx->clients = kh_init(ht_extern); + + sctx->gstats = gstats; + sctx->extern_suite = (struct criterion_suite) { + .name = "external", + .data = &sctx->extern_suite_data, + }; + sctx->extern_suite_data = (struct criterion_test_extra_data) { + .disabled = 0, + }; + sctx->extern_sstats = suite_stats_init(&sctx->extern_suite); } void destroy_client_context(struct client_ctx *ctx) { - sfree(ctx->worker); + if (ctx->kind == WORKER) + sfree(ctx->worker); } void destroy_server_context(struct server_ctx *sctx) { @@ -99,6 +114,8 @@ void destroy_server_context(struct server_ctx *sctx) { destroy_client_context(&v); }); kh_destroy(ht_client, sctx->subprocesses); + + kh_destroy(ht_extern, sctx->clients); } struct client_ctx *add_client_from_worker(struct server_ctx *sctx, struct client_ctx *ctx, struct worker *w) { @@ -119,6 +136,28 @@ void remove_client_by_pid(struct server_ctx *sctx, int pid) { } } +struct client_ctx *add_external_client(struct server_ctx *sctx, char *id) { + int absent; + khint_t k = kh_put(ht_extern, sctx->clients, id, &absent); + kh_value(sctx->clients, k) = (struct client_ctx) { + .kind = EXTERN, + .extern_test = { + .name = strdup(id), + .category = "external", + }, + .gstats = sctx->gstats, + .sstats = sctx->extern_sstats, + }; + + struct client_ctx *ctx = &kh_value(sctx->clients, k); + ctx->test = &ctx->extern_test; + ctx->suite = &sctx->extern_suite; + ctx->extern_test.data = &ctx->extern_test_data; + ctx->tstats = test_stats_init(&ctx->extern_test); + + return ctx; +} + static void process_client_message_impl(struct server_ctx *sctx, struct client_ctx *ctx, const criterion_protocol_msg *msg) { message_handler *handler = message_handlers[msg->data.which_value]; bool ack; @@ -141,23 +180,38 @@ struct client_ctx *process_client_message(struct server_ctx *ctx, const criterio return NULL; } + struct client_ctx *client = NULL; switch (msg->which_id) { case criterion_protocol_msg_pid_tag: { khiter_t k = kh_get(ht_client, ctx->subprocesses, msg->id.pid); if (k != kh_end(ctx->subprocesses)) { - process_client_message_impl(ctx, &kh_value(ctx->subprocesses, k), msg); - return &kh_value(ctx->subprocesses, k); + client = &kh_value(ctx->subprocesses, k); } else { - handler_error(ctx, "%s", "", "Received message identified by a PID '%ld'" + handler_error(ctx, "%s", "", "Received message identified by a PID '%ld' " "that is not a child process.", msg->id.pid); } } break; + case criterion_protocol_msg_uid_tag: { + khiter_t k = kh_get(ht_extern, ctx->clients, msg->id.uid); + bool client_found = k != kh_end(ctx->clients); + if (!client_found && msg->data.which_value == criterion_protocol_submessage_birth_tag) { + client = add_external_client(ctx, msg->id.uid); + } else if (client_found) { + client = &kh_value(ctx->clients, k); + } else { + handler_error(ctx, "%s", "", "Received message identified by the ID '%s'" + "that did not send a birth message previously.", msg->id.uid); + } + } break; default: { handler_error(ctx, "%s", "", "Received message with malformed id tag '%d'.\n", criterion_protocol_msg_pid_tag); } break; } - return NULL; + + if (client) + process_client_message_impl(ctx, client, msg); + return client; } #define push_event(...) \ diff --git a/src/core/client.h b/src/core/client.h index e2290ed..679eb3c 100644 --- a/src/core/client.h +++ b/src/core/client.h @@ -50,6 +50,8 @@ enum client_kind { struct client_ctx { enum client_kind kind; struct worker *worker; + struct criterion_test_extra_data extern_test_data; + struct criterion_test extern_test; enum client_state state; bool alive; @@ -61,15 +63,22 @@ struct client_ctx { }; typedef struct kh_ht_client_s khash_t(ht_client); +typedef struct kh_ht_extern_s khash_t(ht_extern); struct server_ctx { int socket; + struct criterion_suite extern_suite; + struct criterion_test_extra_data extern_suite_data; + struct criterion_global_stats *gstats; + struct criterion_suite_stats *extern_sstats; + khash_t(ht_client) *subprocesses; + khash_t(ht_extern) *clients; }; struct client_ctx *process_client_message(struct server_ctx *ctx, const criterion_protocol_msg *msg); -void init_server_context(struct server_ctx *sctx); +void init_server_context(struct server_ctx *sctx, struct criterion_global_stats *gstats); void destroy_server_context(struct server_ctx *sctx); struct client_ctx *add_client_from_worker(struct server_ctx *sctx, struct client_ctx *ctx, struct worker *w); void remove_client_by_pid(struct server_ctx *sctx, int pid); diff --git a/src/core/runner.c b/src/core/runner.c index 9490b71..eb0a1f1 100644 --- a/src/core/runner.c +++ b/src/core/runner.c @@ -33,6 +33,7 @@ #include "criterion/internal/ordered-set.h" #include "criterion/logging.h" #include "criterion/internal/preprocess.h" +#include "criterion/redirect.h" #include "protocol/protocol.h" #include "protocol/connect.h" #include "protocol/messages.h" @@ -52,6 +53,12 @@ #include "common.h" #include "client.h" +#if HAVE_THREADS_H +# include +#else +# include +#endif + #ifdef HAVE_PCRE #include "string/extmatch.h" #endif @@ -169,6 +176,19 @@ const struct criterion_suite *criterion_current_suite; void run_test_child(struct criterion_test *test, struct criterion_suite *suite) { + cr_redirect_stdin(); + g_client_socket = connect_client(); + if (g_client_socket < 0) { + criterion_perror("Could not initialize the message client: %s.\n", + strerror(errno)); + abort(); + } + + // Notify the runner that the test was born + criterion_protocol_msg msg = criterion_message(birth); + criterion_message_set_id(msg); + cr_send_to_runner(&msg); + #ifndef ENABLE_VALGRIND_ERRORS VALGRIND_ENABLE_ERROR_REPORTING; #endif @@ -183,6 +203,17 @@ void run_test_child(struct criterion_test *test, if (test->test) test->test(); + +#ifndef ENABLE_VALGRIND_ERRORS + VALGRIND_DISABLE_ERROR_REPORTING; +#endif + + close_socket(g_client_socket); + + fflush(NULL); // flush all opened streams + if (criterion_options.no_early_exit) + return; + _Exit(0); } #define push_event(...) \ @@ -280,7 +311,7 @@ static void run_tests_async(struct criterion_test_set *set, int has_msg = 0; struct server_ctx sctx; - init_server_context(&sctx); + init_server_context(&sctx, stats); sctx.socket = socket; @@ -297,12 +328,17 @@ static void run_tests_async(struct criterion_test_set *set, ++active_workers; } - if (!active_workers) + if (!active_workers && !criterion_options.wait_for_clients) goto cleanup; criterion_protocol_msg msg = criterion_protocol_msg_init_zero; while ((has_msg = read_message(socket, &msg)) == 1) { struct client_ctx *cctx = process_client_message(&sctx, &msg); + + // drop invalid messages + if (!cctx) + continue; + if (!cctx->alive && cctx->kind == WORKER) { remove_client_by_pid(&sctx, get_process_id_of(cctx->worker->proc)); @@ -314,15 +350,15 @@ static void run_tests_async(struct criterion_test_set *set, --active_workers; } - if (!active_workers) + if (!active_workers && !criterion_options.wait_for_clients) break; - pb_release(criterion_protocol_msg_fields, &msg); + free_message(&msg); } cleanup: if (has_msg) - pb_release(criterion_protocol_msg_fields, &msg); + free_message(&msg); destroy_server_context(&sctx); ccrAbort(ctx); } @@ -398,8 +434,8 @@ int criterion_run_all_tests(struct criterion_test_set *set) { void run_single_test_by_name(const char *testname) { struct criterion_test_set *set = criterion_init(); - // FIXME: initialize null sink for pipe system. - abort(); + struct criterion_test *test = NULL; + struct criterion_suite *suite = NULL; FOREACH_SET(struct criterion_suite_set *s, set->suites) { size_t tests = s->tests ? s->tests->size : 0; @@ -409,10 +445,21 @@ void run_single_test_by_name(const char *testname) { FOREACH_SET(struct criterion_test *t, s->tests) { char name[1024]; snprintf(name, sizeof (name), "%s::%s", s->suite.name, t->name); - if (!strncmp(name, testname, 1024)) - run_test_child(t, &s->suite); + if (!strncmp(name, testname, 1024)) { + test = t; + suite = &s->suite; + break; + } } } + if (test) { + is_extern_worker = true; + criterion_current_test = test; + criterion_current_suite = suite; + + run_test_child(test, suite); + } + sfree(set); } diff --git a/src/core/test.c b/src/core/test.c index 2a88a95..0b9a71d 100644 --- a/src/core/test.c +++ b/src/core/test.c @@ -39,6 +39,7 @@ static void send_event(int phase) { .phase = phase, .name = (char *) criterion_current_test->name, ); + criterion_message_set_id(msg); cr_send_to_runner(&msg); } diff --git a/src/core/theories.c b/src/core/theories.c index b8da613..f969cc3 100644 --- a/src/core/theories.c +++ b/src/core/theories.c @@ -210,12 +210,14 @@ void cr_theory_main(struct criterion_datapoints *dps, size_t datapoints, void (* .phase = criterion_protocol_phase_kind_SETUP, .name = name, ); + criterion_message_set_id(setup_msg); cr_send_to_runner(&setup_msg); criterion_protocol_msg main_msg = criterion_message(phase, .phase = criterion_protocol_phase_kind_MAIN, .name = name, ); + criterion_message_set_id(main_msg); cr_send_to_runner(&main_msg); int theory_aborted = 0; @@ -253,6 +255,7 @@ void cr_theory_main(struct criterion_datapoints *dps, size_t datapoints, void (* .name = name, .message = result.msg ); + criterion_message_set_id(msg); cr_send_to_runner(&msg); } } @@ -262,12 +265,14 @@ void cr_theory_main(struct criterion_datapoints *dps, size_t datapoints, void (* .phase = criterion_protocol_phase_kind_TEARDOWN, .name = name, ); + criterion_message_set_id(teardown_msg); cr_send_to_runner(&teardown_msg); criterion_protocol_msg end_msg = criterion_message(phase, .phase = criterion_protocol_phase_kind_END, .name = name, ); + criterion_message_set_id(end_msg); cr_send_to_runner(&end_msg); } free(name); diff --git a/src/core/worker.c b/src/core/worker.c index 8bc7ed6..c237922 100644 --- a/src/core/worker.c +++ b/src/core/worker.c @@ -60,25 +60,7 @@ static void close_process(void *ptr, CR_UNUSED void *meta) { } void run_worker(struct worker_context *ctx) { - cr_redirect_stdin(); - g_client_socket = connect_client(); - if (g_client_socket < 0) { - criterion_perror("Could not initialize the message client: %s.\n", - strerror(errno)); - abort(); - } - - // Notify the runner that the test was born - criterion_protocol_msg msg = criterion_message(birth); - cr_send_to_runner(&msg); - ctx->func(ctx->test, ctx->suite); - close_socket(g_client_socket); - - fflush(NULL); // flush all opened streams - if (criterion_options.no_early_exit) - return; - _Exit(0); } struct worker *spawn_test_worker(struct execution_context *ctx, diff --git a/src/entry/params.c b/src/entry/params.c index a6dc8a0..f2eae9d 100644 --- a/src/entry/params.c +++ b/src/entry/params.c @@ -156,6 +156,7 @@ int criterion_handle_args(int argc, char *argv[], bool handle_unknown_arg) { {"always-succeed", no_argument, 0, 'y'}, {"no-early-exit", no_argument, 0, 'z'}, {"output", required_argument, 0, 'O'}, + {"wait", no_argument, 0, 'w'}, {0, 0, 0, 0 } }; @@ -237,7 +238,7 @@ int criterion_handle_args(int argc, char *argv[], bool handle_unknown_arg) { free(out); } - for (int c; (c = getopt_long(argc, argv, "hvlfj:SqO:", opts, NULL)) != -1;) { + for (int c; (c = getopt_long(argc, argv, "hvlfj:SqO:w", opts, NULL)) != -1;) { switch (c) { case 'b': criterion_options.logging_threshold = (enum criterion_logging_level) atou(DEF(optarg, "1")); break; case 'y': criterion_options.always_succeed = true; break; @@ -281,6 +282,7 @@ int criterion_handle_args(int argc, char *argv[], bool handle_unknown_arg) { quiet = !strcmp(path, "-"); criterion_add_output(arg, path); } break; + case 'w': criterion_options.wait_for_clients = true; break; case '?': default : do_print_usage = handle_unknown_arg; break; } diff --git a/src/io/event.c b/src/io/event.c index 2ea9ce9..1681831 100644 --- a/src/io/event.c +++ b/src/io/event.c @@ -22,6 +22,7 @@ * THE SOFTWARE. */ +#include "criterion/criterion.h" #include "protocol/protocol.h" #include "protocol/messages.h" #include "event.h" @@ -38,5 +39,6 @@ void criterion_send_assert(struct criterion_assert_stats *stats) { .has_line = true, .line = stats->line, ); + criterion_message_set_id(msg); cr_send_to_runner(&msg); } diff --git a/src/protocol/criterion.pb.c b/src/protocol/criterion.pb.c index aeb2176..f41a707 100644 --- a/src/protocol/criterion.pb.c +++ b/src/protocol/criterion.pb.c @@ -1,5 +1,5 @@ /* Automatically generated nanopb constant definitions */ -/* Generated by nanopb-0.3.5-dev at Mon Jan 11 00:41:07 2016. */ +/* Generated by nanopb-0.3.5-dev at Mon Jan 11 19:14:39 2016. */ #include "criterion.pb.h" @@ -60,10 +60,11 @@ const pb_field_t criterion_protocol_submessage_fields[6] = { PB_LAST_FIELD }; -const pb_field_t criterion_protocol_msg_fields[4] = { +const pb_field_t criterion_protocol_msg_fields[5] = { PB_FIELD( 1, INT32 , REQUIRED, STATIC , FIRST, criterion_protocol_msg, version, version, &criterion_protocol_msg_version_default), PB_ONEOF_FIELD(id, 2, INT64 , ONEOF, STATIC , OTHER, criterion_protocol_msg, pid, version, 0), - PB_FIELD( 16, MESSAGE , REQUIRED, STATIC , OTHER, criterion_protocol_msg, data, id.pid, &criterion_protocol_submessage_fields), + PB_ONEOF_FIELD(id, 3, STRING , ONEOF, POINTER , OTHER, criterion_protocol_msg, uid, version, 0), + PB_FIELD( 16, MESSAGE , REQUIRED, STATIC , OTHER, criterion_protocol_msg, data, id.uid, &criterion_protocol_submessage_fields), PB_LAST_FIELD }; diff --git a/src/protocol/criterion.pb.h b/src/protocol/criterion.pb.h index 301296d..c9b29f1 100644 --- a/src/protocol/criterion.pb.h +++ b/src/protocol/criterion.pb.h @@ -1,5 +1,5 @@ /* Automatically generated nanopb header */ -/* Generated by nanopb-0.3.5-dev at Mon Jan 11 00:41:07 2016. */ +/* Generated by nanopb-0.3.5-dev at Mon Jan 11 19:14:39 2016. */ #ifndef PB_CRITERION_PB_H_INCLUDED #define PB_CRITERION_PB_H_INCLUDED @@ -91,6 +91,7 @@ typedef struct _criterion_protocol_msg { pb_size_t which_id; union { int64_t pid; + char *uid; } id; criterion_protocol_submessage data; } criterion_protocol_msg; @@ -140,6 +141,7 @@ extern const int32_t criterion_protocol_msg_version_default; #define criterion_protocol_submessage_message_tag 4 #define criterion_protocol_submessage_assert_tag 5 #define criterion_protocol_msg_pid_tag 2 +#define criterion_protocol_msg_uid_tag 3 #define criterion_protocol_msg_version_tag 1 #define criterion_protocol_msg_data_tag 16 @@ -151,12 +153,11 @@ extern const pb_field_t criterion_protocol_assert_fields[5]; extern const pb_field_t criterion_protocol_log_fields[4]; extern const pb_field_t criterion_protocol_ack_fields[3]; extern const pb_field_t criterion_protocol_submessage_fields[6]; -extern const pb_field_t criterion_protocol_msg_fields[4]; +extern const pb_field_t criterion_protocol_msg_fields[5]; /* Maximum encoded size of messages (where known) */ #define criterion_protocol_birth_size 11 #define criterion_protocol_death_size 24 -#define criterion_protocol_msg_size (29 + criterion_protocol_submessage_size) /* Message IDs (where set with "msgid" option) */ #ifdef PB_MSGID diff --git a/src/protocol/criterion.proto b/src/protocol/criterion.proto index 2ca8a9d..7a04d12 100644 --- a/src/protocol/criterion.proto +++ b/src/protocol/criterion.proto @@ -75,6 +75,7 @@ message msg { oneof id { int64 pid = 2; + string uid = 3; } required submessage data = 16; diff --git a/src/protocol/messages.c b/src/protocol/messages.c index c836291..3c78eaf 100644 --- a/src/protocol/messages.c +++ b/src/protocol/messages.c @@ -83,7 +83,7 @@ void cr_send_to_runner(const criterion_protocol_msg *message) { if (write_message(g_client_socket, message) != 1) { criterion_perror("Could not write the \"%s\" message down the event pipe: %s.\n", message_names[message->data.which_value], - strerror(errno)); + nn_strerror(errno)); abort(); } @@ -91,7 +91,7 @@ void cr_send_to_runner(const criterion_protocol_msg *message) { int read = nn_recv(g_client_socket, &buf, NN_MSG, 0); if (read <= 0) { - criterion_perror("Could not read ack: %s.\n", strerror(errno)); + criterion_perror("Could not read ack: %s.\n", nn_strerror(errno)); abort(); } @@ -106,6 +106,7 @@ void cr_send_to_runner(const criterion_protocol_msg *message) { criterion_perror("Runner returned an error: %s.\n", ack.message ? ack.message : "Unknown error"); abort(); } + pb_release(criterion_protocol_ack_fields, &ack); if (buf) nn_freemsg(buf); @@ -140,9 +141,13 @@ void send_ack(int sock, bool ok, const char *msg, ...) { int written = nn_send(sock, buf, size, 0); if (written <= 0 || written != (int) size) { - criterion_perror("Could not send ack: %s.\n", strerror(errno)); + criterion_perror("Could not send ack: %s.\n", nn_strerror(errno)); abort(); } free(buf); } + +void free_message(criterion_protocol_msg *msg) { + pb_release(criterion_protocol_msg_fields, msg); +} diff --git a/src/protocol/messages.h b/src/protocol/messages.h index f69ff39..39cf601 100644 --- a/src/protocol/messages.h +++ b/src/protocol/messages.h @@ -30,5 +30,6 @@ int write_message(int sock, const criterion_protocol_msg *message); int read_message(int sock, criterion_protocol_msg *message); void cr_send_to_runner(const criterion_protocol_msg *message); void send_ack(int sock, bool ok, const char *msg, ...); +void free_message(criterion_protocol_msg *msg); #endif /* !MESSAGES_H_ */ diff --git a/src/protocol/protocol.c b/src/protocol/protocol.c index 420cce8..3206888 100644 --- a/src/protocol/protocol.c +++ b/src/protocol/protocol.c @@ -62,3 +62,5 @@ pb_istream_t pb_istream_from_fd(int fd) { pb_istream_t stream = {&read_fd_callback, (void*)(intptr_t) fd, SIZE_MAX, NULL}; return stream; } + +volatile bool is_extern_worker = false; diff --git a/src/protocol/protocol.h b/src/protocol/protocol.h index 833932d..4e4db88 100644 --- a/src/protocol/protocol.h +++ b/src/protocol/protocol.h @@ -40,13 +40,23 @@ bool pb_read_string(pb_istream_t *stream, const pb_field_t *field, void **arg); pb_ostream_t pb_ostream_from_fd(int fd); pb_istream_t pb_istream_from_fd(int fd); +extern volatile bool is_extern_worker; + +# define criterion_message_set_id(Msg) \ + do { \ + if (is_extern_worker) { \ + (Msg).id.uid = (char *) criterion_current_test->name; \ + } else { \ + (Msg).id.pid = get_process_id(); \ + } \ + } while (0) + # define criterion_message(Kind, ...) \ (criterion_protocol_msg) { \ - .version = 1, \ - .which_id = criterion_protocol_msg_pid_tag, \ - .id = { \ - .pid = get_process_id(), \ - }, \ + .version = PROTOCOL_V1, \ + .which_id = is_extern_worker \ + ? criterion_protocol_msg_uid_tag \ + : criterion_protocol_msg_pid_tag, \ .data = { \ .which_value = criterion_protocol_submessage_ ## Kind ## _tag, \ .value = { \