From 62bed4953d5e9fc03a860d57077221add5b896fe Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 20 Oct 2020 22:17:55 +0200 Subject: [PATCH] api: rework request/response API and add new endpoint for fetching a graph repr of the currently loaded config --- common | 2 +- include/villas/api/request.hpp | 38 ++++++- include/villas/api/response.hpp | 86 +++++++++------- include/villas/api/session.hpp | 18 +--- include/villas/nodes/websocket.hpp | 6 +- lib/api/CMakeLists.txt | 8 +- lib/api/request.cpp | 19 +++- lib/api/requests/capabiltities.cpp | 2 +- lib/api/requests/config.cpp | 2 +- lib/api/requests/graph.cpp | 136 ++++++++++++++++++++++++++ lib/api/requests/node_action.cpp | 2 +- lib/api/requests/node_file.cpp | 2 +- lib/api/requests/node_info.cpp | 2 +- lib/api/requests/node_stats.cpp | 2 +- lib/api/requests/node_stats_reset.cpp | 2 +- lib/api/requests/nodes.cpp | 2 +- lib/api/requests/path_action.cpp | 2 +- lib/api/requests/path_info.cpp | 2 +- lib/api/requests/paths.cpp | 2 +- lib/api/requests/restart.cpp | 8 +- lib/api/requests/shutdown.cpp | 2 +- lib/api/requests/status.cpp | 2 +- lib/api/response.cpp | 77 ++++++++++++--- lib/api/session.cpp | 119 +++++----------------- lib/nodes/CMakeLists.txt | 2 +- lib/nodes/ngsi.cpp | 2 +- lib/nodes/websocket.cpp | 12 +-- 27 files changed, 363 insertions(+), 196 deletions(-) create mode 100644 lib/api/requests/graph.cpp diff --git a/common b/common index db106f50e..5e7df9237 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit db106f50e053dc48c739c65e352d8885415e460b +Subproject commit 5e7df923770f3ed45c8618275397b793a0e1d9b1 diff --git a/include/villas/api/request.hpp b/include/villas/api/request.hpp index 4fbb40c48..06ddfc879 100644 --- a/include/villas/api/request.hpp +++ b/include/villas/api/request.hpp @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -44,6 +45,7 @@ class RequestFactory; class Request { + friend Session; friend RequestFactory; friend Response; @@ -51,10 +53,12 @@ protected: Session *session; Logger logger; + Buffer buffer; public: std::smatch matches; Session::Method method; + unsigned long contentLength; json_t *body; RequestFactory *factory; @@ -75,13 +79,37 @@ public: virtual Response * execute() = 0; - virtual void prepare() - { } + virtual void decode(); - void setBody(json_t *j) + std::string + getMatch(int idx) { - body = j; + return matches[idx].str(); } + + std::string + getQueryArg(const std::string &arg) + { + char buf[1024]; + const char *val; + + val = lws_get_urlarg_by_name(session->wsi, (arg + "=").c_str(), buf, sizeof(buf)); + + return val ? std::string(val) : std::string(); + } + + std::string + getHeader(enum lws_token_indexes hdr) + { + char buf[1024]; + + lws_hdr_copy(session->wsi, buf, sizeof(buf), hdr); + + return std::string(buf); + } + + virtual std::string + toString(); }; class RequestFactory : public plugin::Plugin { @@ -96,7 +124,7 @@ public: make(Session *s) = 0; static Request * - create(Session *s); + create(Session *s, const std::string &uri, Session::Method meth, unsigned long ct); }; template diff --git a/include/villas/api/response.hpp b/include/villas/api/response.hpp index dd52dab0c..97bcbcc0e 100644 --- a/include/villas/api/response.hpp +++ b/include/villas/api/response.hpp @@ -23,9 +23,12 @@ #pragma once +#include + #include #include +#include #include #include #include @@ -40,56 +43,71 @@ class Request; class Response { +public: + friend Session; + + Response(Session *s, int c = HTTP_STATUS_OK, const std::string &ct = "text/html; charset=UTF-8", const Buffer &b = Buffer()); + + virtual + ~Response() + { } + + virtual void + encodeBody() + { } + + int + writeBody(struct lws *wsi); + + int + writeHeaders(struct lws *wsi); + + void + setHeader(const std::string &key, const std::string &value) + { + headers[key] = value; + } + protected: Session *session; Logger logger; + Buffer buffer; -public: - json_t *response; int code; - - Response(Session *s, json_t *resp = nullptr); - - virtual ~Response(); - - int - getCode() const - { - return code; - } - - /** Return JSON representation of response as used by API sockets. */ - virtual json_t * - toJson() - { - return response; - } + std::string contentType; + std::map headers; }; -class ErrorResponse : public Response { +class JsonResponse : public Response { protected: - std::string error; - json_t *json; + json_t *response; + +public: + JsonResponse(Session *s, int c, json_t *r) : + Response(s, c, "application/json"), + response(r) + { } + + virtual ~JsonResponse(); + + virtual void + encodeBody(); +}; + +class ErrorResponse : public JsonResponse { public: ErrorResponse(Session *s, const RuntimeError &e) : - Response(s), - error(e.what()), - json(nullptr) - { - code = 500; - } + JsonResponse(s, HTTP_STATUS_INTERNAL_SERVER_ERROR, json_pack("{ s: s }", "error", e.what())) + { } ErrorResponse(Session *s, const Error &e) : - Response(s), - error(e.what()), - json(e.json) + JsonResponse(s, e.code, json_pack("{ s: s }", "error", e.what())) { - code = e.code; + if (e.json) + json_object_update(response, e.json); } - - virtual json_t * toJson(); }; } /* namespace api */ diff --git a/include/villas/api/session.hpp b/include/villas/api/session.hpp index 794d977f5..28cd7b461 100644 --- a/include/villas/api/session.hpp +++ b/include/villas/api/session.hpp @@ -23,12 +23,10 @@ #pragma once -#include - #include #include -#include +#include #include namespace villas { @@ -50,7 +48,7 @@ class StatusRequest; class Session { public: - friend StatusRequest; /**< Requires access to wsi */ + friend Request; /**< Requires access to wsi */ enum State { ESTABLISHED, @@ -77,10 +75,6 @@ protected: enum State state; enum Version version; - std::string uri; - Method method; - unsigned long contentLength; - lws *wsi; Web *web; @@ -88,11 +82,8 @@ protected: Logger logger; - JsonBuffer requestBuffer; - JsonBuffer responseBuffer; - - std::atomic request; - std::atomic response; + std::unique_ptr request; + std::unique_ptr response; bool headersSent; @@ -118,7 +109,6 @@ public: protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); Method getRequestMethod() const; - std::string getRequestURI() const; static std::string methodToString(Method meth); diff --git a/include/villas/nodes/websocket.hpp b/include/villas/nodes/websocket.hpp index 2e988c685..8507f0236 100644 --- a/include/villas/nodes/websocket.hpp +++ b/include/villas/nodes/websocket.hpp @@ -71,14 +71,14 @@ struct websocket_connection { struct lws *wsi; struct vnode *node; struct io io; - struct queue queue; /**< For samples which are sent to the WebSocket */ + struct queue queue; /**< For samples which are sent to the Websocket */ struct format_type *format; struct websocket_destination *destination; struct { - villas::Buffer *recv; /**< A buffer for reconstructing fragmented messags. */ - villas::Buffer *send; /**< A buffer for contsructing messages before calling lws_write() */ + villas::Buffer *recv; /**< A buffer for reconstructing fragmented messages. */ + villas::Buffer *send; /**< A buffer for constructing messages before calling lws_write() */ } buffers; char *_name; diff --git a/lib/api/CMakeLists.txt b/lib/api/CMakeLists.txt index 0a160fe6a..cc42771d2 100644 --- a/lib/api/CMakeLists.txt +++ b/lib/api/CMakeLists.txt @@ -38,10 +38,16 @@ set(API_SRC requests/node_stats_reset.cpp requests/node_file.cpp requests/paths.cpp - requests/path_info.cpp + requests/path_info.cpp requests/path_action.cpp ) +if(WITH_GRAPHVIZ) + list(APPEND API_SRC requests/graph.cpp) + list(APPEND INCLUDE_DIRS ${CGRAPH_INCLUDE_DIRS} ${GVC_INCLUDE_DIRS}) + list(APPEND LIBRARIES PkgConfig::CGRAPH PkgConfig::GVC) +endif() + add_library(api STATIC ${API_SRC}) target_include_directories(api PUBLIC ${INCLUDE_DIRS}) target_link_libraries(api PUBLIC ${LIBRARIES}) diff --git a/lib/api/request.cpp b/lib/api/request.cpp index 711b36ec6..b9b848fb9 100644 --- a/lib/api/request.cpp +++ b/lib/api/request.cpp @@ -28,10 +28,22 @@ using namespace villas; using namespace villas::node::api; -Request * RequestFactory::create(Session *s) +void Request::decode() { - auto uri = s->getRequestURI(); - auto meth = s->getRequestMethod(); + body = buffer.decode(); + if (!body) + throw BadRequest("Failed to decode request payload"); +} + +std::string +Request::toString() +{ + return fmt::format("endpoint={}, method={}", factory->getName(), Session::methodToString(method)); +} + +Request * RequestFactory::create(Session *s, const std::string &uri, Session::Method meth, unsigned long ct) +{ + info("api: Trying to find request handler for: uri=%s", uri.c_str()); for (auto *rf : plugin::Registry::lookup()) { std::smatch mr; @@ -43,6 +55,7 @@ Request * RequestFactory::create(Session *s) p->matches = mr; p->factory = rf; p->method = meth; + p->contentLength = ct; return p; } diff --git a/lib/api/requests/capabiltities.cpp b/lib/api/requests/capabiltities.cpp index 82750bd30..c5501f443 100644 --- a/lib/api/requests/capabiltities.cpp +++ b/lib/api/requests/capabiltities.cpp @@ -98,7 +98,7 @@ public: "apis", json_apis, "formats", json_formats); - return new Response(session, json_capabilities); + return new JsonResponse(session, HTTP_STATUS_OK, json_capabilities); } }; diff --git a/lib/api/requests/config.cpp b/lib/api/requests/config.cpp index 8df87186d..2f1ddde36 100644 --- a/lib/api/requests/config.cpp +++ b/lib/api/requests/config.cpp @@ -48,7 +48,7 @@ public: ? json_incref(cfg) : json_object(); - return new Response(session, json_config); + return new JsonResponse(session, HTTP_STATUS_OK, json_config); } }; diff --git a/lib/api/requests/graph.cpp b/lib/api/requests/graph.cpp new file mode 100644 index 000000000..ea25457af --- /dev/null +++ b/lib/api/requests/graph.cpp @@ -0,0 +1,136 @@ +/** The "stats" API request. + * + * @author Steffen Vogel + * @copyright 2014-2020, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#ifdef WITH_GRAPHVIZ +extern "C" { + #include +} +#endif + +#include +#include +#include + +namespace villas { +namespace node { +namespace api { + +class GraphRequest : public Request { + +protected: + GVC_t *gvc; + + std::string layout; + std::string format; + +public: + GraphRequest(Session *s) : + Request(s), + gvc(gvContext()), + layout(getQueryArg("layout")) + { + if (layout.empty()) + layout = "neato"; + } + + ~GraphRequest() + { + gvFreeContext(gvc); + } + + virtual Response * execute() + { + if (method != Session::Method::GET) + throw InvalidMethod(this); + + if (body != nullptr) + throw BadRequest("Status endpoint does not accept any body data"); + + auto *sn = session->getSuperNode(); + auto *graph = sn->getGraph(); + + char *data; + unsigned len; + + std::list supportedLayouts = { "circo", "dot", "fdp", "neato", "nop", "nop1", "nop2", "osage", "patchwork", "sfdp", "twopi" }; + std::list supportedFormats = { "ps", "eps", "txt", "svg", "svgz", "gif", "png", "jpg", "jpeg", "bmp", "dot", "fig", "json", "pdf" }; + + format = matches[1].str(); + + auto lit = std::find(supportedLayouts.begin(), supportedLayouts.end(), layout); + if (lit == supportedLayouts.end()) + throw BadRequest("Unsupported layout: {}", layout); + + auto fit = std::find(supportedFormats.begin(), supportedFormats.end(), format); + if (fit == supportedFormats.end()) + throw BadRequest("Unsupported format: {}", format); + + std::string ct = "text/plain"; + if (format == "svg") + ct = "image/svg+xml"; + else if (format == "eps" || format == "ps") + ct = "application/postscript"; + else if (format == "txt") + ct = "text/plain"; + else if (format == "gif") + ct = "image/gif"; + else if (format == "png") + ct = "image/png"; + else if (format == "jpg" || format == "jpeg") + ct = "image/jpeg"; + else if (format == "bmp") + ct = "image/bmp"; + else if (format == "dot") + ct = "text/vnd.graphviz"; + else if (format == "json") + ct = "application/json"; + else if (format == "pdf") + ct = "application/pdf"; + + gvLayout(gvc, graph, layout.c_str()); + gvRenderData(gvc, graph, format.c_str(), &data, &len); + + auto buf = Buffer(data, len); + auto *resp = new Response(session, HTTP_STATUS_OK, ct, buf); + + if (format == "svgz") + resp->setHeader("Content-Encoding", "gzip"); + +#if 0 + gvFreeRenderData(data); +#endif + gvFreeLayout(gvc, graph); + + return resp; + } +}; + +/* Register API request */ +static char n[] = "graph"; +static char r[] = "/graph\\.([a-z]+)"; +static char d[] = "get graph representation of configuration"; +static RequestPlugin p; + +} /* namespace api */ +} /* namespace node */ +} /* namespace villas */ + diff --git a/lib/api/requests/node_action.cpp b/lib/api/requests/node_action.cpp index 8d4d16c05..04a187d71 100644 --- a/lib/api/requests/node_action.cpp +++ b/lib/api/requests/node_action.cpp @@ -51,7 +51,7 @@ public: A(node); - return new Response(session); + return new Response(session, HTTP_STATUS_OK); } }; diff --git a/lib/api/requests/node_file.cpp b/lib/api/requests/node_file.cpp index d2b3e583b..502812c85 100644 --- a/lib/api/requests/node_file.cpp +++ b/lib/api/requests/node_file.cpp @@ -57,7 +57,7 @@ public: if (matches[2].str() == "rewind") io_rewind(&f->io); - return new Response(session); + return new Response(session, HTTP_STATUS_OK); } }; diff --git a/lib/api/requests/node_info.cpp b/lib/api/requests/node_info.cpp index f75fae589..9c4bae571 100644 --- a/lib/api/requests/node_info.cpp +++ b/lib/api/requests/node_info.cpp @@ -48,7 +48,7 @@ public: if (body != nullptr) throw BadRequest("Nodes endpoint does not accept any body data"); - return new Response(session, node_to_json(node)); + return new JsonResponse(session, HTTP_STATUS_OK, node_to_json(node)); } }; diff --git a/lib/api/requests/node_stats.cpp b/lib/api/requests/node_stats.cpp index eac4a4e88..4bbdf1baa 100644 --- a/lib/api/requests/node_stats.cpp +++ b/lib/api/requests/node_stats.cpp @@ -52,7 +52,7 @@ public: if (node->stats == nullptr) throw BadRequest("The statistics collection for this node is not enabled"); - return new Response(session, node->stats->toJson()); + return new JsonResponse(session, HTTP_STATUS_OK, node->stats->toJson()); } }; diff --git a/lib/api/requests/node_stats_reset.cpp b/lib/api/requests/node_stats_reset.cpp index d3e886739..2f7bc56ef 100644 --- a/lib/api/requests/node_stats_reset.cpp +++ b/lib/api/requests/node_stats_reset.cpp @@ -54,7 +54,7 @@ public: node->stats->reset(); - return new Response(session); + return new Response(session, HTTP_STATUS_OK); } }; diff --git a/lib/api/requests/nodes.cpp b/lib/api/requests/nodes.cpp index 30975a79c..efaa1a0f2 100644 --- a/lib/api/requests/nodes.cpp +++ b/lib/api/requests/nodes.cpp @@ -58,7 +58,7 @@ public: json_array_append_new(json_nodes, node_to_json(n)); } - return new Response(session, json_nodes); + return new JsonResponse(session, HTTP_STATUS_OK, json_nodes); } }; diff --git a/lib/api/requests/path_action.cpp b/lib/api/requests/path_action.cpp index c93c8ad79..48db2a795 100644 --- a/lib/api/requests/path_action.cpp +++ b/lib/api/requests/path_action.cpp @@ -51,7 +51,7 @@ public: A(path); - return new Response(session); + return new Response(session, HTTP_STATUS_OK); } }; diff --git a/lib/api/requests/path_info.cpp b/lib/api/requests/path_info.cpp index d01b06edd..1f9217a76 100644 --- a/lib/api/requests/path_info.cpp +++ b/lib/api/requests/path_info.cpp @@ -48,7 +48,7 @@ public: if (body != nullptr) throw BadRequest("Endpoint does not accept any body data"); - return new Response(session, path_to_json(path)); + return new JsonResponse(session, HTTP_STATUS_OK, path_to_json(path)); } }; diff --git a/lib/api/requests/paths.cpp b/lib/api/requests/paths.cpp index 67b5841d8..b9e8c3320 100644 --- a/lib/api/requests/paths.cpp +++ b/lib/api/requests/paths.cpp @@ -59,7 +59,7 @@ public: json_array_append_new(json_paths, path_to_json(p)); } - return new Response(session, json_paths); + return new JsonResponse(session, HTTP_STATUS_OK, json_paths); } }; diff --git a/lib/api/requests/restart.cpp b/lib/api/requests/restart.cpp index e0a533d88..c418985ae 100644 --- a/lib/api/requests/restart.cpp +++ b/lib/api/requests/restart.cpp @@ -80,15 +80,17 @@ public: if (json_is_string(json_config)) configUri = json_string_value(json_config); else if (json_is_object(json_config)) { - configUri = tmpnam(nullptr); + char configUriBuf[] = "villas-node.json.XXXXXX"; + int configFd = mkstemp(configUriBuf); - FILE *configFile = fopen(configUri.c_str(), "w"); + FILE *configFile = fdopen(configFd, "w+"); ret = json_dumpf(json_config, configFile, JSON_INDENT(4)); if (ret < 0) throw Error(HTTP_STATUS_INTERNAL_SERVER_ERROR, "Failed to create temporary config file"); fclose(configFile); + configUri = configUriBuf; } else if (json_config != nullptr) throw BadRequest("Parameter 'config' must be either a URL (string) or a configuration (object)"); @@ -122,7 +124,7 @@ public: /* Properly terminate current instance */ utils::killme(SIGTERM); - return new Response(session, json_response); + return new JsonResponse(session, HTTP_STATUS_OK, json_response); } }; diff --git a/lib/api/requests/shutdown.cpp b/lib/api/requests/shutdown.cpp index 11ad7da64..13ac22f16 100644 --- a/lib/api/requests/shutdown.cpp +++ b/lib/api/requests/shutdown.cpp @@ -45,7 +45,7 @@ public: utils::killme(SIGTERM); - return new Response(session); + return new Response(session, HTTP_STATUS_OK); } }; diff --git a/lib/api/requests/status.cpp b/lib/api/requests/status.cpp index 6b71c09ad..1e2db2768 100644 --- a/lib/api/requests/status.cpp +++ b/lib/api/requests/status.cpp @@ -146,7 +146,7 @@ public: json_object_set(json_status, "lws", getLwsStatus()); #endif /* LWS_WITH_SERVER_STATUS */ - return new Response(session, json_status); + return new JsonResponse(session, HTTP_STATUS_OK, json_status); } }; diff --git a/lib/api/response.cpp b/lib/api/response.cpp index ab5f37b1c..c464ab59d 100644 --- a/lib/api/response.cpp +++ b/lib/api/response.cpp @@ -26,29 +26,76 @@ using namespace villas::node::api; -Response::Response(Session *s, json_t *resp) : +Response::Response(Session *s, int c, const std::string &ct, const Buffer &b) : session(s), logger(logging.get("api:response")), - response(resp), - code(HTTP_STATUS_OK) + buffer(b), + code(c), + contentType(ct), + headers{ + { "Server:", HTTP_USER_AGENT }, + { "Access-Control-Allow-Origin:", "*" }, + { "Access-Control-Allow-Methods:", "GET, POST, OPTIONS" }, + { "Access-Control-Allow-Headers:", "Content-Type" }, + { "Access-Control-Max-Age:", "86400" } + } { } -Response::~Response() +int +Response::writeBody(struct lws *wsi) +{ + int ret; + + ret = lws_write(wsi, (unsigned char *) buffer.data(), buffer.size(), LWS_WRITE_HTTP_FINAL); + if (ret < 0) + return -1; + + return 1; +} + +int +Response::writeHeaders(struct lws *wsi) +{ + int ret; + uint8_t headerBuffer[2048], *p = headerBuffer, *end = &headerBuffer[sizeof(headerBuffer) - 1]; + + // We need to encode the buffer here for getting the real content length of the response + encodeBody(); + + ret = lws_add_http_common_headers(wsi, code, contentType.c_str(), + buffer.size(), + &p, end); + if (ret) + return 1; + + for (auto &hdr : headers) { + ret = lws_add_http_header_by_name (wsi, + reinterpret_cast(hdr.first.c_str()), + reinterpret_cast(hdr.second.c_str()), + hdr.second.size(), &p, end); + if (ret) + return -1; + } + + ret = lws_finalize_write_http_header(wsi, headerBuffer, &p, end); + if (ret) + return 1; + + /* Do we have a body to send? */ + if (buffer.size() > 0) + lws_callback_on_writable(wsi); + + return 0; +} + +JsonResponse::~JsonResponse() { if (response) json_decref(response); } -json_t * ErrorResponse::toJson() +void +JsonResponse::encodeBody() { - json_t *ret; - - ret = json_pack("{ s: s }", - "error", error.c_str() - ); - - if (json) - json_object_update(ret, json); - - return ret; + buffer.encode(response, JSON_INDENT(4)); } diff --git a/lib/api/session.cpp b/lib/api/session.cpp index cb0437e71..aadf03c41 100644 --- a/lib/api/session.cpp +++ b/lib/api/session.cpp @@ -37,9 +37,7 @@ using namespace villas::node; using namespace villas::node::api; Session::Session(lws *w) : - uri(), - method(Method::UNKNOWN), - contentLength(0), + version(Version::VERSION_2), wsi(w), logger(logging.get("api:session")) { @@ -63,34 +61,25 @@ Session::~Session() { api->sessions.remove(this); - if (request) - delete request; - - if (response) - delete response; - logger->debug("Destroyed API session: {}", getName()); } void Session::execute() { - Request *r = request.exchange(nullptr); - - logger->debug("Running API request: uri={}, method={}", uri, method); + logger->debug("Running API request: {}", request->toString()); try { - r->prepare(); - response = r->execute(); + response = std::unique_ptr(request->execute()); - logger->debug("Completed API request: request={}", uri, method); + logger->debug("Completed API request: {}", request->toString()); } catch (const Error &e) { - response = new ErrorResponse(this, e); + response = std::make_unique(this, e); - logger->warn("API request failed: uri={}, method={}, code={}: {}", uri, method, e.code, e.what()); + logger->warn("API request failed: {}, code={}: {}", request->toString(), e.code, e.what()); } catch (const RuntimeError &e) { - response = new ErrorResponse(this, e); + response = std::make_unique(this, e); - logger->warn("API request failed: uri={}, method={}: {}", uri, method, e.what()); + logger->warn("API request failed: {}: {}", request->toString(), e.what()); } logger->debug("Ran pending API requests. Triggering on_writeable callback: wsi={}", (void *) wsi); @@ -128,25 +117,26 @@ void Session::open(void *in, size_t len) int ret; char buf[32]; - uri = reinterpret_cast(in); + auto uri = reinterpret_cast(in); try { + unsigned int len; auto method = getRequestMethod(); if (method == Method::UNKNOWN) throw RuntimeError("Invalid request method"); - request = RequestFactory::create(this); - ret = lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_HTTP_CONTENT_LENGTH); if (ret < 0) throw RuntimeError("Failed to get content length"); try { - contentLength = std::stoull(buf); + len = std::stoull(buf); } catch (const std::invalid_argument &) { - contentLength = 0; + len = 0; } + request = std::unique_ptr(RequestFactory::create(this, uri, method, len)); + /* This is an OPTIONS request. * * We immediatly send headers and close the connection @@ -155,39 +145,33 @@ void Session::open(void *in, size_t len) lws_callback_on_writable(wsi); /* This request has no body. * We can reply immediatly */ - else if (contentLength == 0) + else if (len == 0) api->pending.push(this); else { /* This request has a HTTP body. We wait for more data to arrive */ } } catch (const Error &e) { - response = new ErrorResponse(this, e); + response = std::make_unique(this, e); lws_callback_on_writable(wsi); } catch (const RuntimeError &e) { - response = new ErrorResponse(this, e); + response = std::make_unique(this, e); lws_callback_on_writable(wsi); } } void Session::body(void *in, size_t len) { - requestBuffer.append((const char *) in, len); + request->buffer.append((const char *) in, len); } void Session::bodyComplete() { try { - auto *j = requestBuffer.decode(); - if (!j) - throw BadRequest("Failed to decode request payload"); - - requestBuffer.clear(); - - (*request).setBody(j); + request->decode(); api->pending.push(this); } catch (const Error &e) { - response = new ErrorResponse(this, e); + response = std::make_unique(this, e); logger->warn("Failed to decode API request: {}", e.what()); } @@ -195,65 +179,17 @@ void Session::bodyComplete() int Session::writeable() { - int ret; - if (!headersSent) { - int code = HTTP_STATUS_OK; - const char *contentType = "application/json"; + if (response) + response->writeHeaders(wsi); - uint8_t headers[2048], *p = headers, *end = &headers[sizeof(headers) - 1]; - - responseBuffer.clear(); - if (response) { - Response *resp = response; - - json_t *json_response = resp->toJson(); - - responseBuffer.encode(json_response, JSON_INDENT(4)); - - code = resp->getCode(); - } - - if (lws_add_http_common_headers(wsi, code, contentType, - responseBuffer.size(), /* no content len */ - &p, end)) - return 1; - - std::map constantHeaders = { - { "Server:", USER_AGENT }, - { "Access-Control-Allow-Origin:", "*" }, - { "Access-Control-Allow-Methods:", "GET, POST, OPTIONS" }, - { "Access-Control-Allow-Headers:", "Content-Type" }, - { "Access-Control-Max-Age:", "86400" } - }; - - for (auto &hdr : constantHeaders) { - if (lws_add_http_header_by_name (wsi, - reinterpret_cast(hdr.first), - reinterpret_cast(hdr.second), - strlen(hdr.second), &p, end)) - return -1; - } - - if (lws_finalize_write_http_header(wsi, headers, &p, end)) - return 1; - - /* No wait, until we can send the body */ + /* Now wait, until we can send the body */ headersSent = true; - /* Do we have a body to send */ - if (responseBuffer.size() > 0) - lws_callback_on_writable(wsi); - return 0; } - else { - ret = lws_write(wsi, (unsigned char *) responseBuffer.data(), responseBuffer.size(), LWS_WRITE_HTTP_FINAL); - if (ret < 0) - return -1; - - return 1; - } + else + return response->writeBody(wsi); } int Session::protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -318,11 +254,6 @@ int Session::protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, return 0; } -std::string Session::getRequestURI() const -{ - return uri; -} - Session::Method Session::getRequestMethod() const { if (lws_hdr_total_length(wsi, WSI_TOKEN_GET_URI)) diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 89f3e2f66..2b6b159c9 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -21,7 +21,7 @@ ################################################################################### set(NODE_SRC - loopback_internal.cpp + loopback_internal.cpp ) if(LIBNL3_ROUTE_FOUND) diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp index f1379a8d8..546612af1 100644 --- a/lib/nodes/ngsi.cpp +++ b/lib/nodes/ngsi.cpp @@ -688,7 +688,7 @@ int ngsi_start(struct vnode *n) curl_easy_setopt(handles[p], CURLOPT_SSL_VERIFYPEER, i->ssl_verify); curl_easy_setopt(handles[p], CURLOPT_TIMEOUT_MS, i->timeout * 1e3); curl_easy_setopt(handles[p], CURLOPT_HTTPHEADER, i->headers); - curl_easy_setopt(handles[p], CURLOPT_USERAGENT, USER_AGENT); + curl_easy_setopt(handles[p], CURLOPT_USERAGENT, HTTP_USER_AGENT); } /* Create entity and atributes */ diff --git a/lib/nodes/websocket.cpp b/lib/nodes/websocket.cpp index 845c89b46..2af573740 100644 --- a/lib/nodes/websocket.cpp +++ b/lib/nodes/websocket.cpp @@ -278,9 +278,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi pulled = queue_pull_many(&c->queue, (void **) smps, cnt); if (pulled > 0) { size_t wbytes; - io_sprint(&c->io, c->buffers.send->buf.data() + LWS_PRE, c->buffers.send->buf.size() - LWS_PRE, &wbytes, smps, pulled); + io_sprint(&c->io, c->buffers.send->data() + LWS_PRE, c->buffers.send->size() - LWS_PRE, &wbytes, smps, pulled); - ret = lws_write(wsi, (unsigned char *) c->buffers.send->buf.data() + LWS_PRE, wbytes, c->io.flags & (int) IOFlags::HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); + ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, c->io.flags & (int) IOFlags::HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); sample_decref_many(smps, pulled); @@ -305,11 +305,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (lws_is_first_fragment(wsi)) c->buffers.recv->clear(); - ret = c->buffers.recv->append((char *) in, len); - if (ret) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data"); - return -1; - } + c->buffers.recv->append((char *) in, len); /* We dont try to parse the frame yet, as we have to wait for the remaining fragments */ if (lws_is_final_fragment(wsi)) { @@ -328,7 +324,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (avail < cnt) warning("Pool underrun for connection: %s", websocket_connection_name(c)); - recvd = io_sscan(&c->io, c->buffers.recv->buf.data(), c->buffers.recv->buf.size(), nullptr, smps, avail); + recvd = io_sscan(&c->io, c->buffers.recv->data(), c->buffers.recv->size(), nullptr, smps, avail); if (recvd < 0) { warning("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); break;