Revert "Switched from nanomsg to zeromq temporarily until the fork patch works"

This reverts commit 4af745bf88.
This commit is contained in:
Snaipe 2016-01-28 23:37:41 +01:00
parent 98fbab0afd
commit 04042ab71a
14 changed files with 77 additions and 93 deletions

9
.gitmodules vendored
View file

@ -13,6 +13,9 @@
[submodule "dependencies/nanopb"] [submodule "dependencies/nanopb"]
path = dependencies/nanopb path = dependencies/nanopb
url = https://github.com/nanopb/nanopb.git url = https://github.com/nanopb/nanopb.git
[submodule "dependencies/libzmq"] [submodule "dependencies/nanomsg"]
path = dependencies/libzmq path = dependencies/nanomsg
url = https://github.com/zeromq/zeromq4-1 url = https://github.com/nanomsg/nanomsg.git
[submodule "dependencies/nanomsg-patched"]
path = dependencies/nanomsg-patched
url = https://github.com/Snaipe/nanomsg.git

View file

@ -27,17 +27,18 @@ include (PackageUtils)
cr_add_subproject (csptr PATH dependencies/libcsptr CMAKE) cr_add_subproject (csptr PATH dependencies/libcsptr CMAKE)
cr_add_subproject (dyncall_s PATH dependencies/dyncall CMAKE IF THEORIES) cr_add_subproject (dyncall_s PATH dependencies/dyncall CMAKE IF THEORIES)
cr_add_subproject (zmq cr_add_subproject (nanomsg
PATH dependencies/libzmq PATH dependencies/nanomsg-patched
LIBNAME libzmq-static OPTS "-DNN_TESTS=OFF"
OPTS GENERATOR "Visual Studio 14 2015"
-DZMQ_MAKE_VALGRIND_HAPPY=1
-DZMQ_BUILD_FRAMEWORK=OFF
-DZMQ_BUILD_TESTS=OFF
-DWITH_TWEETNACL=OFF
-DWITH_PERF_TOOL=OFF
CMAKE CMAKE
PARALLELIZED IF WIN32 AND NOT CYGWIN
)
cr_add_subproject (nanomsg
PATH dependencies/nanomsg-patched
AUTOTOOLS
PARALLELIZE
IF NOT WIN32 OR CYGWIN
) )
cr_add_subproject (wingetopt PATH dependencies/wingetopt CMAKE IF MSVC) cr_add_subproject (wingetopt PATH dependencies/wingetopt CMAKE IF MSVC)
@ -77,7 +78,7 @@ cr_add_library(criterion SHARED
) )
cr_link_subproject(criterion csptr STATIC) cr_link_subproject(criterion csptr STATIC)
cr_link_subproject(criterion zmq SHARED) cr_link_subproject(criterion nanomsg STATIC)
cr_link_subproject(criterion dyncall_s STATIC) cr_link_subproject(criterion dyncall_s STATIC)
cr_link_subproject(criterion wingetopt STATIC) cr_link_subproject(criterion wingetopt STATIC)

1
dependencies/libzmq vendored

@ -1 +0,0 @@
Subproject commit 5501f19a0f4c596c5247c743c04759ef075927d3

1
dependencies/nanomsg vendored Submodule

@ -0,0 +1 @@
Subproject commit fd66ff55a5bad44ea0c3cca8bea345b6f02663bf

1
dependencies/nanomsg-patched vendored Submodule

@ -0,0 +1 @@
Subproject commit 8809bc55d8944d48a576c644499f5399ebe68cda

View file

@ -66,7 +66,7 @@ struct kh_ht_client_s;
struct kh_ht_extern_s; struct kh_ht_extern_s;
struct server_ctx { struct server_ctx {
cr_socket socket; int socket;
struct criterion_suite extern_suite; struct criterion_suite extern_suite;
struct criterion_test_extra_data extern_suite_data; struct criterion_test_extra_data extern_suite_data;
struct criterion_global_stats *gstats; struct criterion_global_stats *gstats;

View file

@ -27,6 +27,7 @@
#include <errno.h> #include <errno.h>
#include <csptr/smalloc.h> #include <csptr/smalloc.h>
#include <valgrind/valgrind.h> #include <valgrind/valgrind.h>
#include <nanomsg/nn.h>
#include "criterion/internal/test.h" #include "criterion/internal/test.h"
#include "criterion/options.h" #include "criterion/options.h"
#include "criterion/internal/ordered-set.h" #include "criterion/internal/ordered-set.h"
@ -170,13 +171,9 @@ const struct criterion_suite *criterion_current_suite;
void run_test_child(struct criterion_test *test, void run_test_child(struct criterion_test *test,
struct criterion_suite *suite) { struct criterion_suite *suite) {
// Reinitialize transport context after fork
cr_transport_term ();
cr_transport_init ();
cr_redirect_stdin(); cr_redirect_stdin();
g_client_socket = connect_client(); g_client_socket = connect_client();
if (g_client_socket == NULL) { if (g_client_socket < 0) {
criterion_perror("Could not initialize the message client: %s.\n", criterion_perror("Could not initialize the message client: %s.\n",
strerror(errno)); strerror(errno));
abort(); abort();
@ -302,7 +299,7 @@ static struct client_ctx *spawn_next_client(struct server_ctx *sctx, ccrContext
static void run_tests_async(struct criterion_test_set *set, static void run_tests_async(struct criterion_test_set *set,
struct criterion_global_stats *stats, struct criterion_global_stats *stats,
cr_socket socket) { int socket) {
ccrContext ctx = 0; ccrContext ctx = 0;
@ -384,17 +381,15 @@ static int criterion_run_all_tests_impl(struct criterion_test_set *set) {
fflush(NULL); // flush everything before forking fflush(NULL); // flush everything before forking
cr_transport_init (); int sock = bind_server();
if (sock < 0) {
cr_socket sock = bind_server();
if (sock == NULL) {
criterion_perror("Could not initialize the message server: %s.\n", criterion_perror("Could not initialize the message server: %s.\n",
strerror(errno)); strerror(errno));
abort(); abort();
} }
g_client_socket = connect_client(); g_client_socket = connect_client();
if (g_client_socket == NULL) { if (g_client_socket < 0) {
criterion_perror("Could not initialize the message client: %s.\n", criterion_perror("Could not initialize the message client: %s.\n",
strerror(errno)); strerror(errno));
abort(); abort();
@ -420,7 +415,6 @@ cleanup:
close_socket (g_client_socket); close_socket (g_client_socket);
close_socket (sock); close_socket (sock);
} }
cr_transport_term ();
sfree(stats); sfree(stats);
return result; return result;
} }

View file

@ -25,6 +25,7 @@
#include <stdbool.h> #include <stdbool.h>
#include <errno.h> #include <errno.h>
#include <csptr/smalloc.h> #include <csptr/smalloc.h>
#include <nanomsg/nn.h>
#include "criterion/types.h" #include "criterion/types.h"
#include "criterion/options.h" #include "criterion/options.h"

View file

@ -28,7 +28,7 @@
#include "event.h" #include "event.h"
#include "assert.h" #include "assert.h"
cr_socket g_client_socket = NULL; int g_client_socket = -1;
void criterion_send_assert(struct criterion_assert_stats *stats) { void criterion_send_assert(struct criterion_assert_stats *stats) {
assert(stats->message); assert(stats->message);

View file

@ -26,11 +26,10 @@
# include "criterion/event.h" # include "criterion/event.h"
# include "core/worker.h" # include "core/worker.h"
# include "protocol/connect.h"
# include <stdio.h> # include <stdio.h>
# include <pb.h> # include <pb.h>
extern cr_socket g_client_socket; extern int g_client_socket;
struct event { struct event {
unsigned long long pid; unsigned long long pid;

View file

@ -22,53 +22,46 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
#include <errno.h> #include <errno.h>
#include <zmq.h> #include <nanomsg/nn.h>
#include "connect.h" #include <nanomsg/reqrep.h>
#define URL "ipc://criterion.sock" #define URL "ipc://criterion.sock"
#define errno_ignore(Stmt) do { int err = errno; Stmt; errno = err; } while (0) #define errno_ignore(Stmt) do { int err = errno; Stmt; errno = err; } while (0)
static void *zmq_ctx; int bind_server(void) {
int fstrat = NN_FORK_RESET;
nn_setopt(NN_FORK_STRATEGY, &fstrat, sizeof (fstrat));
void cr_transport_init (void) { int sock = nn_socket(AF_SP, NN_REP);
zmq_ctx = zmq_ctx_new (); if (sock < 0)
} return -1;
void cr_transport_term (void) { if (nn_bind(sock, URL) < 0)
zmq_ctx_destroy (zmq_ctx);
}
cr_socket bind_server(void) {
cr_socket sock = zmq_socket(zmq_ctx, ZMQ_REP);
if (sock == NULL)
goto error;
if (zmq_bind(sock, URL) < 0)
goto error; goto error;
return sock; return sock;
error: {} error: {}
if (sock) errno_ignore(nn_close(sock));
errno_ignore(zmq_close(sock)); return -1;
return NULL;
} }
cr_socket connect_client(void) { int connect_client(void) {
cr_socket sock = zmq_socket(zmq_ctx, ZMQ_REQ); int sock = nn_socket(AF_SP, NN_REQ);
if (sock == NULL) if (sock < 0)
goto error; return -1;
if (zmq_connect (sock, URL) < 0) if (nn_connect (sock, URL) < 0)
goto error; goto error;
return sock; return sock;
error: {} error: {}
if (sock) errno_ignore(nn_close(sock));
errno_ignore(zmq_close(sock)); return -1;
return NULL;
} }
void close_socket(cr_socket sock) { void close_socket(int sock) {
zmq_close(sock); nn_close(sock);
} }

View file

@ -24,13 +24,8 @@
#ifndef CONNECT_H_ #ifndef CONNECT_H_
# define CONNECT_H_ # define CONNECT_H_
typedef void *cr_socket; int connect_client(void);
int bind_server(void);
void cr_transport_init (void); void close_socket(int sock);
void cr_transport_term (void);
cr_socket connect_client(void);
cr_socket bind_server(void);
void close_socket(cr_socket sock);
#endif /* !CONNECT_H_ */ #endif /* !CONNECT_H_ */

View file

@ -21,24 +21,22 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE. * THE SOFTWARE.
*/ */
#include <nanomsg/nn.h>
#include <stdlib.h> #include <stdlib.h>
#include <zmq.h>
#include "protocol/protocol.h" #include "protocol/protocol.h"
#include "criterion/logging.h" #include "criterion/logging.h"
#include "io/event.h" #include "io/event.h"
#include "io/asprintf.h" #include "io/asprintf.h"
int read_message(cr_socket sock, criterion_protocol_msg *message) { int read_message(int sock, criterion_protocol_msg *message) {
int res; int res;
zmq_msg_t msg; unsigned char *buf = NULL;
zmq_msg_init (&msg); int read = res = nn_recv(sock, &buf, NN_MSG, 0);
int read = res = zmq_msg_recv(&msg, sock, 0);
if (read <= 0) if (read <= 0)
goto cleanup; goto cleanup;
pb_istream_t stream = pb_istream_from_buffer(zmq_msg_data(&msg), read); pb_istream_t stream = pb_istream_from_buffer(buf, read);
if (!pb_decode(&stream, criterion_protocol_msg_fields, message)) { if (!pb_decode(&stream, criterion_protocol_msg_fields, message)) {
res = -2; res = -2;
goto cleanup; goto cleanup;
@ -46,11 +44,12 @@ int read_message(cr_socket sock, criterion_protocol_msg *message) {
res = 1; res = 1;
cleanup: cleanup:
zmq_msg_close(&msg); if (buf)
nn_freemsg(buf);
return res; return res;
} }
int write_message(cr_socket sock, const criterion_protocol_msg *message) { int write_message(int sock, const criterion_protocol_msg *message) {
int res = -1; int res = -1;
size_t size; size_t size;
unsigned char *buf = NULL; unsigned char *buf = NULL;
@ -62,7 +61,7 @@ int write_message(cr_socket sock, const criterion_protocol_msg *message) {
if (!pb_encode(&stream, criterion_protocol_msg_fields, message)) if (!pb_encode(&stream, criterion_protocol_msg_fields, message))
goto cleanup; goto cleanup;
int written = zmq_send(sock, buf, size, 0); int written = nn_send(sock, buf, size, 0);
if (written <= 0 || written != (int) size) if (written <= 0 || written != (int) size)
goto cleanup; goto cleanup;
@ -84,22 +83,20 @@ void cr_send_to_runner(const criterion_protocol_msg *message) {
if (write_message(g_client_socket, message) != 1) { if (write_message(g_client_socket, message) != 1) {
criterion_perror("Could not write the \"%s\" message down the event pipe: %s.\n", criterion_perror("Could not write the \"%s\" message down the event pipe: %s.\n",
message_names[message->data.which_value], message_names[message->data.which_value],
zmq_strerror(errno)); nn_strerror(errno));
abort(); abort();
} }
zmq_msg_t msg; unsigned char *buf = NULL;
zmq_msg_init (&msg); int read = nn_recv(g_client_socket, &buf, NN_MSG, 0);
int read = zmq_msg_recv(&msg, g_client_socket, 0);
if (read <= 0) { if (read <= 0) {
criterion_perror("Could not read ack: %s.\n", zmq_strerror(errno)); criterion_perror("Could not read ack: %s.\n", nn_strerror(errno));
abort(); abort();
} }
criterion_protocol_ack ack; criterion_protocol_ack ack;
pb_istream_t stream = pb_istream_from_buffer(zmq_msg_data(&msg), read); pb_istream_t stream = pb_istream_from_buffer(buf, read);
if (!pb_decode(&stream, criterion_protocol_ack_fields, &ack)) { if (!pb_decode(&stream, criterion_protocol_ack_fields, &ack)) {
criterion_perror("Could not decode ack: %s.\n", PB_GET_ERROR(&stream)); criterion_perror("Could not decode ack: %s.\n", PB_GET_ERROR(&stream));
abort(); abort();
@ -111,10 +108,11 @@ void cr_send_to_runner(const criterion_protocol_msg *message) {
} }
pb_release(criterion_protocol_ack_fields, &ack); pb_release(criterion_protocol_ack_fields, &ack);
zmq_msg_close(&msg); if (buf)
nn_freemsg(buf);
} }
void send_ack(cr_socket sock, bool ok, const char *msg, ...) { void send_ack(int sock, bool ok, const char *msg, ...) {
criterion_protocol_ack ack; criterion_protocol_ack ack;
ack.status_code = ok ? criterion_protocol_ack_status_OK : criterion_protocol_ack_status_ERROR; ack.status_code = ok ? criterion_protocol_ack_status_OK : criterion_protocol_ack_status_ERROR;
ack.message = NULL; ack.message = NULL;
@ -141,9 +139,9 @@ void send_ack(cr_socket sock, bool ok, const char *msg, ...) {
abort(); abort();
} }
int written = zmq_send(sock, buf, size, 0); int written = nn_send(sock, buf, size, 0);
if (written <= 0 || written != (int) size) { if (written <= 0 || written != (int) size) {
criterion_perror("Could not send ack: %s.\n", zmq_strerror(errno)); criterion_perror("Could not send ack: %s.\n", nn_strerror(errno));
abort(); abort();
} }

View file

@ -25,12 +25,11 @@
# define MESSAGES_H_ # define MESSAGES_H_
# include "criterion.pb.h" # include "criterion.pb.h"
# include "connect.h"
int write_message(cr_socket sock, const criterion_protocol_msg *message); int write_message(int sock, const criterion_protocol_msg *message);
int read_message(cr_socket sock, criterion_protocol_msg *message); int read_message(int sock, criterion_protocol_msg *message);
void cr_send_to_runner(const criterion_protocol_msg *message); void cr_send_to_runner(const criterion_protocol_msg *message);
void send_ack(cr_socket sock, bool ok, const char *msg, ...); void send_ack(int sock, bool ok, const char *msg, ...);
void free_message(criterion_protocol_msg *msg); void free_message(criterion_protocol_msg *msg);
#endif /* !MESSAGES_H_ */ #endif /* !MESSAGES_H_ */