Implemented simple server mode

This commit is contained in:
Snaipe 2016-01-12 00:17:03 +01:00
parent 1339185973
commit c4f5e6baed
18 changed files with 181 additions and 49 deletions

View file

@ -96,6 +96,7 @@ set(GettextTranslate_ALL 1)
set(GettextTranslate_GMO_BINARY 1)
add_definitions(-DCRITERION_BUILDING_DLL=1)
add_definitions(-DPB_ENABLE_MALLOC=1)
set(CMAKE_C_FLAGS_DEFAULT "${CMAKE_C_FLAGS}")
set(CMAKE_CXX_FLAGS_DEFAULT "${CMAKE_CXX_FLAGS}")

View file

@ -105,6 +105,13 @@ struct criterion_options {
* default: true
*/
bool measure_time;
/**
* Whether criterion should wait for incoming connections in server mode
*
* default: false
*/
bool wait_for_clients;
};
CR_BEGIN_C_API

View file

@ -50,6 +50,7 @@ void criterion_test_die(const char *msg, ...) {
.name = (char *) criterion_current_test->name,
.message = formatted_msg,
);
criterion_message_set_id(abort_msg);
cr_send_to_runner(&abort_msg);
free(formatted_msg);

View file

@ -37,6 +37,7 @@
static void nothing(void) {};
KHASH_MAP_INIT_INT(ht_client, struct client_ctx)
KHASH_MAP_INIT_STR(ht_extern, struct client_ctx)
static enum client_state phase_to_state[] = {
[criterion_protocol_phase_kind_SETUP] = CS_SETUP,
@ -77,16 +78,30 @@ static void get_message_id(char *out, size_t n, const criterion_protocol_msg *ms
switch (msg->which_id) {
case criterion_protocol_msg_pid_tag:
snprintf(out, n, "[PID %" PRId64 "]", msg->id.pid); return;
case criterion_protocol_msg_uid_tag:
snprintf(out, n, "[external \"%s\"]", msg->id.uid); return;
default: break;
}
}
void init_server_context(struct server_ctx *sctx) {
void init_server_context(struct server_ctx *sctx, struct criterion_global_stats *gstats) {
sctx->subprocesses = kh_init(ht_client);
sctx->clients = kh_init(ht_extern);
sctx->gstats = gstats;
sctx->extern_suite = (struct criterion_suite) {
.name = "external",
.data = &sctx->extern_suite_data,
};
sctx->extern_suite_data = (struct criterion_test_extra_data) {
.disabled = 0,
};
sctx->extern_sstats = suite_stats_init(&sctx->extern_suite);
}
void destroy_client_context(struct client_ctx *ctx) {
sfree(ctx->worker);
if (ctx->kind == WORKER)
sfree(ctx->worker);
}
void destroy_server_context(struct server_ctx *sctx) {
@ -99,6 +114,8 @@ void destroy_server_context(struct server_ctx *sctx) {
destroy_client_context(&v);
});
kh_destroy(ht_client, sctx->subprocesses);
kh_destroy(ht_extern, sctx->clients);
}
struct client_ctx *add_client_from_worker(struct server_ctx *sctx, struct client_ctx *ctx, struct worker *w) {
@ -119,6 +136,28 @@ void remove_client_by_pid(struct server_ctx *sctx, int pid) {
}
}
struct client_ctx *add_external_client(struct server_ctx *sctx, char *id) {
int absent;
khint_t k = kh_put(ht_extern, sctx->clients, id, &absent);
kh_value(sctx->clients, k) = (struct client_ctx) {
.kind = EXTERN,
.extern_test = {
.name = strdup(id),
.category = "external",
},
.gstats = sctx->gstats,
.sstats = sctx->extern_sstats,
};
struct client_ctx *ctx = &kh_value(sctx->clients, k);
ctx->test = &ctx->extern_test;
ctx->suite = &sctx->extern_suite;
ctx->extern_test.data = &ctx->extern_test_data;
ctx->tstats = test_stats_init(&ctx->extern_test);
return ctx;
}
static void process_client_message_impl(struct server_ctx *sctx, struct client_ctx *ctx, const criterion_protocol_msg *msg) {
message_handler *handler = message_handlers[msg->data.which_value];
bool ack;
@ -141,23 +180,38 @@ struct client_ctx *process_client_message(struct server_ctx *ctx, const criterio
return NULL;
}
struct client_ctx *client = NULL;
switch (msg->which_id) {
case criterion_protocol_msg_pid_tag: {
khiter_t k = kh_get(ht_client, ctx->subprocesses, msg->id.pid);
if (k != kh_end(ctx->subprocesses)) {
process_client_message_impl(ctx, &kh_value(ctx->subprocesses, k), msg);
return &kh_value(ctx->subprocesses, k);
client = &kh_value(ctx->subprocesses, k);
} else {
handler_error(ctx, "%s", "", "Received message identified by a PID '%ld'"
handler_error(ctx, "%s", "", "Received message identified by a PID '%ld' "
"that is not a child process.", msg->id.pid);
}
} break;
case criterion_protocol_msg_uid_tag: {
khiter_t k = kh_get(ht_extern, ctx->clients, msg->id.uid);
bool client_found = k != kh_end(ctx->clients);
if (!client_found && msg->data.which_value == criterion_protocol_submessage_birth_tag) {
client = add_external_client(ctx, msg->id.uid);
} else if (client_found) {
client = &kh_value(ctx->clients, k);
} else {
handler_error(ctx, "%s", "", "Received message identified by the ID '%s'"
"that did not send a birth message previously.", msg->id.uid);
}
} break;
default: {
handler_error(ctx, "%s", "", "Received message with malformed id tag '%d'.\n",
criterion_protocol_msg_pid_tag);
} break;
}
return NULL;
if (client)
process_client_message_impl(ctx, client, msg);
return client;
}
#define push_event(...) \

View file

@ -50,6 +50,8 @@ enum client_kind {
struct client_ctx {
enum client_kind kind;
struct worker *worker;
struct criterion_test_extra_data extern_test_data;
struct criterion_test extern_test;
enum client_state state;
bool alive;
@ -61,15 +63,22 @@ struct client_ctx {
};
typedef struct kh_ht_client_s khash_t(ht_client);
typedef struct kh_ht_extern_s khash_t(ht_extern);
struct server_ctx {
int socket;
struct criterion_suite extern_suite;
struct criterion_test_extra_data extern_suite_data;
struct criterion_global_stats *gstats;
struct criterion_suite_stats *extern_sstats;
khash_t(ht_client) *subprocesses;
khash_t(ht_extern) *clients;
};
struct client_ctx *process_client_message(struct server_ctx *ctx, const criterion_protocol_msg *msg);
void init_server_context(struct server_ctx *sctx);
void init_server_context(struct server_ctx *sctx, struct criterion_global_stats *gstats);
void destroy_server_context(struct server_ctx *sctx);
struct client_ctx *add_client_from_worker(struct server_ctx *sctx, struct client_ctx *ctx, struct worker *w);
void remove_client_by_pid(struct server_ctx *sctx, int pid);

View file

@ -33,6 +33,7 @@
#include "criterion/internal/ordered-set.h"
#include "criterion/logging.h"
#include "criterion/internal/preprocess.h"
#include "criterion/redirect.h"
#include "protocol/protocol.h"
#include "protocol/connect.h"
#include "protocol/messages.h"
@ -52,6 +53,12 @@
#include "common.h"
#include "client.h"
#if HAVE_THREADS_H
# include <threads.h>
#else
# include <tinycthread.h>
#endif
#ifdef HAVE_PCRE
#include "string/extmatch.h"
#endif
@ -169,6 +176,19 @@ const struct criterion_suite *criterion_current_suite;
void run_test_child(struct criterion_test *test,
struct criterion_suite *suite) {
cr_redirect_stdin();
g_client_socket = connect_client();
if (g_client_socket < 0) {
criterion_perror("Could not initialize the message client: %s.\n",
strerror(errno));
abort();
}
// Notify the runner that the test was born
criterion_protocol_msg msg = criterion_message(birth);
criterion_message_set_id(msg);
cr_send_to_runner(&msg);
#ifndef ENABLE_VALGRIND_ERRORS
VALGRIND_ENABLE_ERROR_REPORTING;
#endif
@ -183,6 +203,17 @@ void run_test_child(struct criterion_test *test,
if (test->test)
test->test();
#ifndef ENABLE_VALGRIND_ERRORS
VALGRIND_DISABLE_ERROR_REPORTING;
#endif
close_socket(g_client_socket);
fflush(NULL); // flush all opened streams
if (criterion_options.no_early_exit)
return;
_Exit(0);
}
#define push_event(...) \
@ -280,7 +311,7 @@ static void run_tests_async(struct criterion_test_set *set,
int has_msg = 0;
struct server_ctx sctx;
init_server_context(&sctx);
init_server_context(&sctx, stats);
sctx.socket = socket;
@ -297,12 +328,17 @@ static void run_tests_async(struct criterion_test_set *set,
++active_workers;
}
if (!active_workers)
if (!active_workers && !criterion_options.wait_for_clients)
goto cleanup;
criterion_protocol_msg msg = criterion_protocol_msg_init_zero;
while ((has_msg = read_message(socket, &msg)) == 1) {
struct client_ctx *cctx = process_client_message(&sctx, &msg);
// drop invalid messages
if (!cctx)
continue;
if (!cctx->alive && cctx->kind == WORKER) {
remove_client_by_pid(&sctx, get_process_id_of(cctx->worker->proc));
@ -314,15 +350,15 @@ static void run_tests_async(struct criterion_test_set *set,
--active_workers;
}
if (!active_workers)
if (!active_workers && !criterion_options.wait_for_clients)
break;
pb_release(criterion_protocol_msg_fields, &msg);
free_message(&msg);
}
cleanup:
if (has_msg)
pb_release(criterion_protocol_msg_fields, &msg);
free_message(&msg);
destroy_server_context(&sctx);
ccrAbort(ctx);
}
@ -398,8 +434,8 @@ int criterion_run_all_tests(struct criterion_test_set *set) {
void run_single_test_by_name(const char *testname) {
struct criterion_test_set *set = criterion_init();
// FIXME: initialize null sink for pipe system.
abort();
struct criterion_test *test = NULL;
struct criterion_suite *suite = NULL;
FOREACH_SET(struct criterion_suite_set *s, set->suites) {
size_t tests = s->tests ? s->tests->size : 0;
@ -409,10 +445,21 @@ void run_single_test_by_name(const char *testname) {
FOREACH_SET(struct criterion_test *t, s->tests) {
char name[1024];
snprintf(name, sizeof (name), "%s::%s", s->suite.name, t->name);
if (!strncmp(name, testname, 1024))
run_test_child(t, &s->suite);
if (!strncmp(name, testname, 1024)) {
test = t;
suite = &s->suite;
break;
}
}
}
if (test) {
is_extern_worker = true;
criterion_current_test = test;
criterion_current_suite = suite;
run_test_child(test, suite);
}
sfree(set);
}

View file

@ -39,6 +39,7 @@ static void send_event(int phase) {
.phase = phase,
.name = (char *) criterion_current_test->name,
);
criterion_message_set_id(msg);
cr_send_to_runner(&msg);
}

View file

@ -210,12 +210,14 @@ void cr_theory_main(struct criterion_datapoints *dps, size_t datapoints, void (*
.phase = criterion_protocol_phase_kind_SETUP,
.name = name,
);
criterion_message_set_id(setup_msg);
cr_send_to_runner(&setup_msg);
criterion_protocol_msg main_msg = criterion_message(phase,
.phase = criterion_protocol_phase_kind_MAIN,
.name = name,
);
criterion_message_set_id(main_msg);
cr_send_to_runner(&main_msg);
int theory_aborted = 0;
@ -253,6 +255,7 @@ void cr_theory_main(struct criterion_datapoints *dps, size_t datapoints, void (*
.name = name,
.message = result.msg
);
criterion_message_set_id(msg);
cr_send_to_runner(&msg);
}
}
@ -262,12 +265,14 @@ void cr_theory_main(struct criterion_datapoints *dps, size_t datapoints, void (*
.phase = criterion_protocol_phase_kind_TEARDOWN,
.name = name,
);
criterion_message_set_id(teardown_msg);
cr_send_to_runner(&teardown_msg);
criterion_protocol_msg end_msg = criterion_message(phase,
.phase = criterion_protocol_phase_kind_END,
.name = name,
);
criterion_message_set_id(end_msg);
cr_send_to_runner(&end_msg);
}
free(name);

View file

@ -60,25 +60,7 @@ static void close_process(void *ptr, CR_UNUSED void *meta) {
}
void run_worker(struct worker_context *ctx) {
cr_redirect_stdin();
g_client_socket = connect_client();
if (g_client_socket < 0) {
criterion_perror("Could not initialize the message client: %s.\n",
strerror(errno));
abort();
}
// Notify the runner that the test was born
criterion_protocol_msg msg = criterion_message(birth);
cr_send_to_runner(&msg);
ctx->func(ctx->test, ctx->suite);
close_socket(g_client_socket);
fflush(NULL); // flush all opened streams
if (criterion_options.no_early_exit)
return;
_Exit(0);
}
struct worker *spawn_test_worker(struct execution_context *ctx,

View file

@ -156,6 +156,7 @@ int criterion_handle_args(int argc, char *argv[], bool handle_unknown_arg) {
{"always-succeed", no_argument, 0, 'y'},
{"no-early-exit", no_argument, 0, 'z'},
{"output", required_argument, 0, 'O'},
{"wait", no_argument, 0, 'w'},
{0, 0, 0, 0 }
};
@ -237,7 +238,7 @@ int criterion_handle_args(int argc, char *argv[], bool handle_unknown_arg) {
free(out);
}
for (int c; (c = getopt_long(argc, argv, "hvlfj:SqO:", opts, NULL)) != -1;) {
for (int c; (c = getopt_long(argc, argv, "hvlfj:SqO:w", opts, NULL)) != -1;) {
switch (c) {
case 'b': criterion_options.logging_threshold = (enum criterion_logging_level) atou(DEF(optarg, "1")); break;
case 'y': criterion_options.always_succeed = true; break;
@ -281,6 +282,7 @@ int criterion_handle_args(int argc, char *argv[], bool handle_unknown_arg) {
quiet = !strcmp(path, "-");
criterion_add_output(arg, path);
} break;
case 'w': criterion_options.wait_for_clients = true; break;
case '?':
default : do_print_usage = handle_unknown_arg; break;
}

View file

@ -22,6 +22,7 @@
* THE SOFTWARE.
*/
#include "criterion/criterion.h"
#include "protocol/protocol.h"
#include "protocol/messages.h"
#include "event.h"
@ -38,5 +39,6 @@ void criterion_send_assert(struct criterion_assert_stats *stats) {
.has_line = true,
.line = stats->line,
);
criterion_message_set_id(msg);
cr_send_to_runner(&msg);
}

View file

@ -1,5 +1,5 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.5-dev at Mon Jan 11 00:41:07 2016. */
/* Generated by nanopb-0.3.5-dev at Mon Jan 11 19:14:39 2016. */
#include "criterion.pb.h"
@ -60,10 +60,11 @@ const pb_field_t criterion_protocol_submessage_fields[6] = {
PB_LAST_FIELD
};
const pb_field_t criterion_protocol_msg_fields[4] = {
const pb_field_t criterion_protocol_msg_fields[5] = {
PB_FIELD( 1, INT32 , REQUIRED, STATIC , FIRST, criterion_protocol_msg, version, version, &criterion_protocol_msg_version_default),
PB_ONEOF_FIELD(id, 2, INT64 , ONEOF, STATIC , OTHER, criterion_protocol_msg, pid, version, 0),
PB_FIELD( 16, MESSAGE , REQUIRED, STATIC , OTHER, criterion_protocol_msg, data, id.pid, &criterion_protocol_submessage_fields),
PB_ONEOF_FIELD(id, 3, STRING , ONEOF, POINTER , OTHER, criterion_protocol_msg, uid, version, 0),
PB_FIELD( 16, MESSAGE , REQUIRED, STATIC , OTHER, criterion_protocol_msg, data, id.uid, &criterion_protocol_submessage_fields),
PB_LAST_FIELD
};

View file

@ -1,5 +1,5 @@
/* Automatically generated nanopb header */
/* Generated by nanopb-0.3.5-dev at Mon Jan 11 00:41:07 2016. */
/* Generated by nanopb-0.3.5-dev at Mon Jan 11 19:14:39 2016. */
#ifndef PB_CRITERION_PB_H_INCLUDED
#define PB_CRITERION_PB_H_INCLUDED
@ -91,6 +91,7 @@ typedef struct _criterion_protocol_msg {
pb_size_t which_id;
union {
int64_t pid;
char *uid;
} id;
criterion_protocol_submessage data;
} criterion_protocol_msg;
@ -140,6 +141,7 @@ extern const int32_t criterion_protocol_msg_version_default;
#define criterion_protocol_submessage_message_tag 4
#define criterion_protocol_submessage_assert_tag 5
#define criterion_protocol_msg_pid_tag 2
#define criterion_protocol_msg_uid_tag 3
#define criterion_protocol_msg_version_tag 1
#define criterion_protocol_msg_data_tag 16
@ -151,12 +153,11 @@ extern const pb_field_t criterion_protocol_assert_fields[5];
extern const pb_field_t criterion_protocol_log_fields[4];
extern const pb_field_t criterion_protocol_ack_fields[3];
extern const pb_field_t criterion_protocol_submessage_fields[6];
extern const pb_field_t criterion_protocol_msg_fields[4];
extern const pb_field_t criterion_protocol_msg_fields[5];
/* Maximum encoded size of messages (where known) */
#define criterion_protocol_birth_size 11
#define criterion_protocol_death_size 24
#define criterion_protocol_msg_size (29 + criterion_protocol_submessage_size)
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID

View file

@ -75,6 +75,7 @@ message msg {
oneof id {
int64 pid = 2;
string uid = 3;
}
required submessage data = 16;

View file

@ -83,7 +83,7 @@ void cr_send_to_runner(const criterion_protocol_msg *message) {
if (write_message(g_client_socket, message) != 1) {
criterion_perror("Could not write the \"%s\" message down the event pipe: %s.\n",
message_names[message->data.which_value],
strerror(errno));
nn_strerror(errno));
abort();
}
@ -91,7 +91,7 @@ void cr_send_to_runner(const criterion_protocol_msg *message) {
int read = nn_recv(g_client_socket, &buf, NN_MSG, 0);
if (read <= 0) {
criterion_perror("Could not read ack: %s.\n", strerror(errno));
criterion_perror("Could not read ack: %s.\n", nn_strerror(errno));
abort();
}
@ -106,6 +106,7 @@ void cr_send_to_runner(const criterion_protocol_msg *message) {
criterion_perror("Runner returned an error: %s.\n", ack.message ? ack.message : "Unknown error");
abort();
}
pb_release(criterion_protocol_ack_fields, &ack);
if (buf)
nn_freemsg(buf);
@ -140,9 +141,13 @@ void send_ack(int sock, bool ok, const char *msg, ...) {
int written = nn_send(sock, buf, size, 0);
if (written <= 0 || written != (int) size) {
criterion_perror("Could not send ack: %s.\n", strerror(errno));
criterion_perror("Could not send ack: %s.\n", nn_strerror(errno));
abort();
}
free(buf);
}
void free_message(criterion_protocol_msg *msg) {
pb_release(criterion_protocol_msg_fields, msg);
}

View file

@ -30,5 +30,6 @@ int write_message(int sock, const criterion_protocol_msg *message);
int read_message(int sock, criterion_protocol_msg *message);
void cr_send_to_runner(const criterion_protocol_msg *message);
void send_ack(int sock, bool ok, const char *msg, ...);
void free_message(criterion_protocol_msg *msg);
#endif /* !MESSAGES_H_ */

View file

@ -62,3 +62,5 @@ pb_istream_t pb_istream_from_fd(int fd) {
pb_istream_t stream = {&read_fd_callback, (void*)(intptr_t) fd, SIZE_MAX, NULL};
return stream;
}
volatile bool is_extern_worker = false;

View file

@ -40,13 +40,23 @@ bool pb_read_string(pb_istream_t *stream, const pb_field_t *field, void **arg);
pb_ostream_t pb_ostream_from_fd(int fd);
pb_istream_t pb_istream_from_fd(int fd);
extern volatile bool is_extern_worker;
# define criterion_message_set_id(Msg) \
do { \
if (is_extern_worker) { \
(Msg).id.uid = (char *) criterion_current_test->name; \
} else { \
(Msg).id.pid = get_process_id(); \
} \
} while (0)
# define criterion_message(Kind, ...) \
(criterion_protocol_msg) { \
.version = 1, \
.which_id = criterion_protocol_msg_pid_tag, \
.id = { \
.pid = get_process_id(), \
}, \
.version = PROTOCOL_V1, \
.which_id = is_extern_worker \
? criterion_protocol_msg_uid_tag \
: criterion_protocol_msg_pid_tag, \
.data = { \
.which_value = criterion_protocol_submessage_ ## Kind ## _tag, \
.value = { \