1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

api: use C++ queues

This commit is contained in:
Steffen Vogel 2018-12-02 03:20:51 +01:00
parent a5211e1138
commit eab8f1cad7
5 changed files with 24 additions and 39 deletions

View file

@ -63,7 +63,7 @@ protected:
struct {
JsonBuffer buffer;
struct queue queue;
Queue<json_t *> queue;
} request, response;
Api *api;

View file

@ -40,16 +40,11 @@ Session::Session(Api *a) :
runs(0),
api(a)
{
queue_init(&request.queue, 128, &memory_heap);
queue_init(&response.queue, 128, &memory_heap);
logger->debug("Initiated API session: {}", getName());
}
Session::~Session()
{
queue_destroy(&request.queue);
queue_destroy(&response.queue);
logger->debug("Destroyed API session: {}", getName());
}
@ -58,14 +53,14 @@ void Session::runPendingActions()
{
json_t *req, *resp;
while (queue_available(&request.queue) > 0) {
queue_pull(&request.queue, (void **) &req);
while (!request.queue.empty()) {
req = request.queue.pop();
runAction(req, &resp);
json_decref(req);
queue_push(&response.queue, resp);
response.queue.push(resp);
}
}

View file

@ -71,7 +71,6 @@ void Http::read(void *in, size_t len)
int Http::complete()
{
int pushed;
json_t *req;
req = request.buffer.decode();
@ -79,26 +78,22 @@ int Http::complete()
return 0;
request.buffer.clear();
pushed = queue_push(&request.queue, req);
if (pushed != 1)
logger->warn("Queue overrun for API session: {}", getName());
request.queue.push(req);
return 1;
}
int Http::write()
{
int ret, pulled;
int ret;
json_t *resp;
pulled = queue_pull(&response.queue, (void **) &resp);
if (pulled) {
response.buffer.clear();
response.buffer.encode(resp);
resp = response.queue.pop();
json_decref(resp);
}
response.buffer.clear();
response.buffer.encode(resp);
json_decref(resp);
std::stringstream headers;

View file

@ -58,9 +58,7 @@ int Socket::read()
if (!j)
return -1;
ret = queue_push(&request.queue, (json_t *) j);
if (ret != 1)
return -1;
request.queue.push(j);
api->pending.push(this);
@ -72,7 +70,9 @@ int Socket::write()
int ret;
json_t *j;
while (queue_pull(&response.queue, (void **) &j)) {
while (!response.queue.empty()) {
j = response.queue.pop();
ret = json_dumpfd(j, sd, 0);
if (ret)
return ret;

View file

@ -48,7 +48,6 @@ WebSocket::WebSocket(Api *a, lws *w) :
int WebSocket::read(void *in, size_t len)
{
int pushed;
json_t *req;
if (lws_is_first_fragment(wsi))
@ -61,9 +60,7 @@ int WebSocket::read(void *in, size_t len)
if (!req)
return 0;
pushed = queue_push(&request.queue, req);
if (pushed != 1)
logger->warn("Queue overun in API session");
request.queue.push(req);
return 1;
}
@ -73,24 +70,22 @@ int WebSocket::read(void *in, size_t len)
int WebSocket::write()
{
int pulled;
json_t *resp;
if (state == State::SHUTDOWN)
return -1;
pulled = queue_pull(&response.queue, (void **) &resp);
if (pulled) {
char pad[LWS_PRE];
resp = response.queue.pop();
response.buffer.clear();
response.buffer.append(pad, sizeof(pad));
response.buffer.encode(resp);
char pad[LWS_PRE];
json_decref(resp);
response.buffer.clear();
response.buffer.append(pad, sizeof(pad));
response.buffer.encode(resp);
lws_write(wsi, (unsigned char *) response.buffer.data() + LWS_PRE, response.buffer.size() - LWS_PRE, LWS_WRITE_TEXT);
}
json_decref(resp);
lws_write(wsi, (unsigned char *) response.buffer.data() + LWS_PRE, response.buffer.size() - LWS_PRE, LWS_WRITE_TEXT);
return 0;
}