diff --git a/include/villas/api/session.hpp b/include/villas/api/session.hpp index c9fc45396..05839acd0 100644 --- a/include/villas/api/session.hpp +++ b/include/villas/api/session.hpp @@ -63,7 +63,7 @@ protected: struct { JsonBuffer buffer; - struct queue queue; + Queue queue; } request, response; Api *api; diff --git a/lib/api/session.cpp b/lib/api/session.cpp index 12f97c150..95b8b7dd8 100644 --- a/lib/api/session.cpp +++ b/lib/api/session.cpp @@ -40,16 +40,11 @@ Session::Session(Api *a) : runs(0), api(a) { - queue_init(&request.queue, 128, &memory_heap); - queue_init(&response.queue, 128, &memory_heap); - logger->debug("Initiated API session: {}", getName()); } Session::~Session() { - queue_destroy(&request.queue); - queue_destroy(&response.queue); logger->debug("Destroyed API session: {}", getName()); } @@ -58,14 +53,14 @@ void Session::runPendingActions() { json_t *req, *resp; - while (queue_available(&request.queue) > 0) { - queue_pull(&request.queue, (void **) &req); + while (!request.queue.empty()) { + req = request.queue.pop(); runAction(req, &resp); json_decref(req); - queue_push(&response.queue, resp); + response.queue.push(resp); } } diff --git a/lib/api/sessions/http.cpp b/lib/api/sessions/http.cpp index ba8d6e874..c5cca2c4c 100644 --- a/lib/api/sessions/http.cpp +++ b/lib/api/sessions/http.cpp @@ -71,7 +71,6 @@ void Http::read(void *in, size_t len) int Http::complete() { - int pushed; json_t *req; req = request.buffer.decode(); @@ -79,26 +78,22 @@ int Http::complete() return 0; request.buffer.clear(); - - pushed = queue_push(&request.queue, req); - if (pushed != 1) - logger->warn("Queue overrun for API session: {}", getName()); + request.queue.push(req); return 1; } int Http::write() { - int ret, pulled; + int ret; json_t *resp; - pulled = queue_pull(&response.queue, (void **) &resp); - if (pulled) { - response.buffer.clear(); - response.buffer.encode(resp); + resp = response.queue.pop(); - json_decref(resp); - } + response.buffer.clear(); + response.buffer.encode(resp); + + json_decref(resp); std::stringstream headers; diff --git a/lib/api/sessions/socket.cpp b/lib/api/sessions/socket.cpp index e42c2ef2c..4e439f40f 100644 --- a/lib/api/sessions/socket.cpp +++ b/lib/api/sessions/socket.cpp @@ -58,9 +58,7 @@ int Socket::read() if (!j) return -1; - ret = queue_push(&request.queue, (json_t *) j); - if (ret != 1) - return -1; + request.queue.push(j); api->pending.push(this); @@ -72,7 +70,9 @@ int Socket::write() int ret; json_t *j; - while (queue_pull(&response.queue, (void **) &j)) { + while (!response.queue.empty()) { + j = response.queue.pop(); + ret = json_dumpfd(j, sd, 0); if (ret) return ret; diff --git a/lib/api/sessions/websocket.cpp b/lib/api/sessions/websocket.cpp index c49348e7b..5d312ec28 100644 --- a/lib/api/sessions/websocket.cpp +++ b/lib/api/sessions/websocket.cpp @@ -48,7 +48,6 @@ WebSocket::WebSocket(Api *a, lws *w) : int WebSocket::read(void *in, size_t len) { - int pushed; json_t *req; if (lws_is_first_fragment(wsi)) @@ -61,9 +60,7 @@ int WebSocket::read(void *in, size_t len) if (!req) return 0; - pushed = queue_push(&request.queue, req); - if (pushed != 1) - logger->warn("Queue overun in API session"); + request.queue.push(req); return 1; } @@ -73,24 +70,22 @@ int WebSocket::read(void *in, size_t len) int WebSocket::write() { - int pulled; json_t *resp; if (state == State::SHUTDOWN) return -1; - pulled = queue_pull(&response.queue, (void **) &resp); - if (pulled) { - char pad[LWS_PRE]; + resp = response.queue.pop(); - response.buffer.clear(); - response.buffer.append(pad, sizeof(pad)); - response.buffer.encode(resp); + char pad[LWS_PRE]; - json_decref(resp); + response.buffer.clear(); + response.buffer.append(pad, sizeof(pad)); + response.buffer.encode(resp); - lws_write(wsi, (unsigned char *) response.buffer.data() + LWS_PRE, response.buffer.size() - LWS_PRE, LWS_WRITE_TEXT); - } + json_decref(resp); + + lws_write(wsi, (unsigned char *) response.buffer.data() + LWS_PRE, response.buffer.size() - LWS_PRE, LWS_WRITE_TEXT); return 0; }