diff --git a/src/villas-relay.cpp b/src/villas-relay.cpp index db679c4c7..b5d2e2b00 100644 --- a/src/villas-relay.cpp +++ b/src/villas-relay.cpp @@ -21,22 +21,21 @@ *********************************************************************************/ #include -#include #include #include #include #include -#include #include -#include #include +#include + +#include +#include #include -#include - -auto console = spdlog::stdout_color_mt("console"); +#include "villas-relay.hpp" /** The libwebsockets server context. */ static lws_context *context; @@ -44,166 +43,30 @@ static lws_context *context; /** The libwebsockets vhost. */ static lws_vhost *vhost; -/* Forward declarations */ -lws_callback_function protocol_cb; -class Session; -class Connection; +auto console = villas::logging.get("console"); +std::map sessions; -static std::map sessions; - -class InvalidUrlException { }; - -struct Options { - bool loopback; -} opts; - -class Frame : public std::vector { -public: - Frame() { - /* lws_write() requires LWS_PRE bytes in front of the payload */ - insert(end(), LWS_PRE, 0); - } - - uint8_t * data() { - return std::vector::data() + LWS_PRE; - } - - size_type size() { - return std::vector::size() - LWS_PRE; - } -}; - -class Session { -public: - typedef std::string Identifier; - - static Session * get(lws *wsi) - { - char uri[64]; - - /* We use the URI to associate this connection to a session - * Example: ws://example.com/node_1 - * Will select the session with the name 'node_1' - */ - - /* Get path of incoming request */ - lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); - if (strlen(uri) <= 0) - throw InvalidUrlException(); - - Identifier sid = uri; - - auto it = sessions.find(sid); - if (it == sessions.end()) { - return new Session(sid); - } - else { - console->info("Found existing session: {}", sid); - - return it->second; - } - } - - Session(Identifier sid) : - identifier(sid) - { - console->info("Session created: {}", identifier); - - sessions[sid] = this; - } - - ~Session() - { - console->info("Session destroyed: {}", identifier); - - sessions.erase(identifier); - } - - Identifier identifier; - - std::map connections; -}; - -class Connection { - -protected: - lws *wsi; - - std::shared_ptr currentFrame; - - std::queue> outgoingFrames; - - Session *session; - - char name[128]; - char ip[128]; - -public: - Connection(lws *w) : - wsi(w), - currentFrame(std::make_shared()), - outgoingFrames() - { - session = Session::get(wsi); - session->connections[wsi] = this; - - lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), name, sizeof(name), ip, sizeof(ip)); - - console->info("New connection established: session={}, remote={} ({})", session->identifier, name, ip); - } - - ~Connection() - { - console->info("Connection closed: session={}, remote={} ({})", session->identifier, name, ip); - - session->connections.erase(wsi); - - if (session->connections.empty()) - delete session; - } - - void write() - { - int ret; - - while (!outgoingFrames.empty()) { - std::shared_ptr fr = outgoingFrames.front(); - - ret = lws_write(wsi, fr->data(), fr->size(), LWS_WRITE_BINARY); - if (ret < 0) - return; - - outgoingFrames.pop(); - } - } - - void read(void *in, size_t len) - { - currentFrame->insert(currentFrame->end(), (uint8_t *) in, (uint8_t *) in + len); - - if (lws_is_final_fragment(wsi)) { - console->debug("Received frame, relaying to {} connections", session->connections.size() - (opts.loopback ? 0 : 1)); - - for (auto p : session->connections) { - auto c = p.second; - - /* We skip the current connection in order - * to avoid receiving our own data */ - if (opts.loopback == false && c == this) - continue; - - c->outgoingFrames.push(currentFrame); - - lws_callback_on_writable(c->wsi); - } - - currentFrame = std::make_shared(); - } - } +/* Default options */ +struct Options opts = { + .loopback = false, + .port = 8088, + .protocol = "live" }; /** List of libwebsockets protocols. */ lws_protocols protocols[] = { + { + .name = "http", + .callback = lws_callback_http_dummy, + .per_session_data_size = 0, + .rx_buffer_size = 1024 + }, + { + .name = "http-api", + .callback = http_protocol_cb, + .per_session_data_size = 0, + .rx_buffer_size = 1024 + }, { .name = "live", .callback = protocol_cb, @@ -228,6 +91,184 @@ static const lws_extension extensions[] = { { NULL /* terminator */ } }; +static const lws_http_mount mount = { + .mount_next = NULL, /* linked-list "next" */ + .mountpoint = "/api/v1", /* mountpoint URL */ + .origin = NULL, /* protocol */ + .def = NULL, + .protocol = "http-api", + .cgienv = NULL, + .extra_mimetypes = NULL, + .interpret = NULL, + .cgi_timeout = 0, + .cache_max_age = 0, + .auth_mask = 0, + .cache_reusable = 0, + .cache_revalidate = 0, + .cache_intermediaries = 0, + .origin_protocol = LWSMPRO_CALLBACK, /* dynamic */ + .mountpoint_len = 7, /* char count */ + .basic_auth_login_file =NULL, +}; + +Session::Session(Identifier sid) : + identifier(sid), + connects(0) +{ + console->info("Session created: {}", identifier); + + sessions[sid] = this; + + created = time(NULL); + + uuid_generate(uuid); +} + +Session::~Session() +{ + console->info("Session destroyed: {}", identifier); + + sessions.erase(identifier); +} + +Session * Session::get(lws *wsi) +{ + char uri[64]; + + /* We use the URI to associate this connection to a session + * Example: ws://example.com/node_1 + * Will select the session with the name 'node_1' + */ + + /* Get path of incoming request */ + lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); + if (strlen(uri) <= 0) + throw InvalidUrlException(); + + Identifier sid = uri; + + auto it = sessions.find(sid); + if (it == sessions.end()) { + return new Session(sid); + } + else { + console->info("Found existing session: {}", sid); + + return it->second; + } +} + +json_t * Session::toJson() const +{ + json_t *json_connections = json_array(); + + for (auto it : connections) { + auto conn = it.second; + + json_array_append(json_connections, conn->toJson()); + } + + char uuid_str[UUID_STR_LEN]; + uuid_unparse_lower(uuid, uuid_str); + + return json_pack("{ s: s, s: s, s: o, s: I, s: i }", + "identifier", identifier.c_str(), + "uuid", uuid_str, + "connections", json_connections, + "created", created, + "connects", connects + ); +} + +Connection::Connection(lws *w) : + wsi(w), + currentFrame(std::make_shared()), + outgoingFrames(), + bytes_recv(0), + bytes_sent(0), + frames_recv(0), + frames_sent(0) +{ + session = Session::get(wsi); + session->connections[wsi] = this; + session->connects++; + + lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), name, sizeof(name), ip, sizeof(ip)); + + created = time(NULL); + + console->info("New connection established: session={}, remote={} ({})", session->identifier, name, ip); +} + +Connection::~Connection() +{ + console->info("Connection closed: session={}, remote={} ({})", session->identifier, name, ip); + + session->connections.erase(wsi); + + if (session->connections.empty()) + delete session; +} + +json_t * Connection::toJson() const +{ + return json_pack("{ s: s, s: s, s: I, s: I, s: I, s: I, s: I }", + "name", name, + "ip", ip, + "created", created, + "bytes_recv", bytes_recv, + "bytes_sent", bytes_sent, + "frames_recv", frames_recv, + "frames_sent", frames_sent + ); +} + +void Connection::write() +{ + int ret; + + std::shared_ptr fr = outgoingFrames.front(); + + ret = lws_write(wsi, fr->data(), fr->size(), LWS_WRITE_BINARY); + if (ret < 0) + return; + + bytes_sent += fr->size(); + frames_sent++; + + outgoingFrames.pop(); + + if (outgoingFrames.size() > 0) + lws_callback_on_writable(wsi); +} + +void Connection::read(void *in, size_t len) +{ + currentFrame->insert(currentFrame->end(), (uint8_t *) in, (uint8_t *) in + len); + + bytes_recv += len; + + if (lws_is_final_fragment(wsi)) { + frames_recv++; + console->debug("Received frame, relaying to {} connections", session->connections.size() - (opts.loopback ? 0 : 1)); + + for (auto p : session->connections) { + auto c = p.second; + + /* We skip the current connection in order + * to avoid receiving our own data */ + if (opts.loopback == false && c == this) + continue; + + c->outgoingFrames.push(currentFrame); + + lws_callback_on_writable(c->wsi); + } + + currentFrame = std::make_shared(); + } +} + static void logger(int level, const char *msg) { auto log = spdlog::get("lws"); @@ -248,13 +289,76 @@ static void logger(int level, const char *msg) } } +int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + int ret; + size_t json_len; + json_t *json_sessions, *json_body; + + unsigned char buf[LWS_PRE + 2048], *start = &buf[LWS_PRE], *end = &buf[sizeof(buf) - LWS_PRE - 1], *p = start; + + switch (reason) { + case LWS_CALLBACK_HTTP: + if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK, + "application/json", + LWS_ILLEGAL_HTTP_CONTENT_LEN, /* no content len */ + &p, end)) + return 1; + if (lws_finalize_write_http_header(wsi, start, &p, end)) + return 1; + + /* Write the body separately */ + lws_callback_on_writable(wsi); + + return 0; + + case LWS_CALLBACK_HTTP_WRITEABLE: + + json_sessions = json_array(); + for (auto it : sessions) { + auto &session = it.second; + + json_array_append(json_sessions, session->toJson()); + } + + char hname[128]; + gethostname(hname, 128); + + json_body = json_pack("{ s: o, s: s, s: s, s: { s: b, s: i, s: s } }", + "sessions", json_sessions, + "version", PROJECT_VERSION_STR, + "hostname", hname, + "options", + "loopback", opts.loopback, + "port", opts.port, + "protocol", opts.protocol + ); + + json_len = json_dumpb(json_body, (char *) buf + LWS_PRE, sizeof(buf) - LWS_PRE, JSON_INDENT(4)); + + ret = lws_write(wsi, buf + LWS_PRE, json_len, LWS_WRITE_HTTP_FINAL); + if (ret < 0) + return ret; + + console->info("Handled API request"); + + //if (lws_http_transaction_completed(wsi)) + return -1; + + default: + break; + } + + return lws_callback_http_dummy(wsi, reason, user, in, len); +} + int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { Connection *c = reinterpret_cast(user); switch (reason) { - case LWS_CALLBACK_ESTABLISHED: { + case LWS_CALLBACK_ESTABLISHED: try { new (c) Connection(wsi); } @@ -264,7 +368,6 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in return -1; } break; - } case LWS_CALLBACK_CLOSED: c->~Connection(); @@ -292,6 +395,7 @@ void usage() std::cout << " -d LVL set debug level" << std::endl; std::cout << " -p PORT the port number to listen on" << std::endl; std::cout << " -P PROT the websocket protocol" << std::endl; + std::cout << " -l enable loopback of own data" << std::endl; std::cout << " -V show version and exit" << std::endl; std::cout << " -h show usage and exit" << std::endl; std::cout << std::endl; @@ -308,13 +412,6 @@ int main(int argc, char *argv[]) /* Start server */ lws_context_creation_info ctx_info = { 0 }; - ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; - ctx_info.gid = -1; - ctx_info.uid = -1; - ctx_info.protocols = protocols; - ctx_info.extensions = extensions; - ctx_info.port = 8088; - char c, *endptr; while ((c = getopt (argc, argv, "hVp:P:ld:")) != -1) { switch (c) { @@ -323,11 +420,11 @@ int main(int argc, char *argv[]) break; case 'p': - ctx_info.port = strtoul(optarg, &endptr, 10); + opts.port = strtoul(optarg, &endptr, 10); goto check; case 'P': - protocols[0].name = optarg; + opts.protocol = strdup(optarg); break; case 'l': @@ -357,6 +454,16 @@ check: if (optarg == endptr) { exit(EXIT_FAILURE); } + protocols[2].name = opts.protocol; + + ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + ctx_info.gid = -1; + ctx_info.uid = -1; + ctx_info.protocols = protocols; + ctx_info.extensions = extensions; + ctx_info.port = opts.port; + ctx_info.mounts = &mount; + context = lws_create_context(&ctx_info); if (context == NULL) { console->error("WebSocket: failed to initialize server context"); diff --git a/src/villas-relay.hpp b/src/villas-relay.hpp new file mode 100644 index 000000000..e28041ce5 --- /dev/null +++ b/src/villas-relay.hpp @@ -0,0 +1,115 @@ +/** Simple WebSocket relay facilitating client-to-client connections. + * + * @file + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +#include +#include + +#include + +/* Forward declarations */ +lws_callback_function protocol_cb, http_protocol_cb; +class Session; +class Connection; + +class InvalidUrlException { }; + +struct Options { + bool loopback; + int port; + const char *protocol; +}; + +class Frame : public std::vector { +public: + Frame() { + /* lws_write() requires LWS_PRE bytes in front of the payload */ + insert(end(), LWS_PRE, 0); + } + + uint8_t * data() { + return std::vector::data() + LWS_PRE; + } + + size_type size() { + return std::vector::size() - LWS_PRE; + } +}; + +class Session { + +protected: + time_t created; + uuid_t uuid; + +public: + typedef std::string Identifier; + + static Session * get(lws *wsi); + + Session(Identifier sid); + + ~Session(); + + json_t * toJson() const; + + Identifier identifier; + + std::map connections; + + int connects; +}; + +class Connection { + +protected: + lws *wsi; + + std::shared_ptr currentFrame; + + std::queue> outgoingFrames; + + Session *session; + + char name[128]; + char ip[128]; + + size_t created; + size_t bytes_recv; + size_t bytes_sent; + + size_t frames_recv; + size_t frames_sent; + +public: + Connection(lws *w); + + ~Connection(); + + json_t * toJson() const; + + void write(); + void read(void *in, size_t len); +};