Added message read/write utilities

This commit is contained in:
Snaipe 2015-12-13 18:13:16 +01:00
parent 752123f496
commit e7b0d5e0c4
4 changed files with 112 additions and 16 deletions

View file

@ -212,6 +212,8 @@ set(SOURCE_FILES
src/protocol/criterion.pb.c
src/protocol/criterion.pb.h
src/protocol/protocol.c
src/protocol/messages.c
src/protocol/messages.h
src/protocol/connect.c
src/protocol/connect.h
src/common.h

View file

@ -35,6 +35,7 @@
#include "criterion/internal/preprocess.h"
#include "protocol/protocol.h"
#include "protocol/connect.h"
#include "protocol/messages.h"
#include "compat/time.h"
#include "compat/posix.h"
#include "compat/processor.h"
@ -295,25 +296,20 @@ static void run_tests_async(struct criterion_test_set *set,
goto cleanup;
criterion_protocol_msg msg = criterion_protocol_msg_init_zero;
unsigned char *buf = NULL;
int length;
while ((length = nn_recv(socket, &buf, NN_MSG, 0)) > 0) {
pb_istream_t in = pb_istream_from_buffer(buf, length);
if (pb_decode(&in, criterion_protocol_msg_fields, &msg)) {
struct client_ctx *cctx = process_client_message(&sctx, &msg);
if (cctx->state == CS_DEATH && cctx->kind == WORKER) {
remove_client_by_pid(&sctx, get_process_id_of(cctx->worker->proc));
sfree(cctx->worker);
while (read_message(socket, &msg) == 1) {
struct client_ctx *cctx = process_client_message(&sctx, &msg);
if (cctx->state == CS_DEATH && cctx->kind == WORKER) {
remove_client_by_pid(&sctx, get_process_id_of(cctx->worker->proc));
sfree(cctx->worker);
cctx = spawn_next_client(&sctx, &ctx);
if (!is_runner())
goto cleanup;
cctx = spawn_next_client(&sctx, &ctx);
if (!is_runner())
goto cleanup;
if (cctx == NULL)
--active_workers;
}
if (cctx == NULL)
--active_workers;
}
nn_freemsg(buf);
if (!active_workers)
break;
}

68
src/protocol/messages.c Normal file
View file

@ -0,0 +1,68 @@
/*
* The MIT License (MIT)
*
* Copyright © 2015 Franklin "Snaipe" Mathieu <http://snai.pe/>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <nanomsg/nn.h>
#include <stdlib.h>
#include "protocol/protocol.h"
int read_message(int sock, criterion_protocol_msg *message) {
int res;
unsigned char *buf;
int read = res = nn_recv(sock, &buf, NN_MSG, 0);
if (read <= 0)
goto cleanup;
pb_istream_t stream = pb_istream_from_buffer(buf, read);
if (!pb_decode(&stream, criterion_protocol_msg_fields, message)) {
res = -2;
goto cleanup;
}
res = 1;
cleanup:
nn_freemsg(buf);
return res;
}
int write_message(int sock, const criterion_protocol_msg *message) {
int res = -1;
size_t size;
unsigned char *buf = NULL;
if (!pb_get_encoded_size(&size, criterion_protocol_msg_fields, message))
goto cleanup;
buf = malloc(size);
pb_ostream_t stream = pb_ostream_from_buffer(buf, size);
if (!pb_encode(&stream, criterion_protocol_msg_fields, message))
goto cleanup;
int written = nn_send(sock, buf, size, 0);
if (written <= 0 || written != (int) size)
goto cleanup;
res = 1;
cleanup:
free(buf);
return res;
}

30
src/protocol/messages.h Normal file
View file

@ -0,0 +1,30 @@
/*
* The MIT License (MIT)
*
* Copyright © 2015 Franklin "Snaipe" Mathieu <http://snai.pe/>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef MESSAGES_H_
# define MESSAGES_H_
int write_message(int sock, const criterion_protocol_msg *message);
int read_message(int sock, criterion_protocol_msg *message);
#endif /* !MESSAGES_H_ */