1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

api: rework request/response API and add new endpoint for fetching a graph repr of the currently loaded config

This commit is contained in:
Steffen Vogel 2020-10-20 22:17:55 +02:00
parent 19fc8bd8fe
commit 62bed4953d
27 changed files with 363 additions and 196 deletions

2
common

@ -1 +1 @@
Subproject commit db106f50e053dc48c739c65e352d8885415e460b
Subproject commit 5e7df923770f3ed45c8618275397b793a0e1d9b1

View file

@ -27,6 +27,7 @@
#include <jansson.h>
#include <villas/log.hpp>
#include <villas/buffer.hpp>
#include <villas/plugin.hpp>
#include <villas/super_node.hpp>
#include <villas/api/session.hpp>
@ -44,6 +45,7 @@ class RequestFactory;
class Request {
friend Session;
friend RequestFactory;
friend Response;
@ -51,10 +53,12 @@ protected:
Session *session;
Logger logger;
Buffer buffer;
public:
std::smatch matches;
Session::Method method;
unsigned long contentLength;
json_t *body;
RequestFactory *factory;
@ -75,13 +79,37 @@ public:
virtual Response * execute() = 0;
virtual void prepare()
{ }
virtual void decode();
void setBody(json_t *j)
std::string
getMatch(int idx)
{
body = j;
return matches[idx].str();
}
std::string
getQueryArg(const std::string &arg)
{
char buf[1024];
const char *val;
val = lws_get_urlarg_by_name(session->wsi, (arg + "=").c_str(), buf, sizeof(buf));
return val ? std::string(val) : std::string();
}
std::string
getHeader(enum lws_token_indexes hdr)
{
char buf[1024];
lws_hdr_copy(session->wsi, buf, sizeof(buf), hdr);
return std::string(buf);
}
virtual std::string
toString();
};
class RequestFactory : public plugin::Plugin {
@ -96,7 +124,7 @@ public:
make(Session *s) = 0;
static Request *
create(Session *s);
create(Session *s, const std::string &uri, Session::Method meth, unsigned long ct);
};
template<typename T, const char *name, const char *re, const char *desc>

View file

@ -23,9 +23,12 @@
#pragma once
#include <map>
#include <jansson.h>
#include <villas/log.hpp>
#include <villas/buffer.hpp>
#include <villas/exceptions.hpp>
#include <villas/plugin.hpp>
#include <villas/api.hpp>
@ -40,56 +43,71 @@ class Request;
class Response {
public:
friend Session;
Response(Session *s, int c = HTTP_STATUS_OK, const std::string &ct = "text/html; charset=UTF-8", const Buffer &b = Buffer());
virtual
~Response()
{ }
virtual void
encodeBody()
{ }
int
writeBody(struct lws *wsi);
int
writeHeaders(struct lws *wsi);
void
setHeader(const std::string &key, const std::string &value)
{
headers[key] = value;
}
protected:
Session *session;
Logger logger;
Buffer buffer;
public:
json_t *response;
int code;
Response(Session *s, json_t *resp = nullptr);
virtual ~Response();
int
getCode() const
{
return code;
}
/** Return JSON representation of response as used by API sockets. */
virtual json_t *
toJson()
{
return response;
}
std::string contentType;
std::map<std::string, std::string> headers;
};
class ErrorResponse : public Response {
class JsonResponse : public Response {
protected:
std::string error;
json_t *json;
json_t *response;
public:
JsonResponse(Session *s, int c, json_t *r) :
Response(s, c, "application/json"),
response(r)
{ }
virtual ~JsonResponse();
virtual void
encodeBody();
};
class ErrorResponse : public JsonResponse {
public:
ErrorResponse(Session *s, const RuntimeError &e) :
Response(s),
error(e.what()),
json(nullptr)
{
code = 500;
}
JsonResponse(s, HTTP_STATUS_INTERNAL_SERVER_ERROR, json_pack("{ s: s }", "error", e.what()))
{ }
ErrorResponse(Session *s, const Error &e) :
Response(s),
error(e.what()),
json(e.json)
JsonResponse(s, e.code, json_pack("{ s: s }", "error", e.what()))
{
code = e.code;
if (e.json)
json_object_update(response, e.json);
}
virtual json_t * toJson();
};
} /* namespace api */

View file

@ -23,12 +23,10 @@
#pragma once
#include <atomic>
#include <jansson.h>
#include <villas/queue.h>
#include <villas/json_buffer.hpp>
#include <villas/buffer.hpp>
#include <villas/api.hpp>
namespace villas {
@ -50,7 +48,7 @@ class StatusRequest;
class Session {
public:
friend StatusRequest; /**< Requires access to wsi */
friend Request; /**< Requires access to wsi */
enum State {
ESTABLISHED,
@ -77,10 +75,6 @@ protected:
enum State state;
enum Version version;
std::string uri;
Method method;
unsigned long contentLength;
lws *wsi;
Web *web;
@ -88,11 +82,8 @@ protected:
Logger logger;
JsonBuffer requestBuffer;
JsonBuffer responseBuffer;
std::atomic<Request *> request;
std::atomic<Response *> response;
std::unique_ptr<Request> request;
std::unique_ptr<Response> response;
bool headersSent;
@ -118,7 +109,6 @@ public:
protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
Method getRequestMethod() const;
std::string getRequestURI() const;
static std::string
methodToString(Method meth);

View file

@ -71,14 +71,14 @@ struct websocket_connection {
struct lws *wsi;
struct vnode *node;
struct io io;
struct queue queue; /**< For samples which are sent to the WebSocket */
struct queue queue; /**< For samples which are sent to the Websocket */
struct format_type *format;
struct websocket_destination *destination;
struct {
villas::Buffer *recv; /**< A buffer for reconstructing fragmented messags. */
villas::Buffer *send; /**< A buffer for contsructing messages before calling lws_write() */
villas::Buffer *recv; /**< A buffer for reconstructing fragmented messages. */
villas::Buffer *send; /**< A buffer for constructing messages before calling lws_write() */
} buffers;
char *_name;

View file

@ -38,10 +38,16 @@ set(API_SRC
requests/node_stats_reset.cpp
requests/node_file.cpp
requests/paths.cpp
requests/path_info.cpp
requests/path_info.cpp
requests/path_action.cpp
)
if(WITH_GRAPHVIZ)
list(APPEND API_SRC requests/graph.cpp)
list(APPEND INCLUDE_DIRS ${CGRAPH_INCLUDE_DIRS} ${GVC_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::CGRAPH PkgConfig::GVC)
endif()
add_library(api STATIC ${API_SRC})
target_include_directories(api PUBLIC ${INCLUDE_DIRS})
target_link_libraries(api PUBLIC ${LIBRARIES})

View file

@ -28,10 +28,22 @@
using namespace villas;
using namespace villas::node::api;
Request * RequestFactory::create(Session *s)
void Request::decode()
{
auto uri = s->getRequestURI();
auto meth = s->getRequestMethod();
body = buffer.decode();
if (!body)
throw BadRequest("Failed to decode request payload");
}
std::string
Request::toString()
{
return fmt::format("endpoint={}, method={}", factory->getName(), Session::methodToString(method));
}
Request * RequestFactory::create(Session *s, const std::string &uri, Session::Method meth, unsigned long ct)
{
info("api: Trying to find request handler for: uri=%s", uri.c_str());
for (auto *rf : plugin::Registry::lookup<RequestFactory>()) {
std::smatch mr;
@ -43,6 +55,7 @@ Request * RequestFactory::create(Session *s)
p->matches = mr;
p->factory = rf;
p->method = meth;
p->contentLength = ct;
return p;
}

View file

@ -98,7 +98,7 @@ public:
"apis", json_apis,
"formats", json_formats);
return new Response(session, json_capabilities);
return new JsonResponse(session, HTTP_STATUS_OK, json_capabilities);
}
};

View file

@ -48,7 +48,7 @@ public:
? json_incref(cfg)
: json_object();
return new Response(session, json_config);
return new JsonResponse(session, HTTP_STATUS_OK, json_config);
}
};

136
lib/api/requests/graph.cpp Normal file
View file

@ -0,0 +1,136 @@
/** The "stats" API request.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2020, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#ifdef WITH_GRAPHVIZ
extern "C" {
#include <graphviz/gvc.h>
}
#endif
#include <villas/timing.h>
#include <villas/api/request.hpp>
#include <villas/api/response.hpp>
namespace villas {
namespace node {
namespace api {
class GraphRequest : public Request {
protected:
GVC_t *gvc;
std::string layout;
std::string format;
public:
GraphRequest(Session *s) :
Request(s),
gvc(gvContext()),
layout(getQueryArg("layout"))
{
if (layout.empty())
layout = "neato";
}
~GraphRequest()
{
gvFreeContext(gvc);
}
virtual Response * execute()
{
if (method != Session::Method::GET)
throw InvalidMethod(this);
if (body != nullptr)
throw BadRequest("Status endpoint does not accept any body data");
auto *sn = session->getSuperNode();
auto *graph = sn->getGraph();
char *data;
unsigned len;
std::list<std::string> supportedLayouts = { "circo", "dot", "fdp", "neato", "nop", "nop1", "nop2", "osage", "patchwork", "sfdp", "twopi" };
std::list<std::string> supportedFormats = { "ps", "eps", "txt", "svg", "svgz", "gif", "png", "jpg", "jpeg", "bmp", "dot", "fig", "json", "pdf" };
format = matches[1].str();
auto lit = std::find(supportedLayouts.begin(), supportedLayouts.end(), layout);
if (lit == supportedLayouts.end())
throw BadRequest("Unsupported layout: {}", layout);
auto fit = std::find(supportedFormats.begin(), supportedFormats.end(), format);
if (fit == supportedFormats.end())
throw BadRequest("Unsupported format: {}", format);
std::string ct = "text/plain";
if (format == "svg")
ct = "image/svg+xml";
else if (format == "eps" || format == "ps")
ct = "application/postscript";
else if (format == "txt")
ct = "text/plain";
else if (format == "gif")
ct = "image/gif";
else if (format == "png")
ct = "image/png";
else if (format == "jpg" || format == "jpeg")
ct = "image/jpeg";
else if (format == "bmp")
ct = "image/bmp";
else if (format == "dot")
ct = "text/vnd.graphviz";
else if (format == "json")
ct = "application/json";
else if (format == "pdf")
ct = "application/pdf";
gvLayout(gvc, graph, layout.c_str());
gvRenderData(gvc, graph, format.c_str(), &data, &len);
auto buf = Buffer(data, len);
auto *resp = new Response(session, HTTP_STATUS_OK, ct, buf);
if (format == "svgz")
resp->setHeader("Content-Encoding", "gzip");
#if 0
gvFreeRenderData(data);
#endif
gvFreeLayout(gvc, graph);
return resp;
}
};
/* Register API request */
static char n[] = "graph";
static char r[] = "/graph\\.([a-z]+)";
static char d[] = "get graph representation of configuration";
static RequestPlugin<GraphRequest, n, r, d> p;
} /* namespace api */
} /* namespace node */
} /* namespace villas */

View file

@ -51,7 +51,7 @@ public:
A(node);
return new Response(session);
return new Response(session, HTTP_STATUS_OK);
}
};

View file

@ -57,7 +57,7 @@ public:
if (matches[2].str() == "rewind")
io_rewind(&f->io);
return new Response(session);
return new Response(session, HTTP_STATUS_OK);
}
};

View file

@ -48,7 +48,7 @@ public:
if (body != nullptr)
throw BadRequest("Nodes endpoint does not accept any body data");
return new Response(session, node_to_json(node));
return new JsonResponse(session, HTTP_STATUS_OK, node_to_json(node));
}
};

View file

@ -52,7 +52,7 @@ public:
if (node->stats == nullptr)
throw BadRequest("The statistics collection for this node is not enabled");
return new Response(session, node->stats->toJson());
return new JsonResponse(session, HTTP_STATUS_OK, node->stats->toJson());
}
};

View file

@ -54,7 +54,7 @@ public:
node->stats->reset();
return new Response(session);
return new Response(session, HTTP_STATUS_OK);
}
};

View file

@ -58,7 +58,7 @@ public:
json_array_append_new(json_nodes, node_to_json(n));
}
return new Response(session, json_nodes);
return new JsonResponse(session, HTTP_STATUS_OK, json_nodes);
}
};

View file

@ -51,7 +51,7 @@ public:
A(path);
return new Response(session);
return new Response(session, HTTP_STATUS_OK);
}
};

View file

@ -48,7 +48,7 @@ public:
if (body != nullptr)
throw BadRequest("Endpoint does not accept any body data");
return new Response(session, path_to_json(path));
return new JsonResponse(session, HTTP_STATUS_OK, path_to_json(path));
}
};

View file

@ -59,7 +59,7 @@ public:
json_array_append_new(json_paths, path_to_json(p));
}
return new Response(session, json_paths);
return new JsonResponse(session, HTTP_STATUS_OK, json_paths);
}
};

View file

@ -80,15 +80,17 @@ public:
if (json_is_string(json_config))
configUri = json_string_value(json_config);
else if (json_is_object(json_config)) {
configUri = tmpnam(nullptr);
char configUriBuf[] = "villas-node.json.XXXXXX";
int configFd = mkstemp(configUriBuf);
FILE *configFile = fopen(configUri.c_str(), "w");
FILE *configFile = fdopen(configFd, "w+");
ret = json_dumpf(json_config, configFile, JSON_INDENT(4));
if (ret < 0)
throw Error(HTTP_STATUS_INTERNAL_SERVER_ERROR, "Failed to create temporary config file");
fclose(configFile);
configUri = configUriBuf;
}
else if (json_config != nullptr)
throw BadRequest("Parameter 'config' must be either a URL (string) or a configuration (object)");
@ -122,7 +124,7 @@ public:
/* Properly terminate current instance */
utils::killme(SIGTERM);
return new Response(session, json_response);
return new JsonResponse(session, HTTP_STATUS_OK, json_response);
}
};

View file

@ -45,7 +45,7 @@ public:
utils::killme(SIGTERM);
return new Response(session);
return new Response(session, HTTP_STATUS_OK);
}
};

View file

@ -146,7 +146,7 @@ public:
json_object_set(json_status, "lws", getLwsStatus());
#endif /* LWS_WITH_SERVER_STATUS */
return new Response(session, json_status);
return new JsonResponse(session, HTTP_STATUS_OK, json_status);
}
};

View file

@ -26,29 +26,76 @@
using namespace villas::node::api;
Response::Response(Session *s, json_t *resp) :
Response::Response(Session *s, int c, const std::string &ct, const Buffer &b) :
session(s),
logger(logging.get("api:response")),
response(resp),
code(HTTP_STATUS_OK)
buffer(b),
code(c),
contentType(ct),
headers{
{ "Server:", HTTP_USER_AGENT },
{ "Access-Control-Allow-Origin:", "*" },
{ "Access-Control-Allow-Methods:", "GET, POST, OPTIONS" },
{ "Access-Control-Allow-Headers:", "Content-Type" },
{ "Access-Control-Max-Age:", "86400" }
}
{ }
Response::~Response()
int
Response::writeBody(struct lws *wsi)
{
int ret;
ret = lws_write(wsi, (unsigned char *) buffer.data(), buffer.size(), LWS_WRITE_HTTP_FINAL);
if (ret < 0)
return -1;
return 1;
}
int
Response::writeHeaders(struct lws *wsi)
{
int ret;
uint8_t headerBuffer[2048], *p = headerBuffer, *end = &headerBuffer[sizeof(headerBuffer) - 1];
// We need to encode the buffer here for getting the real content length of the response
encodeBody();
ret = lws_add_http_common_headers(wsi, code, contentType.c_str(),
buffer.size(),
&p, end);
if (ret)
return 1;
for (auto &hdr : headers) {
ret = lws_add_http_header_by_name (wsi,
reinterpret_cast<const unsigned char *>(hdr.first.c_str()),
reinterpret_cast<const unsigned char *>(hdr.second.c_str()),
hdr.second.size(), &p, end);
if (ret)
return -1;
}
ret = lws_finalize_write_http_header(wsi, headerBuffer, &p, end);
if (ret)
return 1;
/* Do we have a body to send? */
if (buffer.size() > 0)
lws_callback_on_writable(wsi);
return 0;
}
JsonResponse::~JsonResponse()
{
if (response)
json_decref(response);
}
json_t * ErrorResponse::toJson()
void
JsonResponse::encodeBody()
{
json_t *ret;
ret = json_pack("{ s: s }",
"error", error.c_str()
);
if (json)
json_object_update(ret, json);
return ret;
buffer.encode(response, JSON_INDENT(4));
}

View file

@ -37,9 +37,7 @@ using namespace villas::node;
using namespace villas::node::api;
Session::Session(lws *w) :
uri(),
method(Method::UNKNOWN),
contentLength(0),
version(Version::VERSION_2),
wsi(w),
logger(logging.get("api:session"))
{
@ -63,34 +61,25 @@ Session::~Session()
{
api->sessions.remove(this);
if (request)
delete request;
if (response)
delete response;
logger->debug("Destroyed API session: {}", getName());
}
void Session::execute()
{
Request *r = request.exchange(nullptr);
logger->debug("Running API request: uri={}, method={}", uri, method);
logger->debug("Running API request: {}", request->toString());
try {
r->prepare();
response = r->execute();
response = std::unique_ptr<Response>(request->execute());
logger->debug("Completed API request: request={}", uri, method);
logger->debug("Completed API request: {}", request->toString());
} catch (const Error &e) {
response = new ErrorResponse(this, e);
response = std::make_unique<ErrorResponse>(this, e);
logger->warn("API request failed: uri={}, method={}, code={}: {}", uri, method, e.code, e.what());
logger->warn("API request failed: {}, code={}: {}", request->toString(), e.code, e.what());
} catch (const RuntimeError &e) {
response = new ErrorResponse(this, e);
response = std::make_unique<ErrorResponse>(this, e);
logger->warn("API request failed: uri={}, method={}: {}", uri, method, e.what());
logger->warn("API request failed: {}: {}", request->toString(), e.what());
}
logger->debug("Ran pending API requests. Triggering on_writeable callback: wsi={}", (void *) wsi);
@ -128,25 +117,26 @@ void Session::open(void *in, size_t len)
int ret;
char buf[32];
uri = reinterpret_cast<char *>(in);
auto uri = reinterpret_cast<char *>(in);
try {
unsigned int len;
auto method = getRequestMethod();
if (method == Method::UNKNOWN)
throw RuntimeError("Invalid request method");
request = RequestFactory::create(this);
ret = lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_HTTP_CONTENT_LENGTH);
if (ret < 0)
throw RuntimeError("Failed to get content length");
try {
contentLength = std::stoull(buf);
len = std::stoull(buf);
} catch (const std::invalid_argument &) {
contentLength = 0;
len = 0;
}
request = std::unique_ptr<Request>(RequestFactory::create(this, uri, method, len));
/* This is an OPTIONS request.
*
* We immediatly send headers and close the connection
@ -155,39 +145,33 @@ void Session::open(void *in, size_t len)
lws_callback_on_writable(wsi);
/* This request has no body.
* We can reply immediatly */
else if (contentLength == 0)
else if (len == 0)
api->pending.push(this);
else {
/* This request has a HTTP body. We wait for more data to arrive */
}
} catch (const Error &e) {
response = new ErrorResponse(this, e);
response = std::make_unique<ErrorResponse>(this, e);
lws_callback_on_writable(wsi);
} catch (const RuntimeError &e) {
response = new ErrorResponse(this, e);
response = std::make_unique<ErrorResponse>(this, e);
lws_callback_on_writable(wsi);
}
}
void Session::body(void *in, size_t len)
{
requestBuffer.append((const char *) in, len);
request->buffer.append((const char *) in, len);
}
void Session::bodyComplete()
{
try {
auto *j = requestBuffer.decode();
if (!j)
throw BadRequest("Failed to decode request payload");
requestBuffer.clear();
(*request).setBody(j);
request->decode();
api->pending.push(this);
} catch (const Error &e) {
response = new ErrorResponse(this, e);
response = std::make_unique<ErrorResponse>(this, e);
logger->warn("Failed to decode API request: {}", e.what());
}
@ -195,65 +179,17 @@ void Session::bodyComplete()
int Session::writeable()
{
int ret;
if (!headersSent) {
int code = HTTP_STATUS_OK;
const char *contentType = "application/json";
if (response)
response->writeHeaders(wsi);
uint8_t headers[2048], *p = headers, *end = &headers[sizeof(headers) - 1];
responseBuffer.clear();
if (response) {
Response *resp = response;
json_t *json_response = resp->toJson();
responseBuffer.encode(json_response, JSON_INDENT(4));
code = resp->getCode();
}
if (lws_add_http_common_headers(wsi, code, contentType,
responseBuffer.size(), /* no content len */
&p, end))
return 1;
std::map<const char *, const char *> constantHeaders = {
{ "Server:", USER_AGENT },
{ "Access-Control-Allow-Origin:", "*" },
{ "Access-Control-Allow-Methods:", "GET, POST, OPTIONS" },
{ "Access-Control-Allow-Headers:", "Content-Type" },
{ "Access-Control-Max-Age:", "86400" }
};
for (auto &hdr : constantHeaders) {
if (lws_add_http_header_by_name (wsi,
reinterpret_cast<const unsigned char *>(hdr.first),
reinterpret_cast<const unsigned char *>(hdr.second),
strlen(hdr.second), &p, end))
return -1;
}
if (lws_finalize_write_http_header(wsi, headers, &p, end))
return 1;
/* No wait, until we can send the body */
/* Now wait, until we can send the body */
headersSent = true;
/* Do we have a body to send */
if (responseBuffer.size() > 0)
lws_callback_on_writable(wsi);
return 0;
}
else {
ret = lws_write(wsi, (unsigned char *) responseBuffer.data(), responseBuffer.size(), LWS_WRITE_HTTP_FINAL);
if (ret < 0)
return -1;
return 1;
}
else
return response->writeBody(wsi);
}
int Session::protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
@ -318,11 +254,6 @@ int Session::protocolCallback(struct lws *wsi, enum lws_callback_reasons reason,
return 0;
}
std::string Session::getRequestURI() const
{
return uri;
}
Session::Method Session::getRequestMethod() const
{
if (lws_hdr_total_length(wsi, WSI_TOKEN_GET_URI))

View file

@ -21,7 +21,7 @@
###################################################################################
set(NODE_SRC
loopback_internal.cpp
loopback_internal.cpp
)
if(LIBNL3_ROUTE_FOUND)

View file

@ -688,7 +688,7 @@ int ngsi_start(struct vnode *n)
curl_easy_setopt(handles[p], CURLOPT_SSL_VERIFYPEER, i->ssl_verify);
curl_easy_setopt(handles[p], CURLOPT_TIMEOUT_MS, i->timeout * 1e3);
curl_easy_setopt(handles[p], CURLOPT_HTTPHEADER, i->headers);
curl_easy_setopt(handles[p], CURLOPT_USERAGENT, USER_AGENT);
curl_easy_setopt(handles[p], CURLOPT_USERAGENT, HTTP_USER_AGENT);
}
/* Create entity and atributes */

View file

@ -278,9 +278,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
if (pulled > 0) {
size_t wbytes;
io_sprint(&c->io, c->buffers.send->buf.data() + LWS_PRE, c->buffers.send->buf.size() - LWS_PRE, &wbytes, smps, pulled);
io_sprint(&c->io, c->buffers.send->data() + LWS_PRE, c->buffers.send->size() - LWS_PRE, &wbytes, smps, pulled);
ret = lws_write(wsi, (unsigned char *) c->buffers.send->buf.data() + LWS_PRE, wbytes, c->io.flags & (int) IOFlags::HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, c->io.flags & (int) IOFlags::HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_decref_many(smps, pulled);
@ -305,11 +305,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (lws_is_first_fragment(wsi))
c->buffers.recv->clear();
ret = c->buffers.recv->append((char *) in, len);
if (ret) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data");
return -1;
}
c->buffers.recv->append((char *) in, len);
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
if (lws_is_final_fragment(wsi)) {
@ -328,7 +324,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (avail < cnt)
warning("Pool underrun for connection: %s", websocket_connection_name(c));
recvd = io_sscan(&c->io, c->buffers.recv->buf.data(), c->buffers.recv->buf.size(), nullptr, smps, avail);
recvd = io_sscan(&c->io, c->buffers.recv->data(), c->buffers.recv->size(), nullptr, smps, avail);
if (recvd < 0) {
warning("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
break;