1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

reworked libwebsocket buffers in order to support fragmented messages

This commit is contained in:
Steffen Vogel 2017-08-27 17:04:25 +02:00
parent 61de3b3274
commit 1946d137fa
11 changed files with 345 additions and 346 deletions

View file

@ -25,9 +25,11 @@
#include <libwebsockets.h>
#include <jansson.h>
#include <pthread.h>
#include "list.h"
#include "common.h"
#include "queue_signalled.h"
#include "api/session.h"
@ -48,10 +50,13 @@ struct api_action;
typedef int (*api_cb_t)(struct api_action *c, json_t *args, json_t **resp, struct api_session *s);
struct api {
struct list sessions; /**< List of currently active connections */
enum state state;
struct list sessions; /**< List of currently active connections */
struct queue_signalled pending; /**< A queue of api_sessions which have pending requests. */
pthread_t thread;
struct super_node *super_node;
};

View file

@ -27,7 +27,8 @@
#include <jansson.h>
#include "common.h"
#include "web/buffer.h"
#include "queue.h"
#include "buffer.h"
enum api_version {
API_VERSION_UNKOWN = 0,
@ -41,28 +42,23 @@ enum api_mode {
/** A connection via HTTP REST or WebSockets to issue API actions. */
struct api_session {
enum api_mode mode;
enum state state;
enum api_version version;
int runs;
struct {
struct web_buffer body; /**< HTTP body / WS payload */
} request;
struct {
struct web_buffer body; /**< HTTP body / WS payload */
struct web_buffer headers; /**< HTTP headers */
} response;
bool completed; /**< Did we receive the complete body yet? */
enum state state;
struct {
struct buffer buffer;
struct queue queue;
} request, response;
struct lws *wsi;
struct api *api;
};
int api_session_init(struct api_session *s, struct api *a, enum api_mode m);
int api_session_init(struct api_session *s, enum api_mode m);
int api_session_destroy(struct api_session *s);

50
include/villas/buffer.h Normal file
View file

@ -0,0 +1,50 @@
/** A simple growing buffer.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <stdlib.h>
#include <jansson.h>
#include "common.h"
struct buffer {
enum state state;
char *buf;
size_t len;
size_t size;
};
int buffer_init(struct buffer *b, size_t size);
int buffer_destroy(struct buffer *b);
void buffer_clear(struct buffer *b);
int buffer_append(struct buffer *b, const char *data, size_t len);
int buffer_parse_json(struct buffer *b, json_t **j);
int buffer_append_json(struct buffer *b, json_t *j);

View file

@ -1,72 +0,0 @@
/** WebSocket buffer.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <stddef.h>
#include <libwebsockets.h>
#include "common.h"
struct web_buffer {
char *buffer; /**< A pointer to the buffer. Usually resized via realloc() */
size_t size; /**< The allocated size of the buffer. */
size_t len; /**< The used length of the buffer. */
size_t prefix; /**< The used length of the buffer. */
enum lws_write_protocol protocol;
enum state state;
};
/** Initialize a libwebsockets buffer. */
int web_buffer_init(struct web_buffer *b, enum lws_write_protocol prot);
/** Destroy a libwebsockets buffer. */
int web_buffer_destroy(struct web_buffer *b);
/** Flush the buffers contents to lws_write() */
int web_buffer_write(struct web_buffer *b, struct lws *w);
/** Copy \p len bytes from the beginning of the buffer and copy them to \p out.
*
* @param out The destination buffer. If NULL, we just remove \p len bytes from the buffer.
*/
int web_buffer_read(struct web_buffer *b, char *out, size_t len);
/** Parse JSON from the beginning of the buffer.
*
* @retval -1 The buffer is empty.
* @retval -2 The buffer contains malformed JSON.
*/
int web_buffer_read_json(struct web_buffer *b, json_t **req);
/** Append \p len bytes of \p in at the end of the buffer.
*
* The buffer is automatically resized.
*/
int web_buffer_append(struct web_buffer *b, const char *in, size_t len);
/** Append the serialized represetnation of the JSON object \p res at the end of the buffer. */
int web_buffer_append_json(struct web_buffer *b, json_t *res);

View file

@ -33,7 +33,7 @@ LIB_SRCS += $(addprefix lib/kernel/, kernel.c rt.c) \
utils.c super_node.c hist.c timing.c pool.c list.c queue.c \
queue_signalled.c memory.c advio.c plugin.c node_type.c stats.c \
mapping.c io.c shmem.c config_helper.c crypt.c compat.c \
log_table.c log_helper.c io_format.c task.c \
log_table.c log_helper.c io_format.c task.c buffer.c \
)
LIB_LDFLAGS = -shared
@ -46,7 +46,9 @@ endif
LIB_PKGS += openssl libcurl
ifeq ($(WITH_WEB),1)
-include lib/web/Makefile.inc
LIB_SRCS += lib/web.c
LIB_PKGS += libwebsockets
LIB_CFLAGS += -DWITH_WEB
endif
ifeq ($(WITH_API),1)

182
lib/api.c
View file

@ -30,9 +30,13 @@
#include "assert.h"
#include "compat.h"
/* Forward declarations */
static void * worker(void *ctx);
int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
int ret, pulled, pushed;
json_t *req, *resp;
struct web *w = lws_context_user(lws_get_context(wsi));
struct api_session *s = (struct api_session *) user;
@ -46,15 +50,18 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *
/* Parse request URI */
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI);
ret = sscanf(uri, "/v%d", (int *) &s->version);
if (ret != 1)
return -1;
ret = api_session_init(s, w->api, API_MODE_WS);
ret = api_session_init(s, API_MODE_WS);
if (ret)
return -1;
s->wsi = wsi;
s->api = w->api;
debug(LOG_API, "New API session initiated: version=%d, mode=websocket", s->version);
break;
@ -68,24 +75,40 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
web_buffer_write(&s->response.body, wsi);
case LWS_CALLBACK_RECEIVE:
if (lws_is_first_fragment(wsi))
buffer_clear(&s->request.buffer);
buffer_append(&s->request.buffer, in, len);
if (lws_is_final_fragment(wsi)) {
ret = buffer_parse_json(&s->request.buffer, &req);
if (ret)
break;
pushed = queue_push(&s->request.queue, req);
if (pushed != 1)
warn("Queue overun in Api session");
if (s->completed && s->response.body.len == 0)
return -1;
pushed = queue_signalled_push(&w->api->pending, s);
if (pushed != 1)
warn("Queue overrun in Api");
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
pulled = queue_pull(&s->response.queue, (void **) &resp);
if (pulled < 1)
break;
char pad[LWS_PRE];
buffer_clear(&s->response.buffer);
buffer_append(&s->response.buffer, pad, sizeof(pad));
buffer_append_json(&s->response.buffer, resp);
case LWS_CALLBACK_RECEIVE:
web_buffer_append(&s->request.body, in, len);
json_t *req, *resp;
while (web_buffer_read_json(&s->request.body, &req) >= 0) {
api_session_run_command(s, req, &resp);
web_buffer_append_json(&s->response.body, resp);
lws_callback_on_writable(wsi);
}
lws_write(wsi, (unsigned char *) s->response.buffer.buf + LWS_PRE, s->response.buffer.len - LWS_PRE, LWS_WRITE_TEXT);
break;
default:
@ -97,7 +120,8 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *
int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
int ret, pulled, pushed;
json_t *resp, *req;
struct web *w = lws_context_user(lws_get_context(wsi));
struct api_session *s = (struct api_session *) user;
@ -114,29 +138,15 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void
if (ret != 1)
return -1;
ret = api_session_init(s, w->api, API_MODE_HTTP);
ret = api_session_init(s, API_MODE_HTTP);
if (ret)
return -1;
s->wsi = wsi;
s->api = w->api;
debug(LOG_API, "New API session initiated: version=%d, mode=http", s->version);
/* Prepare HTTP response header */
const char headers[] = "HTTP/1.1 200 OK\r\n"
"Content-type: application/json\r\n"
"User-agent: " USER_AGENT "\r\n"
"Access-Control-Allow-Origin: *\r\n"
"Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
"Access-Control-Allow-Headers: Content-Type\r\n"
"Access-Control-Max-Age: 86400\r\n"
"\r\n";
web_buffer_append(&s->response.headers, headers, sizeof(headers)-1);
lws_callback_on_writable(wsi);
/* Only HTTP POST requests wait for body */
if (lws_hdr_total_length(wsi, WSI_TOKEN_POST_URI) == 0)
s->completed = true;
break;
case LWS_CALLBACK_CLOSED_HTTP:
@ -149,27 +159,48 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void
break;
case LWS_CALLBACK_HTTP_BODY:
web_buffer_append(&s->request.body, in, len);
buffer_append(&s->request.buffer, in, len);
json_t *req, *resp;
while (web_buffer_read_json(&s->request.body, &req) == 1) {
api_session_run_command(s, req, &resp);
web_buffer_append_json(&s->response.body, resp);
lws_callback_on_writable(wsi);
}
break;
case LWS_CALLBACK_HTTP_BODY_COMPLETION:
s->completed = true;
ret = buffer_parse_json(&s->request.buffer, &req);
if (ret)
break;
buffer_clear(&s->request.buffer);
pushed = queue_push(&s->request.queue, req);
if (pushed != 1)
warn("Queue overrun for Api session");
pushed = queue_signalled_push(&w->api->pending, s);
if (pushed != 1)
warn("Queue overrun for Api");
break;
case LWS_CALLBACK_HTTP_WRITEABLE:
web_buffer_write(&s->response.headers, wsi);
web_buffer_write(&s->response.body, wsi);
if (s->completed && s->response.body.len == 0)
return -1;
pulled = queue_pull(&s->response.queue, (void **) &resp);
if (pulled) {
const char headers[] = "HTTP/1.1 200 OK\r\n"
"Content-type: application/json\r\n"
"User-agent: " USER_AGENT "\r\n"
"Access-Control-Allow-Origin: *\r\n"
"Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
"Access-Control-Allow-Headers: Content-Type\r\n"
"Access-Control-Max-Age: 86400\r\n"
"\r\n";
buffer_clear(&s->response.buffer);
buffer_append_json(&s->response.buffer, resp);
lws_write(wsi, (unsigned char *) headers, strlen(headers), LWS_WRITE_HTTP_HEADERS);
lws_write(wsi, (unsigned char *) s->response.buffer.buf, s->response.buffer.len, LWS_WRITE_HTTP);
return -1; /* Close connection */
}
break;
default:
@ -202,7 +233,13 @@ int api_destroy(struct api *a)
int api_start(struct api *a)
{
int ret;
info("Starting API sub-system");
ret = pthread_create(&a->thread, NULL, worker, a);
if (ret)
error("Failed to start Api worker thread");
a->state = STATE_STARTED;
@ -211,11 +248,52 @@ int api_start(struct api *a)
int api_stop(struct api *a)
{
info("Stopping API sub-system");
int ret;
list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false);
info("Stopping API sub-system");
if (a->state != STATE_STARTED)
return 0;
ret = list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false);
if (ret)
return ret;
ret = pthread_cancel(a->thread);
if (ret)
serror("Failed to cancel Api worker thread");
ret = pthread_join(a->thread, NULL);
if (ret)
serror("Failed to join Api worker thread");
a->state = STATE_STOPPED;
return 0;
}
static void * worker(void *ctx)
{
int pulled;
struct api *a = ctx;
struct api_session *s;
json_t *req, *resp;
for (;;) {
pulled = queue_signalled_pull(&a->pending, (void **) &s);
if (pulled != 1)
continue;
queue_pull(&s->request.queue, (void **) &req);
api_session_run_command(s, req, &resp);
queue_push(&s->response.queue, resp);
lws_callback_on_writable(s->wsi);
}
return NULL;
}

View file

@ -24,33 +24,55 @@
#include "web.h"
#include "plugin.h"
#include "memory.h"
int api_session_init(struct api_session *s, struct api *a, enum api_mode m)
int api_session_init(struct api_session *s, enum api_mode m)
{
s->mode = m;
s->api = a;
int ret;
s->completed = false;
ret = buffer_init(&s->request.buffer, 0);
if (ret)
return ret;
ret = buffer_init(&s->response.buffer, 0);
if (ret)
return ret;
ret = queue_init(&s->request.queue, 128, &memtype_heap);
if (ret)
return ret;
web_buffer_init(&s->request.body, s->mode == API_MODE_HTTP ? LWS_WRITE_HTTP : LWS_WRITE_TEXT);
web_buffer_init(&s->response.body, s->mode == API_MODE_HTTP ? LWS_WRITE_HTTP : LWS_WRITE_TEXT);
if (s->mode == API_MODE_HTTP)
web_buffer_init(&s->response.headers, LWS_WRITE_HTTP_HEADERS);
ret = queue_init(&s->response.queue, 128, &memtype_heap);
if (ret)
return ret;
return 0;
}
int api_session_destroy(struct api_session *s)
{
int ret;
if (s->state == STATE_DESTROYED)
return 0;
web_buffer_destroy(&s->request.body);
web_buffer_destroy(&s->response.body);
ret = buffer_destroy(&s->request.buffer);
if (ret)
return ret;
if (s->mode == API_MODE_HTTP)
web_buffer_destroy(&s->response.headers);
ret = buffer_destroy(&s->response.buffer);
if (ret)
return ret;
ret = queue_destroy(&s->request.queue);
if (ret)
return ret;
ret = queue_destroy(&s->response.queue);
if (ret)
return ret;
s->state = STATE_DESTROYED;

99
lib/buffer.c Normal file
View file

@ -0,0 +1,99 @@
/** A simple growing buffer.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <string.h>
#include "buffer.h"
#include "common.h"
int buffer_init(struct buffer *b, size_t size)
{
b->len = 0;
b->size = size;
b->buf = malloc(size);
if (!b->buf)
return -1;
b->state = STATE_INITIALIZED;
return 0;
}
int buffer_destroy(struct buffer *b)
{
if (b->buf)
free(b->buf);
b->state = STATE_DESTROYED;
return 0;
}
void buffer_clear(struct buffer *b)
{
b->len = 0;
}
int buffer_append(struct buffer *b, const char *data, size_t len)
{
if (b->len + len > b->size) {
b->size = b->len + len;
b->buf = realloc(b->buf, b->size);
if (!b->buf)
return -1;
}
memcpy(b->buf + b->len, data, len);
b->len += len;
return 0;
}
int buffer_parse_json(struct buffer *b, json_t **j)
{
*j = json_loadb(b->buf, b->len, 0, NULL);
if (!*j)
return -1;
return 0;
}
int buffer_append_json(struct buffer *b, json_t *j)
{
size_t len;
retry: len = json_dumpb(j, b->buf + b->len, b->size - b->len, 0);
if (b->size < b->len + len) {
b->buf = realloc(b->buf, b->len + len);
if (!b->buf)
return -1;
b->size = b->len + len;
goto retry;
}
b->len += len;
return 0;
}

View file

@ -246,7 +246,7 @@ int web_start(struct web *w)
ret = pthread_create(&w->thread, NULL, worker, w);
if (ret)
error("Failed to start Web worker");
error("Failed to start Web worker thread");
w->state = STATE_STARTED;
@ -255,6 +255,8 @@ int web_start(struct web *w)
int web_stop(struct web *w)
{
int ret;
if (w->state != STATE_STARTED)
return 0;
@ -265,11 +267,16 @@ int web_stop(struct web *w)
/** @todo Wait for all connections to be closed */
pthread_cancel(w->thread);
pthread_join(w->thread, NULL);
ret = pthread_cancel(w->thread);
if (ret)
serror("Failed to cancel Web worker thread");
w->state = STATE_STOPPED;
ret = pthread_join(w->thread, NULL);
if (ret)
serror("Failed to join Web worker thread");
}
w->state = STATE_STOPPED;
return 0;
}

View file

@ -1,27 +0,0 @@
# Makefile.
#
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
# @license GNU General Public License (version 3)
#
# VILLASnode
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###################################################################################
LIB_SRCS += lib/web.c
LIB_SRCS += $(wildcard lib/web/*.c)
LIB_PKGS += libwebsockets
LIB_CFLAGS += -DWITH_WEB

View file

@ -1,161 +0,0 @@
/** API buffer for sending and receiving data from libwebsockets.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <jansson.h>
#include <string.h>
#include "compat.h"
#include "assert.h"
#include "utils.h"
#include "web/buffer.h"
int web_buffer_init(struct web_buffer *b, enum lws_write_protocol prot)
{
assert(b->state == STATE_DESTROYED);
b->protocol = prot;
b->size = 0;
b->len = 0;
b->buffer = NULL;
b->prefix = b->protocol == LWS_WRITE_TEXT || b->protocol == LWS_WRITE_BINARY ? LWS_PRE : 0;
b->state = STATE_INITIALIZED;
return 0;
}
int web_buffer_destroy(struct web_buffer *b)
{
if (b->state == STATE_DESTROYED)
return 0;
if (b->buffer)
free(b->buffer);
b->state = STATE_DESTROYED;
return 0;
}
int web_buffer_write(struct web_buffer *b, struct lws *w)
{
int ret, len, sent = 0;
unsigned char *chunk;
assert(b->state == STATE_INITIALIZED);
if (b->len <= 0)
return 0;
do {
chunk = (unsigned char *) b->buffer + b->prefix + sent;
len = strlen(b->buffer + b->prefix);
ret = lws_write(w, chunk, len, b->protocol);
if (ret < 0)
break;
sent += ret + 1;
} while (sent < b->len);
web_buffer_read(b, NULL, sent); /* drop sent bytes from the buffer*/
return sent;
}
int web_buffer_read(struct web_buffer *b, char *out, size_t len)
{
assert(b->state == STATE_INITIALIZED);
if (len > b->len)
len = b->len;
if (out)
memcpy(out, b->buffer + b->prefix, len);
memmove(b->buffer + b->prefix, b->buffer + b->prefix + len, b->len - len);
b->len -= len;
return 0;
}
int web_buffer_read_json(struct web_buffer *b, json_t **req)
{
json_error_t e;
assert(b->state == STATE_INITIALIZED);
if (b->len <= 0)
return -1;
*req = json_loadb(b->buffer + b->prefix, b->len, JSON_DISABLE_EOF_CHECK, &e);
if (!*req)
return -2;
web_buffer_read(b, NULL, e.position);
return 1;
}
int web_buffer_append(struct web_buffer *b, const char *in, size_t len)
{
assert(b->state == STATE_INITIALIZED);
/* We append a \0 to split messages */
len++;
if (b->size < b->len + len) {
b->buffer = realloc(b->buffer, b->prefix + b->len + len);
if (!b->buffer)
return -1;
b->size = b->len + len;
}
memcpy(b->buffer + b->prefix + b->len, in, len);
b->buffer[b->prefix + b->len + len - 1] = 0;
b->len += len;
return 0;
}
int web_buffer_append_json(struct web_buffer *b, json_t *res)
{
size_t len;
assert(b->state == STATE_INITIALIZED);
retry: len = json_dumpb(res, b->buffer + b->prefix + b->len, b->size - b->len, 0) + 1;
if (b->size < b->len + len) {
b->buffer = realloc(b->buffer, b->prefix + b->len + len);
if (!b->buffer)
return -1;
b->size = b->len + len;
goto retry;
}
b->buffer[b->prefix + b->len + len - 1] = 0;
b->len += len;
return 0;
}