[Issue #58] Switched event pipe IO to use native read/write syscalls

This commit is contained in:
Snaipe 2015-09-27 01:20:27 +02:00
parent f1dfff5756
commit 3fb26eb505
8 changed files with 169 additions and 23 deletions

View file

@ -35,4 +35,12 @@ struct pipe_handle {
#endif
};
struct pipe_file_handle {
#ifdef VANILLA_WIN32
HANDLE fh;
#else
int fd;
#endif
};
#endif /* !PIPE_INTERNAL_H_ */

View file

@ -182,6 +182,135 @@ void pipe_std_redirect(s_pipe_handle *pipe, enum criterion_std_fd fd) {
#endif
}
void close_pipe_file_handle(void *ptr, UNUSED void *meta) {
s_pipe_file_handle *h = ptr;
#ifdef VANILLA_WIN32
CloseHandle(h->fh);
#else
close(h->fd);
#endif
}
#ifdef VANILLA_WIN32
static HANDLE win_dup(HANDLE h) {
HANDLE dup;
DuplicateHandle(GetCurrentProcess(),
h,
GetCurrentProcess(),
&dup,
0,
TRUE,
DUPLICATE_SAME_ACCESS);
return dup;
}
#endif
s_pipe_file_handle *pipe_out_handle(s_pipe_handle *p, enum pipe_opt opts) {
#ifdef VANILLA_WIN32
if (opts & PIPE_CLOSE)
CloseHandle(p->fhs[0]);
HANDLE fh = p->fhs[1];
if (opts & PIPE_DUP)
fh = win_dup(fh);
s_pipe_file_handle *h = smalloc(
.size = sizeof (s_pipe_file_handle),
.dtor = close_pipe_file_handle);
h->fh = fh;
return h;
#else
if (opts & PIPE_CLOSE)
close(p->fds[0]);
int fd = p->fds[1];
if (opts & PIPE_DUP)
fd = dup(fd);
s_pipe_file_handle *h = smalloc(
.size = sizeof (s_pipe_file_handle),
.dtor = close_pipe_file_handle);
h->fd = fd;
return h;
#endif
}
s_pipe_file_handle *pipe_in_handle(s_pipe_handle *p, enum pipe_opt opts) {
#ifdef VANILLA_WIN32
if (opts & PIPE_CLOSE)
CloseHandle(p->fhs[1]);
HANDLE fh = p->fhs[0];
if (opts & PIPE_DUP)
fh = win_dup(fh);
s_pipe_file_handle *h = smalloc(
.size = sizeof (s_pipe_file_handle),
.dtor = close_pipe_file_handle);
h->fh = fh;
return h;
#else
if (opts & PIPE_CLOSE)
close(p->fds[1]);
int fd = p->fds[0];
if (opts & PIPE_DUP)
fd = dup(fd);
s_pipe_file_handle *h = smalloc(
.size = sizeof (s_pipe_file_handle),
.dtor = close_pipe_file_handle);
h->fd = fd;
return h;
#endif
}
int pipe_write(const void *buf, size_t size, s_pipe_file_handle *pipe) {
#ifdef VANILLA_WIN32
DWORD written = 0;
size_t off = 0;
while (size > 0) {
if (!WriteFile(pipe->fh, (const char *) buf + off, size, &written, NULL))
return -1;
size -= written;
off += written;
}
if (off > 0)
return 1;
return 0;
#else
ssize_t res = write(pipe->fd, buf, size);
if (res < 0)
return -1;
if (res > 0)
return 1;
return 0;
#endif
}
int pipe_read(void *buf, size_t size, s_pipe_file_handle *pipe) {
#ifdef VANILLA_WIN32
DWORD read = 0;
size_t off = 0;
while (size > 0) {
if (!ReadFile(pipe->fh, (char *) buf + off, size, &read, NULL))
return -1;
size -= read;
off += read;
}
if (off > 0)
return 1;
return 0;
#else
ssize_t res = read(pipe->fd, buf, size);
if (res < 0)
return -1;
if (res > 0)
return 1;
return 0;
#endif
}
static s_pipe_handle stdout_redir_;
static s_pipe_handle stderr_redir_;
static s_pipe_handle stdin_redir_;

View file

@ -31,6 +31,9 @@
struct pipe_handle;
typedef struct pipe_handle s_pipe_handle;
struct pipe_file_handle;
typedef struct pipe_file_handle s_pipe_file_handle;
enum pipe_end {
PIPE_READ = 0,
PIPE_WRITE = 1,
@ -51,9 +54,15 @@ s_pipe_handle *stdpipe();
FILE *pipe_in(s_pipe_handle *p, enum pipe_opt opts);
FILE *pipe_out(s_pipe_handle *p, enum pipe_opt opts);
s_pipe_file_handle *pipe_out_handle(s_pipe_handle *p, enum pipe_opt opts);
s_pipe_file_handle *pipe_in_handle(s_pipe_handle *p, enum pipe_opt opts);
int stdpipe_options(s_pipe_handle *pipe, int id, int noblock);
void pipe_std_redirect(s_pipe_handle *pipe, enum criterion_std_fd fd);
int pipe_write(const void *buf, size_t size, s_pipe_file_handle *pipe);
int pipe_read(void *buf, size_t size, s_pipe_file_handle *pipe);
INLINE FILE* get_std_file(enum criterion_std_fd fd_kind) {
switch (fd_kind) {
case CR_STDIN: return stdin;

View file

@ -335,7 +335,7 @@ static void run_tests_async(struct criterion_test_set *set,
size_t active_workers = 0;
FILE *event_pipe = pipe_in(g_worker_pipe, PIPE_DUP);
s_pipe_file_handle *event_pipe = pipe_in_handle(g_worker_pipe, PIPE_DUP);
struct event *ev = NULL;
// initialization of coroutine
@ -374,7 +374,7 @@ static void run_tests_async(struct criterion_test_set *set,
ev = NULL;
cleanup:
fclose(event_pipe);
sfree(event_pipe);
sfree(ev);
for (size_t i = 0; i < nb_workers; ++i)
sfree(workers.workers[i]);

View file

@ -48,14 +48,14 @@ bool is_runner(void) {
static void close_process(void *ptr, UNUSED void *meta) {
struct worker *proc = ptr;
fclose(proc->in);
sfree(proc->in);
sfree(proc->ctx.suite_stats);
sfree(proc->ctx.test_stats);
sfree(proc->ctx.stats);
sfree(proc->proc);
}
struct event *worker_read_event(struct worker_set *workers, FILE *pipe) {
struct event *worker_read_event(struct worker_set *workers, s_pipe_file_handle *pipe) {
struct event *ev = read_event(pipe);
if (ev) {
ev->worker_index = -1;
@ -76,10 +76,10 @@ struct event *worker_read_event(struct worker_set *workers, FILE *pipe) {
void run_worker(struct worker_context *ctx) {
cr_redirect_stdin();
g_event_pipe = pipe_out(ctx->pipe, PIPE_CLOSE);
g_event_pipe = pipe_out_handle(ctx->pipe, PIPE_CLOSE);
ctx->func(ctx->test, ctx->suite);
fclose(g_event_pipe);
sfree(g_event_pipe);
fflush(NULL); // flush all opened streams
if (criterion_options.no_early_exit)
@ -118,7 +118,7 @@ struct worker *spawn_test_worker(struct execution_context *ctx,
*ptr = (struct worker) {
.proc = proc,
.in = pipe_in(pipe, PIPE_DUP),
.in = pipe_in_handle(pipe, PIPE_DUP),
.ctx = *ctx,
};
return ptr;

View file

@ -49,7 +49,7 @@ struct execution_context {
struct worker {
int active;
s_proc_handle *proc;
FILE *in;
s_pipe_file_handle *in;
struct execution_context ctx;
};
@ -85,6 +85,6 @@ struct process_status get_status(int status);
struct worker *spawn_test_worker(struct execution_context *ctx,
f_worker_func func,
s_pipe_handle *pipe);
struct event *worker_read_event(struct worker_set *workers, FILE *pipe);
struct event *worker_read_event(struct worker_set *workers, s_pipe_file_handle *pipe);
#endif /* !PROCESS_H_ */

View file

@ -31,7 +31,7 @@
#include "core/worker.h"
#include "event.h"
FILE *g_event_pipe = NULL;
s_pipe_file_handle *g_event_pipe = NULL;
void destroy_event(void *ptr, UNUSED void *meta) {
struct event *ev = ptr;
@ -56,12 +56,12 @@ void destroy_assert_event(void *ptr, UNUSED void *meta) {
abort(); \
} while (0)
struct event *read_event(FILE *f) {
struct event *read_event(s_pipe_file_handle *f) {
unsigned kind;
ASSERT(fread(&kind, sizeof (unsigned), 1, f) == 0);
ASSERT(pipe_read(&kind, sizeof (unsigned), f) == 0);
unsigned long long pid;
ASSERT(fread(&pid, sizeof (unsigned long long), 1, f) == 0);
ASSERT(pipe_read(&pid, sizeof (unsigned long long), f) == 0);
switch (kind) {
case ASSERT: {
@ -70,13 +70,13 @@ struct event *read_event(FILE *f) {
char *msg = NULL;
buf = malloc(assert_size);
ASSERT(fread(buf, assert_size, 1, f) == 0);
ASSERT(pipe_read(buf, assert_size, f) == 0);
size_t len = 0;
ASSERT(fread(&len, sizeof (size_t), 1, f) == 0);
ASSERT(pipe_read(&len, sizeof (size_t), f) == 0);
msg = malloc(len);
ASSERT(fread(msg, len, 1, f) == 0);
ASSERT(pipe_read(msg, len, f) == 0);
buf->message = msg;
@ -89,10 +89,10 @@ struct event *read_event(FILE *f) {
}
case THEORY_FAIL: {
size_t len = 0;
ASSERT(fread(&len, sizeof (size_t), 1, f) == 0);
ASSERT(pipe_read(&len, sizeof (size_t), f) == 0);
char *buf = malloc(len);
ASSERT(fread(buf, len, 1, f) == 0);
ASSERT(pipe_read(buf, len, f) == 0);
struct event *ev = smalloc(
.size = sizeof (struct event),
@ -103,7 +103,7 @@ struct event *read_event(FILE *f) {
}
case POST_TEST: {
double *elapsed_time = malloc(sizeof (double));
ASSERT(fread(elapsed_time, sizeof (double), 1, f) == 0);
ASSERT(pipe_read(elapsed_time, sizeof (double), f) == 0);
struct event *ev = smalloc(
.size = sizeof (struct event),
@ -114,7 +114,7 @@ struct event *read_event(FILE *f) {
}
case WORKER_TERMINATED: {
struct worker_status *status = malloc(sizeof (struct worker_status));
ASSERT(fread(status, sizeof (struct worker_status), 1, f) == 0);
ASSERT(pipe_read(status, sizeof (struct worker_status), f) == 0);
struct event *ev = smalloc(
.size = sizeof (struct event),
@ -138,7 +138,7 @@ void send_event(int kind, void *data, size_t size) {
memcpy(buf, &kind, sizeof (int));
memcpy(buf + sizeof (int), &pid, sizeof (pid));
memcpy(buf + sizeof (int) + sizeof (pid), data, size);
ASSERT(fwrite(buf, sizeof (int) + sizeof (pid) + size, 1, g_event_pipe) == 0);
ASSERT(pipe_write(buf, sizeof (int) + sizeof (pid) + size, g_event_pipe) == 0);
free(buf);
}

View file

@ -28,7 +28,7 @@
# include "core/worker.h"
# include <stdio.h>
extern FILE *g_event_pipe;
extern s_pipe_file_handle *g_event_pipe;
struct event {
unsigned long long pid;
@ -43,6 +43,6 @@ enum other_event_kinds {
WORKER_TERMINATED = 1 << 30,
};
struct event *read_event(FILE *f);
struct event *read_event(s_pipe_file_handle *f);
#endif /* !EVENT_H_ */