1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00
This commit is contained in:
Steffen Vogel 2021-04-21 18:01:59 -04:00
parent 3b84db26c6
commit 1e35ccab0e
6 changed files with 169 additions and 111 deletions

View file

@ -38,7 +38,14 @@
#include <villas/exceptions.hpp>
namespace villas {
namespace node {
/* Forward declarations */
class SuperNode;
namespace tools {
class Relay;
}
namespace api {
const int version = 2;
@ -90,9 +97,6 @@ public:
} /* namespace api */
/* Forward declarations */
class SuperNode;
class Api {
protected:
@ -103,8 +107,6 @@ protected:
std::thread thread;
std::atomic<bool> running; /**< Atomic flag for signalizing thread termination. */
SuperNode *super_node;
void run();
void worker();
@ -113,20 +115,64 @@ public:
*
* Save references to list of paths / nodes for command execution.
*/
Api(SuperNode *sn);
Api();
~Api();
void start();
void stop();
SuperNode * getSuperNode()
{
return super_node;
}
std::list<api::Session *> sessions; /**< List of currently active connections */
villas::QueueSignalled<api::Session *> pending; /**< A queue of api_sessions which have pending requests. */
};
namespace node {
class Api : public Api {
protected:
SuperNode *super_node;
public:
/** Initalize the API.
*
* Save references to list of paths / nodes for command execution.
*/
Api(SuperNode *sn) :
Api(),
super_node(sn)
{ }
SuperNode * getSuperNode()
{
return super_node;
}
}
} /* namespace node */
namespace relay {
class Api : public Api {
protected:
tools::Relay *relay;
public:
/** Initalize the API.
*
* Save references to list of paths / nodes for command execution.
*/
Api(tools::Relay *r) :
Api(),
relay(r)
{ }
tools::Relay * getRelay()
{
return r;
}
}
} /* namespace relay */
} /* namespace villas */

View file

@ -36,7 +36,6 @@
#define RE_UUID "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}"
namespace villas {
namespace node {
namespace api {
/* Forward declarations */
@ -120,6 +119,7 @@ public:
toString();
};
template<typename R>
class RequestFactory : public plugin::Plugin {
public:
@ -128,10 +128,10 @@ public:
virtual bool
match(const std::string &uri, std::smatch &m) const = 0;
virtual Request *
virtual R *
make(Session *s) = 0;
static Request *
static R *
create(Session *s, const std::string &uri, Session::Method meth, unsigned long ct);
};
@ -175,5 +175,4 @@ public:
};
} /* namespace api */
} /* namespace node */
} /* namespace villas */

View file

@ -30,10 +30,12 @@
#include <villas/api.hpp>
namespace villas {
namespace node {
/* Forward declarations */
class SuperNode;
namespace node {
class SuperNode;
}
class Api;
class Web;
@ -115,9 +117,7 @@ public:
static std::string
methodToString(Method meth);
};
} /* namespace api */
} /* namespace node */
} /* namespace villas */

View file

@ -40,10 +40,9 @@ InvalidMethod::InvalidMethod(Request *req) :
)
{ }
Api::Api(SuperNode *sn) :
Api::Api() :
logger(logging.get("api")),
state(State::INITIALIZED),
super_node(sn)
{ }
Api::~Api()

View file

@ -50,7 +50,8 @@ namespace tools {
RelaySession::RelaySession(Relay *r, Identifier sid) :
identifier(sid),
connects(0)
connects(0),
metadata(json_null())
{
auto loggerName = fmt::format("relay:{}", sid);
logger = villas::logging.get(loggerName);
@ -71,7 +72,7 @@ RelaySession::~RelaySession()
sessions.erase(identifier);
}
RelaySession * RelaySession::get(Relay *r, lws *wsi)
RelaySession * RelaySession::getOrCreate(Relay *r, lws *wsi)
{
char uri[64];
@ -85,19 +86,41 @@ RelaySession * RelaySession::get(Relay *r, lws *wsi)
if (strlen(uri) <= 1)
throw InvalidUrlException();
Identifier sid = uri + 1;
std::string name_or_uuid = uri + 1;
auto it = sessions.find(sid);
if (it == sessions.end()) {
auto *rs = new RelaySession(r, sid);
if (!rs)
auto s = lookup(name_or_uuid);
if (!s) {
s = new RelaySession(r, name_or_uuid);
if (!s)
throw MemoryAllocationError();
return rs;
}
else {
auto logger = logging.get("villas-relay");
logger->info("Found existing session: {}", sid);
logger->info("Found existing session: {}", s->getIdentifier());
}
return s;
}
RelaySession * RelaySession::lookup(std::string &name_or_uuid)
{
int ret;
uuid_t uuid;
ret = uuid_parse(name_or_uuid.c_str(), uuid);
if (ret == 0) { // UUID
auto cmp = [uuid] (const std::pair<Identifier, RelaySession *> &p) { return uuid_compare(p.second->uuid, uuid) == 0; };
auto it = std::find_if(sessions.begin(), sessions.end(), cmp);
if (it == sessions.end())
return nullptr;
return it->second;
}
else { // Identifier
auto it = sessions.find(name_or_uuid);
if (it == sessions.end())
return nullptr;
return it->second;
}
@ -116,12 +139,13 @@ json_t * RelaySession::toJson() const
uuid_string_t uuid_str;
uuid_unparse_lower(uuid, uuid_str);
return json_pack("{ s: s, s: s, s: o, s: I, s: i }",
return json_pack("{ s: s, s: s, s: o, s: I, s: i, s: o }",
"identifier", identifier.c_str(),
"uuid", uuid_str,
"connections", json_connections,
"created", created,
"connects", connects
"connects", connects,
"metadata", metadata
);
}
@ -137,7 +161,7 @@ RelayConnection::RelayConnection(Relay *r, lws *w, bool lo) :
frames_sent(0),
loopback(lo)
{
session = RelaySession::get(r, wsi);
session = RelaySession::getOrCreate(r, wsi);
session->connections[wsi] = this;
session->connects++;
@ -217,6 +241,12 @@ void RelayConnection::read(void *in, size_t len)
}
}
HTTPRequest::HTTPRequest(struct lws *w) :
wsi(w)
{
}
Relay::Relay(int argc, char *argv[]) :
Tool(argc, argv, "relay"),
stop(false),
@ -224,7 +254,9 @@ Relay::Relay(int argc, char *argv[]) :
vhost(nullptr),
loopback(false),
port(8088),
protocol("live")
protocol("live"),
api(),
web(&api)
{
int ret;
@ -255,7 +287,7 @@ Relay::Relay(int argc, char *argv[]) :
{
.name = "http-api",
.callback = httpProtocolCallback,
.per_session_data_size = 0,
.per_session_data_size = sizeof(HTTPRequest),
.rx_buffer_size = 1024
},
{
@ -299,76 +331,6 @@ void Relay::loggerCallback(int level, const char *msg)
}
}
int Relay::httpProtocolCallback(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
size_t json_len;
json_t *json_sessions, *json_body;
lws_context *ctx = lws_get_context(wsi);
void *user_ctx = lws_context_user(ctx);
Relay *r = reinterpret_cast<Relay *>(user_ctx);
unsigned char buf[LWS_PRE + 2048], *start = &buf[LWS_PRE], *end = &buf[sizeof(buf) - LWS_PRE - 1], *p = start;
switch (reason) {
case LWS_CALLBACK_HTTP:
if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
"application/json",
LWS_ILLEGAL_HTTP_CONTENT_LEN, /* no content len */
&p, end))
return 1;
if (lws_finalize_write_http_header(wsi, start, &p, end))
return 1;
/* Write the body separately */
lws_callback_on_writable(wsi);
return 0;
case LWS_CALLBACK_HTTP_WRITEABLE:
json_sessions = json_array();
for (auto it : RelaySession::sessions) {
auto &session = it.second;
json_array_append(json_sessions, session->toJson());
}
uuid_string_t uuid_str;
uuid_unparse(r->uuid, uuid_str);
json_body = json_pack("{ s: o, s: s, s: s, s: s, s: { s: b, s: i, s: s } }",
"sessions", json_sessions,
"version", PROJECT_VERSION_STR,
"hostname", r->hostname.c_str(),
"uuid", uuid_str,
"options",
"loopback", r->loopback,
"port", r->port,
"protocol", r->protocol.c_str()
);
json_len = json_dumpb(json_body, (char *) buf + LWS_PRE, sizeof(buf) - LWS_PRE, JSON_INDENT(4));
ret = lws_write(wsi, buf + LWS_PRE, json_len, LWS_WRITE_HTTP_FINAL);
if (ret < 0)
return ret;
r->logger->info("Handled API request");
//if (lws_http_transaction_completed(wsi))
return -1;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
int Relay::protocolCallback(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
lws_context *ctx = lws_get_context(wsi);
@ -409,6 +371,34 @@ int Relay::protocolCallback(lws *wsi, enum lws_callback_reasons reason, void *us
return 0;
}
json_t * Relay::toJson()
{
json_t *json_sessions, *json;
json_sessions = json_array();
for (auto it : RelaySession::sessions) {
auto &session = it.second;
json_array_append(json_sessions, session->toJson());
}
uuid_string_t uuid_str;
uuid_unparse(uuid, uuid_str);
json = json_pack("{ s: o, s: s, s: s, s: s, s: { s: b, s: i, s: s } }",
"sessions", json_sessions,
"version", PROJECT_VERSION_STR,
"hostname", hostname.c_str(),
"uuid", uuid_str,
"options",
"loopback", loopback,
"port", port,
"protocol", protocol.c_str()
);
return json;
}
void Relay::usage()
{
std::cout << "Usage: villas-relay [OPTIONS]" << std::endl
@ -507,8 +497,14 @@ int Relay::main() {
exit(EXIT_FAILURE);
}
api.start();
web.start();
while (!stop)
lws_service(context, 100);
sleep(1);
api.stop();
web.stop();
return 0;
}
@ -531,7 +527,7 @@ const std::vector<lws_extension> Relay::extensions = {
const lws_http_mount Relay::mount = {
.mount_next = nullptr, /* linked-list "next" */
.mountpoint = "/api/v1", /* mountpoint URL */
.mountpoint = "/api/v2", /* mountpoint URL */
.origin = nullptr, /* protocol */
.def = nullptr,
.protocol = "http-api",

View file

@ -27,7 +27,7 @@
#include <uuid/uuid.h>
#include <libwebsockets.h>
#include <villas/api.hpp>
#include <villas/log.hpp>
namespace villas {
@ -75,16 +75,25 @@ protected:
int connects;
json_t *metadata;
static std::map<std::string, RelaySession *> sessions;
public:
static RelaySession * get(Relay *r, lws *wsi);
static RelaySession * getOrCreate(Relay *r, lws *wsi);
static RelaySession * lookup(std::string &name_or_uuid);
RelaySession(Relay *r, Identifier sid);
~RelaySession();
json_t * toJson() const;
Identifier getIdentifier() const
{
return identifier;
}
};
class RelayConnection {
@ -120,6 +129,10 @@ public:
void read(void *in, size_t len);
};
class RelayRequestFactory : public {
};
class Relay : public Tool {
public:
@ -143,6 +156,9 @@ protected:
uuid_t uuid;
Web web;
Api<RelayRequestFactory> api;
/** List of libwebsockets protocols. */
std::vector<lws_protocols> protocols;
@ -163,6 +179,8 @@ protected:
int main();
json_t * toJson();
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;