1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

smaller bug fixes

This commit is contained in:
Steffen Vogel 2020-06-08 04:03:07 +02:00
parent 53bb93a967
commit daeb0820ee
6 changed files with 20 additions and 34 deletions

View file

@ -33,7 +33,7 @@
#include <villas/pool.h>
#include <villas/queue_signalled.h>
#include <villas/common.hpp>
#include <villas/buffer.h>
#include <villas/buffer.hpp>
#include <villas/io.h>
#include <villas/node/config.h>
@ -77,8 +77,8 @@ struct websocket_connection {
struct websocket_destination *destination;
struct {
struct buffer recv; /**< A buffer for reconstructing fragmented messags. */
struct buffer send; /**< A buffer for contsructing messages before calling lws_write() */
villas::Buffer *recv; /**< A buffer for reconstructing fragmented messags. */
villas::Buffer *send; /**< A buffer for contsructing messages before calling lws_write() */
} buffers;
char *_name;

View file

@ -26,7 +26,7 @@
#include <villas/sample.h>
#include <villas/node.h>
#include <villas/signal.h>
#include <villas/compat.h>
#include <villas/compat.hpp>
#include <villas/timing.h>
#include <villas/io.h>
#include <villas/formats/json.h>

View file

@ -22,10 +22,10 @@
#include <unordered_map>
#include <cstdlib>
#include <unistd.h>
#include <cstdlib>
#include <cerrno>
#include <strings.h>
#include <cstring>
#include <sys/time.h>
#include <sys/resource.h>

View file

@ -30,13 +30,13 @@
#include <villas/timing.h>
#include <villas/utils.hpp>
#include <villas/buffer.h>
#include <villas/plugin.h>
#include <villas/nodes/websocket.hpp>
#include <villas/format_type.h>
#include <villas/formats/msg_format.h>
#include <villas/super_node.hpp>
using namespace villas;
using namespace villas::utils;
#define DEFAULT_WEBSOCKET_BUFFER_SIZE (1 << 12)
@ -98,13 +98,8 @@ static int websocket_connection_init(struct websocket_connection *c)
if (ret)
return ret;
ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE);
if (ret)
return ret;
ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE);
if (ret)
return ret;
c->buffers.recv = new Buffer(DEFAULT_WEBSOCKET_BUFFER_SIZE);
c->buffers.send = new Buffer(DEFAULT_WEBSOCKET_BUFFER_SIZE);
c->state = websocket_connection::State::INITIALIZED;
@ -134,13 +129,8 @@ static int websocket_connection_destroy(struct websocket_connection *c)
if (ret)
return ret;
ret = buffer_destroy(&c->buffers.recv);
if (ret)
return ret;
ret = buffer_destroy(&c->buffers.send);
if (ret)
return ret;
delete c->buffers.recv;
delete c->buffers.send;
c->wsi = nullptr;
c->_name = nullptr;
@ -286,9 +276,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
if (pulled > 0) {
size_t wbytes;
io_sprint(&c->io, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled);
io_sprint(&c->io, c->buffers.send->buf + LWS_PRE, c->buffers.send->size - LWS_PRE, &wbytes, smps, pulled);
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & (int) IOFlags::HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
ret = lws_write(wsi, (unsigned char *) c->buffers.send->buf + LWS_PRE, wbytes, c->io.flags & (int) IOFlags::HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_decref_many(smps, pulled);
@ -311,9 +301,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
if (lws_is_first_fragment(wsi))
buffer_clear(&c->buffers.recv);
c->buffers.recv->clear();
ret = buffer_append(&c->buffers.recv, (char *) in, len);
ret = c->buffers.recv->append((char *) in, len);
if (ret) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data");
return -1;
@ -336,7 +326,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (avail < cnt)
warning("Pool underrun for connection: %s", websocket_connection_name(c));
recvd = io_sscan(&c->io, c->buffers.recv.buf, c->buffers.recv.len, nullptr, smps, avail);
recvd = io_sscan(&c->io, c->buffers.recv->buf, c->buffers.recv->len, nullptr, smps, avail);
if (recvd < 0) {
warning("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
break;
@ -358,7 +348,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (enqueued < avail)
sample_decref_many(&smps[enqueued], avail - enqueued);
buffer_clear(&c->buffers.recv);
c->buffers.recv->clear();
if (c->state == websocket_connection::State::SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");

View file

@ -119,7 +119,7 @@ int path_init(struct vpath *p)
new (&p->logger) Logger;
new (&p->received) std::bitset<MAX_SAMPLE_LENGTH>;
new (&p->mask) std::bitset<MAX_SAMPLE_LENGTH>;
new (&p->rate) Task(CLOCK_MONOTONIC);
new (&p->timeout) Task(CLOCK_MONOTONIC);
p->logger = logging.get("path");
@ -168,7 +168,7 @@ int path_init(struct vpath *p)
static int path_prepare_poll(struct vpath *p)
{
int fds[16], ret, n = 0, m;
int fds[16], n = 0, m;
if (p->reader.pfds)
delete[] p->reader.pfds;
@ -200,7 +200,7 @@ static int path_prepare_poll(struct vpath *p)
/* We use the last slot for the timeout timer. */
if (p->rate > 0) {
p->rate.setRate(&p->timeout);
p->timeout.setRate(p->rate);
p->reader.nfds++;
p->reader.pfds = (struct pollfd *) realloc(p->reader.pfds, p->reader.nfds * sizeof(struct pollfd));

View file

@ -370,8 +370,6 @@ void SuperNode::prepare()
void SuperNode::start()
{
int ret;
assert(state == State::PREPARED);
#ifdef WITH_API
@ -452,8 +450,6 @@ void SuperNode::stopInterfaces()
void SuperNode::stop()
{
int ret;
stopNodes();
stopPaths();
stopNodeTypes();