diff --git a/CMakeLists.txt b/CMakeLists.txt
index fe474a3..e226e4f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -212,6 +212,8 @@ set(SOURCE_FILES
src/protocol/criterion.pb.c
src/protocol/criterion.pb.h
src/protocol/protocol.c
+ src/protocol/messages.c
+ src/protocol/messages.h
src/protocol/connect.c
src/protocol/connect.h
src/common.h
diff --git a/src/core/runner.c b/src/core/runner.c
index a08ac0f..66fefe0 100644
--- a/src/core/runner.c
+++ b/src/core/runner.c
@@ -35,6 +35,7 @@
#include "criterion/internal/preprocess.h"
#include "protocol/protocol.h"
#include "protocol/connect.h"
+#include "protocol/messages.h"
#include "compat/time.h"
#include "compat/posix.h"
#include "compat/processor.h"
@@ -295,25 +296,20 @@ static void run_tests_async(struct criterion_test_set *set,
goto cleanup;
criterion_protocol_msg msg = criterion_protocol_msg_init_zero;
- unsigned char *buf = NULL;
- int length;
- while ((length = nn_recv(socket, &buf, NN_MSG, 0)) > 0) {
- pb_istream_t in = pb_istream_from_buffer(buf, length);
- if (pb_decode(&in, criterion_protocol_msg_fields, &msg)) {
- struct client_ctx *cctx = process_client_message(&sctx, &msg);
- if (cctx->state == CS_DEATH && cctx->kind == WORKER) {
- remove_client_by_pid(&sctx, get_process_id_of(cctx->worker->proc));
- sfree(cctx->worker);
+ while (read_message(socket, &msg) == 1) {
+ struct client_ctx *cctx = process_client_message(&sctx, &msg);
+ if (cctx->state == CS_DEATH && cctx->kind == WORKER) {
+ remove_client_by_pid(&sctx, get_process_id_of(cctx->worker->proc));
+ sfree(cctx->worker);
- cctx = spawn_next_client(&sctx, &ctx);
- if (!is_runner())
- goto cleanup;
+ cctx = spawn_next_client(&sctx, &ctx);
+ if (!is_runner())
+ goto cleanup;
- if (cctx == NULL)
- --active_workers;
- }
+ if (cctx == NULL)
+ --active_workers;
}
- nn_freemsg(buf);
+
if (!active_workers)
break;
}
diff --git a/src/protocol/messages.c b/src/protocol/messages.c
new file mode 100644
index 0000000..2f0e7ba
--- /dev/null
+++ b/src/protocol/messages.c
@@ -0,0 +1,68 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright © 2015 Franklin "Snaipe" Mathieu
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#include
+#include
+#include "protocol/protocol.h"
+
+int read_message(int sock, criterion_protocol_msg *message) {
+ int res;
+ unsigned char *buf;
+ int read = res = nn_recv(sock, &buf, NN_MSG, 0);
+
+ if (read <= 0)
+ goto cleanup;
+
+ pb_istream_t stream = pb_istream_from_buffer(buf, read);
+ if (!pb_decode(&stream, criterion_protocol_msg_fields, message)) {
+ res = -2;
+ goto cleanup;
+ }
+
+ res = 1;
+cleanup:
+ nn_freemsg(buf);
+ return res;
+}
+
+int write_message(int sock, const criterion_protocol_msg *message) {
+ int res = -1;
+ size_t size;
+ unsigned char *buf = NULL;
+ if (!pb_get_encoded_size(&size, criterion_protocol_msg_fields, message))
+ goto cleanup;
+
+ buf = malloc(size);
+ pb_ostream_t stream = pb_ostream_from_buffer(buf, size);
+ if (!pb_encode(&stream, criterion_protocol_msg_fields, message))
+ goto cleanup;
+
+ int written = nn_send(sock, buf, size, 0);
+ if (written <= 0 || written != (int) size)
+ goto cleanup;
+
+ res = 1;
+cleanup:
+ free(buf);
+ return res;
+}
diff --git a/src/protocol/messages.h b/src/protocol/messages.h
new file mode 100644
index 0000000..e44b4c1
--- /dev/null
+++ b/src/protocol/messages.h
@@ -0,0 +1,30 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright © 2015 Franklin "Snaipe" Mathieu
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef MESSAGES_H_
+# define MESSAGES_H_
+
+int write_message(int sock, const criterion_protocol_msg *message);
+int read_message(int sock, criterion_protocol_msg *message);
+
+#endif /* !MESSAGES_H_ */