From 3c11acb8b5cb596670ddb1ce4b837c3ab3b50f14 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 20 Oct 2018 14:20:06 +0200 Subject: [PATCH] cpp: ported web and api --- include/villas/api.h | 87 ---- include/villas/api.hpp | 87 ++++ include/villas/api/action.hpp | 75 ++++ include/villas/api/server.hpp | 74 ++++ include/villas/api/{session.h => session.hpp} | 87 ++-- include/villas/api/sessions/http.hpp | 59 +++ include/villas/api/sessions/socket.hpp | 55 +++ include/villas/api/sessions/websocket.hpp | 56 +++ include/villas/api/sessions/wsi.hpp | 63 +++ include/villas/plugin.h | 4 +- .../villas/{super_node.h => super_node.hpp} | 73 +++- include/villas/{web.h => web.hpp} | 82 ++-- lib/CMakeLists.txt | 10 +- lib/api.c | 380 ------------------ lib/api.cpp | 104 +++++ lib/api/CMakeLists.txt | 27 +- lib/api/{actions/shutdown.c => action.cpp} | 22 +- lib/api/actions/capabiltities.c | 78 ---- lib/api/actions/capabiltities.cpp | 87 ++++ lib/api/actions/config.cpp | 55 +++ lib/api/actions/node.c | 146 ------- lib/api/actions/nodes.c | 73 ---- lib/api/actions/nodes.cpp | 84 ++++ lib/api/actions/{paths.c => paths.cpp} | 66 +-- lib/api/actions/restart.c | 99 ----- lib/api/actions/restart.cpp | 117 ++++++ lib/api/actions/{config.c => shutdown.cpp} | 40 +- lib/api/actions/{status.c => status.cpp} | 47 ++- lib/api/server.cpp | 163 ++++++++ lib/api/session.c | 172 -------- lib/api/session.cpp | 140 +++++++ lib/api/sessions/http.cpp | 198 +++++++++ lib/api/sessions/socket.cpp | 85 ++++ lib/api/sessions/websocket.cpp | 165 ++++++++ lib/api/sessions/wsi.cpp | 72 ++++ lib/nodes/websocket.c | 2 +- lib/super_node.cpp | 349 ++++++++-------- lib/web.c | 359 ----------------- lib/web.cpp | 286 +++++++++++++ 39 files changed, 2455 insertions(+), 1773 deletions(-) delete mode 100644 include/villas/api.h create mode 100644 include/villas/api.hpp create mode 100644 include/villas/api/action.hpp create mode 100644 include/villas/api/server.hpp rename include/villas/api/{session.h => session.hpp} (58%) create mode 100644 include/villas/api/sessions/http.hpp create mode 100644 include/villas/api/sessions/socket.hpp create mode 100644 include/villas/api/sessions/websocket.hpp create mode 100644 include/villas/api/sessions/wsi.hpp rename include/villas/{super_node.h => super_node.hpp} (71%) rename include/villas/{web.h => web.hpp} (51%) delete mode 100644 lib/api.c create mode 100644 lib/api.cpp rename lib/api/{actions/shutdown.c => action.cpp} (71%) delete mode 100644 lib/api/actions/capabiltities.c create mode 100644 lib/api/actions/capabiltities.cpp create mode 100644 lib/api/actions/config.cpp delete mode 100644 lib/api/actions/node.c delete mode 100644 lib/api/actions/nodes.c create mode 100644 lib/api/actions/nodes.cpp rename lib/api/actions/{paths.c => paths.cpp} (53%) delete mode 100644 lib/api/actions/restart.c create mode 100644 lib/api/actions/restart.cpp rename lib/api/actions/{config.c => shutdown.cpp} (68%) rename lib/api/actions/{status.c => status.cpp} (63%) create mode 100644 lib/api/server.cpp delete mode 100644 lib/api/session.c create mode 100644 lib/api/session.cpp create mode 100644 lib/api/sessions/http.cpp create mode 100644 lib/api/sessions/socket.cpp create mode 100644 lib/api/sessions/websocket.cpp create mode 100644 lib/api/sessions/wsi.cpp delete mode 100644 lib/web.c create mode 100644 lib/web.cpp diff --git a/include/villas/api.h b/include/villas/api.h deleted file mode 100644 index bceac7195..000000000 --- a/include/villas/api.h +++ /dev/null @@ -1,87 +0,0 @@ -/** REST-API-releated functions. - * - * @file - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#pragma once - -#include -#include -#include - -#include -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -/* Forward declarations */ -struct api; -struct api_action; - -/** Callback type of command function - * - * @param[inout] c Command handle - * @param[in] args JSON command arguments. - * @param[out] resp JSON command response. - * @param[in] i Execution context. - */ -typedef int (*api_cb_t)(struct api_action *c, json_t *args, json_t **resp, struct api_session *s); - -struct api { - enum state state; - - struct list sessions; /**< List of currently active connections */ - struct queue_signalled pending; /**< A queue of api_sessions which have pending requests. */ - - pthread_t thread; - - struct super_node *super_node; -}; - -/** API action descriptor */ -struct api_action { - api_cb_t cb; -}; - -/** Initalize the API. - * - * Save references to list of paths / nodes for command execution. - */ -int api_init(struct api *a);//, struct super_node *sn); // @todo: port to C++ - -int api_destroy(struct api *a); - -int api_start(struct api *a); - -int api_stop(struct api *a); - -/** Libwebsockets callback for "api" endpoint */ -int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); - -int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); - -#ifdef __cplusplus -} -#endif diff --git a/include/villas/api.hpp b/include/villas/api.hpp new file mode 100644 index 000000000..df8881827 --- /dev/null +++ b/include/villas/api.hpp @@ -0,0 +1,87 @@ +/** REST-API-releated functions. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace villas { +namespace node { +namespace api { + +/* Forward declarations */ +class Session; + +} // namespace api + +/* Forward declarations */ +class SuperNode; + +class Api { + +protected: + static Logger logger; + + enum state state; + + std::thread thread; + std::atomic running; /**< Atomic flag for signalizing thread termination. */ + + SuperNode *super_node; + api::Server server; + + void run(); + void worker(); + +public: + /** Initalize the API. + * + * Save references to list of paths / nodes for command execution. + */ + Api(SuperNode *sn); + ~Api(); + + void start(); + void stop(); + + SuperNode * getSuperNode() + { + return super_node; + } + + std::list sessions; /**< List of currently active connections */ + villas::QueueSignalled pending; /**< A queue of api_sessions which have pending requests. */ +}; + +} // node +} // villas diff --git a/include/villas/api/action.hpp b/include/villas/api/action.hpp new file mode 100644 index 000000000..e7d2d01ae --- /dev/null +++ b/include/villas/api/action.hpp @@ -0,0 +1,75 @@ +/** REST-API-releated functions. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +#include +#include + +namespace villas { +namespace node { +namespace api { + +/* Forward declarations */ +class Session; + +/** API action descriptor */ +class Action { + +protected: + Session *session; + + static Logger logger; + +public: + Action(Session *s) : + session(s) + { } + + virtual int execute(json_t *args, json_t **resp) = 0; +}; + +class ActionFactory : public plugin::Plugin { + +public: + using plugin::Plugin::Plugin; + + virtual Action * make(Session *s) = 0; +}; + +template +class ActionPlugin : public ActionFactory { + +public: + using ActionFactory::ActionFactory; + + virtual Action * make(Session *s) { + return new T(s); + }; +}; + +} // api +} // node +} // villas diff --git a/include/villas/api/server.hpp b/include/villas/api/server.hpp new file mode 100644 index 000000000..0f7d7a709 --- /dev/null +++ b/include/villas/api/server.hpp @@ -0,0 +1,74 @@ +/** Socket API endpoint. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include +#include + +#include + +#include + +namespace villas { +namespace node { + +/* Forward declarations */ +class Api; + +namespace api { +namespace sessions { + +/* Forward declarations */ +class Socket; + +} // namespace sessions + +class Server { + +protected: + enum state state; + + Api *api; + + int sd; + + std::vector pfds; + std::vector sessions; + + void acceptNewSession(); + void closeSession(sessions::Socket *s); + +public: + Server(Api *a); + ~Server(); + + void start(); + void stop(); + + void run(int timeout = 100); +}; + +} // namespace api +} // namespace node +} // namespace villas diff --git a/include/villas/api/session.h b/include/villas/api/session.hpp similarity index 58% rename from include/villas/api/session.h rename to include/villas/api/session.hpp index 6506b774e..c9fc45396 100644 --- a/include/villas/api/session.h +++ b/include/villas/api/session.hpp @@ -26,55 +26,72 @@ #include #include -#include #include -#include +#include +#include -#ifdef __cplusplus -extern "C" { -#endif +namespace villas { +namespace node { -enum api_version { - API_VERSION_UNKOWN = 0, - API_VERSION_1 = 1 -}; +/* Forward declarations */ +class SuperNode; +class Api; -enum api_mode { - API_MODE_WS, /**< This API session was established over a WebSocket connection. */ - API_MODE_HTTP /**< This API session was established via a HTTP REST request. */ -}; +namespace api { /** A connection via HTTP REST or WebSockets to issue API actions. */ -struct api_session { - enum { - API_SESSION_STATE_ESTABLISHED, - API_SESSION_STATE_SHUTDOWN - } state; +class Session { - enum api_version version; - enum api_mode mode; +public: + enum State { + ESTABLISHED, + SHUTDOWN + }; + + enum Version { + UNKOWN = 0, + VERSION_1 = 1 + }; + +protected: + enum State state; + enum Version version; + + static Logger logger; int runs; struct { - struct buffer buffer; + JsonBuffer buffer; struct queue queue; } request, response; - struct lws *wsi; - struct api *api; + Api *api; - char *_name; +public: + Session(Api *a); + + virtual ~Session(); + + int runAction(json_t *req, json_t **resp); + virtual void runPendingActions(); + + virtual std::string getName(); + + int getRuns() + { + return runs; + } + + SuperNode * getSuperNode() + { + return api->getSuperNode(); + } + + virtual void shutdown() + { } }; -int api_session_init(struct api_session *s, enum api_mode m); - -int api_session_destroy(struct api_session *s); - -int api_session_run_action(struct api_session *s, json_t *req, json_t **resp); - -char * api_session_name(struct api_session *s); - -#ifdef __cplusplus -} -#endif +} // api +} // node +} // villas diff --git a/include/villas/api/sessions/http.hpp b/include/villas/api/sessions/http.hpp new file mode 100644 index 000000000..a94f83858 --- /dev/null +++ b/include/villas/api/sessions/http.hpp @@ -0,0 +1,59 @@ +/** HTTP Api session. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +extern "C" int api_http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +namespace villas { +namespace node { + +/* Forward declarations */ +class SuperNode; +class Api; + +namespace api { +namespace sessions { + +class Http : public Wsi { + +public: + Http(Api *s, lws *w); + + void read(void *in, size_t len); + + int complete(); + + int write(); + + virtual ~Http(); + + virtual std::string getName(); +}; + +} // sessions +} // api +} // node +} // villas diff --git a/include/villas/api/sessions/socket.hpp b/include/villas/api/sessions/socket.hpp new file mode 100644 index 000000000..b7eb2d2d3 --- /dev/null +++ b/include/villas/api/sessions/socket.hpp @@ -0,0 +1,55 @@ +/** Socket Api session. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +namespace villas { +namespace node { + +/* Forward declarations */ +class Api; + +namespace api { +namespace sessions { + +class Socket : public Session { + +protected: + int sd; + +public: + Socket(Api *a, int s); + ~Socket(); + + int read(); + int write(); + + virtual std::string getName(); +}; + +} // sessions +} // api +} // node +} // villas diff --git a/include/villas/api/sessions/websocket.hpp b/include/villas/api/sessions/websocket.hpp new file mode 100644 index 000000000..4135b94e5 --- /dev/null +++ b/include/villas/api/sessions/websocket.hpp @@ -0,0 +1,56 @@ +/** WebSockets API session. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +extern "C" int api_ws_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + +namespace villas { +namespace node { + +/* Forward declarations */ +class Api; + +namespace api { +namespace sessions { + +class WebSocket : public Wsi { + +public: + WebSocket(Api *a, lws *w); + + virtual ~WebSocket(); + + virtual std::string getName(); + + int read(void *in, size_t len); + + int write(); +}; + +} // sessions +} // api +} // node +} // villas diff --git a/include/villas/api/sessions/wsi.hpp b/include/villas/api/sessions/wsi.hpp new file mode 100644 index 000000000..cef0c7718 --- /dev/null +++ b/include/villas/api/sessions/wsi.hpp @@ -0,0 +1,63 @@ +/** LWS Api session. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +#include +#include + +/* Forward declarations */ +struct lws; + +namespace villas { +namespace node { + +/* Forward declarations */ +class Api; + +namespace api { +namespace sessions { + +class Wsi : public Session { + +protected: + lws *wsi; + + Web *web; + +public: + Wsi(Api *a, lws *w); + + virtual void runPendingActions(); + + virtual std::string getName(); + + virtual void shutdown(); +}; + +} // sessions +} // api +} // node +} // villas diff --git a/include/villas/plugin.h b/include/villas/plugin.h index 15d651aa2..cdf6ef17b 100644 --- a/include/villas/plugin.h +++ b/include/villas/plugin.h @@ -25,7 +25,6 @@ #include #include -#include #include #include #include @@ -83,13 +82,12 @@ struct plugin { struct format_type format; struct node_type node; struct hook_type hook; - struct api_action api; struct cbuilder_model cb; }; }; /** Return a pointer to the plugin structure */ -#define plugin(vt) ((struct plugin *) ((char *) (vt) - offsetof(struct plugin, api))) +#define plugin(vt) ((struct plugin *) ((char *) (vt) - offsetof(struct plugin, format))) #define plugin_name(vt) plugin(vt)->name #define plugin_description(vt) plugin(vt)->description diff --git a/include/villas/super_node.h b/include/villas/super_node.hpp similarity index 71% rename from include/villas/super_node.h rename to include/villas/super_node.hpp index 86abcf6e1..74141898b 100644 --- a/include/villas/super_node.h +++ b/include/villas/super_node.hpp @@ -24,10 +24,11 @@ #pragma once #include -#include -#include -#include +#include +#include +#include #include +#include #include namespace villas { @@ -44,17 +45,24 @@ protected: int hugepages; /**< Number of hugepages to reserve. */ double stats; /**< Interval for path statistics. Set to 0 to disable them. */ + static Logger logger; + struct list nodes; struct list paths; struct list plugins; - struct log log; - struct api api; - struct web web; +#ifdef WITH_API + Api api; +#endif - char *name; /**< A name of this super node. Usually the hostname. */ +#ifdef WITH_WEB + Web web; +#endif - char *uri; /**< URI of configuration */ + struct task task; /**< Task for periodic stats output */ + + std::string name; /**< A name of this super node. Usually the hostname. */ + std::string uri; /**< URI of configuration */ json_t *json; /**< JSON representation of the configuration. */ @@ -65,7 +73,7 @@ public: int init(); /** Wrapper for super_node_parse() */ - int parseUri(const char *uri); + int parseUri(const std::string &name); /** Parse super-node configuration. * @@ -86,15 +94,48 @@ public: /** Run periodic hooks of this super node. */ int periodic(); - struct node * getNode(const char *name) { return (struct node *) list_lookup(&nodes, name); } + struct node * getNode(const std::string &name) + { + return (struct node *) list_lookup(&nodes, name.c_str()); + } - struct list * getNodes() { return &nodes; } - struct list * getPaths() { return &paths; } - struct web * getWeb() { return &web; } - struct api * getApi() { return &api; } - struct log * getLog() { return &log; } + struct list * getNodes() + { + return &nodes; + } - /** Desctroy configuration object. */ + struct list * getPaths() { + return &paths; + } + +#ifdef WITH_API + Api * getApi() { + return &api; + } +#endif + +#ifdef WITH_WEB + Web * getWeb() { + return &web; + } +#endif + + json_t * getConfig() + { + return json; + } + + std::string getConfigUri() + { + return uri; + } + + std::string getName() + { + return name; + } + + /** Destroy configuration object. */ ~SuperNode(); }; diff --git a/include/villas/web.h b/include/villas/web.hpp similarity index 51% rename from include/villas/web.h rename to include/villas/web.hpp index f5a4cf33d..f20109ceb 100644 --- a/include/villas/web.h +++ b/include/villas/web.hpp @@ -23,55 +23,67 @@ #pragma once -#include +#include +#include #include +#include #include -#include +#include -#ifdef __cplusplus -extern "C" { -#endif +namespace villas { +namespace node { /* Forward declarations */ -struct api; +class Api; -struct web { - struct api *api; +class Web { +protected: enum state state; - struct lws_context *context; /**< The libwebsockets server context. */ - struct lws_vhost *vhost; /**< The libwebsockets vhost. */ + static Logger logger; - struct queue writables; /**< Queue of WSIs for which we will call lws_callback_on_writable() */ + lws_context *context; /**< The libwebsockets server context. */ + lws_vhost *vhost; /**< The libwebsockets vhost. */ + + Queue writables; /**< Queue of WSIs for which we will call lws_callback_on_writable() */ int port; /**< Port of the build in HTTP / WebSocket server. */ - char *htdocs; /**< The root directory for files served via HTTP. */ - char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS. */ - char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS. */ + std::string htdocs; /**< The root directory for files served via HTTP. */ + std::string ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS. */ + std::string ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS. */ - pthread_t thread; + std::thread thread; + std::atomic running; /**< Atomic flag for signalizing thread termination. */ + + Api *api; + + void worker(); + static void lwsLogger(int level, const char *msg); + +public: + + /** Initialize the web interface. + * + * The web interface is based on the libwebsockets library. + */ + Web(Api *a); + + void start(); + void stop(); + + /** Parse HTTPd and WebSocket related options */ + int parse(json_t *cfg); + + Api * getApi() + { + return api; + } + + void callbackOnWritable(struct lws *wsi); }; -/** Initialize the web interface. - * - * The web interface is based on the libwebsockets library. - */ -int web_init(struct web *w, struct api *a); - -int web_destroy(struct web *w); - -int web_start(struct web *w); - -int web_stop(struct web *w); - -/** Parse HTTPd and WebSocket related options */ -int web_parse(struct web *w, json_t *cfg); - -void web_callback_on_writable(struct lws *wsi); - -#ifdef __cplusplus -} -#endif +} // namespace node +} // namespace villas diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 78bbe8033..fb163501e 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -55,11 +55,6 @@ set(LIB_SRC queue_signalled.c ) - -if(ARCH STREQUAL "x86_64") - list(APPEND LIB_SRC tsc.c) -endif() - if(IBVERBS_FOUND AND RDMACM_FOUND) list(APPEND LIB_SRC memory/ib.c) endif() @@ -94,7 +89,7 @@ endif() if(WITH_WEB) list(APPEND LIB_SRC - web.c + web.cpp ) list(APPEND INCLUDE_DIRS ${LIBWEBSOCKETS_INCLUDE_DIRS}) @@ -103,7 +98,7 @@ endif() if(WITH_API AND WITH_WEB) list(APPEND LIB_SRC - api.c + api.cpp ) add_subdirectory(api) @@ -134,7 +129,6 @@ else() target_link_libraries(villas PRIVATE -Wl,--whole-archive ${WHOLE_ARCHIVES} -Wl,--no-whole-archive) endif() - set_target_properties(villas PROPERTIES VERSION ${CMAKE_PROJECT_VERSION} SOVERSION 1 diff --git a/lib/api.c b/lib/api.c deleted file mode 100644 index b6f7013bb..000000000 --- a/lib/api.c +++ /dev/null @@ -1,380 +0,0 @@ -/** REST-API-releated functions. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -/* Forward declarations */ -static void * api_worker(void *ctx); - -int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - int ret, pulled, pushed; - json_t *req, *resp; - - struct web *w = lws_context_user(lws_get_context(wsi)); - struct api_session *s = (struct api_session *) user; - - switch (reason) { - case LWS_CALLBACK_ESTABLISHED: - if (w->api == NULL) { - lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "API disabled", strlen("API disabled")); - return -1; - } - - /* Parse request URI */ - char uri[64]; - lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); - - ret = sscanf(uri, "/v%d", (int *) &s->version); - if (ret != 1) - return -1; - - ret = api_session_init(s, API_MODE_WS); - if (ret) - return -1; - - s->state = API_SESSION_STATE_ESTABLISHED; - s->wsi = wsi; - s->api = w->api; - - list_push(&s->api->sessions, s); - - debug(LOG_API, "Initiated API session: %s", api_session_name(s)); - - break; - - case LWS_CALLBACK_CLOSED: - debug(LOG_API, "Closed API session: %s, runs=%d", api_session_name(s), s->runs); - - ret = api_session_destroy(s); - if (ret) - return -1; - - if (w->api->sessions.state == STATE_INITIALIZED) - list_remove(&w->api->sessions, s); - - break; - - case LWS_CALLBACK_RECEIVE: - if (lws_is_first_fragment(wsi)) - buffer_clear(&s->request.buffer); - - buffer_append(&s->request.buffer, in, len); - - if (lws_is_final_fragment(wsi)) { - ret = buffer_parse_json(&s->request.buffer, &req); - if (ret) - break; - - pushed = queue_push(&s->request.queue, req); - if (pushed != 1) - warn("Queue overun in API session"); - - pushed = queue_signalled_push(&w->api->pending, s); - if (pushed != 1) - warn("Queue overrun in API"); - } - - break; - - case LWS_CALLBACK_SERVER_WRITEABLE: - if (s->state == API_SESSION_STATE_SHUTDOWN) - return -1; - - pulled = queue_pull(&s->response.queue, (void **) &resp); - if (pulled) { - char pad[LWS_PRE]; - - buffer_clear(&s->response.buffer); - buffer_append(&s->response.buffer, pad, sizeof(pad)); - buffer_append_json(&s->response.buffer, resp); - - json_decref(resp); - - lws_write(wsi, (unsigned char *) s->response.buffer.buf + LWS_PRE, s->response.buffer.len - LWS_PRE, LWS_WRITE_TEXT); - } - break; - - default: - return 0; - } - - return 0; -} - -int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - int ret, pulled, pushed, hdrlen; - char *uri; - json_t *resp, *req; - - struct web *w = lws_context_user(lws_get_context(wsi)); - struct api_session *s = (struct api_session *) user; - - switch (reason) { - case LWS_CALLBACK_HTTP_BIND_PROTOCOL: - if (w->api == NULL) - return -1; - - int options; - if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_OPTIONS_URI))) - options = 1; - else if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_POST_URI))) - options = 0; - else - return -1; - - uri = malloc(hdrlen + 1); - lws_hdr_copy(wsi, uri, hdrlen + 1, options ? WSI_TOKEN_OPTIONS_URI : WSI_TOKEN_POST_URI); - - /* Parse request URI */ - ret = sscanf(uri, "/api/v%d", (int *) &s->version); - if (ret != 1) - return -1; - - free(uri); - - ret = api_session_init(s, API_MODE_HTTP); - if (ret) - return -1; - - s->state = API_SESSION_STATE_ESTABLISHED; - s->wsi = wsi; - s->api = w->api; - - list_push(&s->api->sessions, s); - - debug(LOG_API, "Initiated API session: %s", api_session_name(s)); - - if (options) - lws_callback_on_writable(wsi); - - break; - - case LWS_CALLBACK_HTTP_DROP_PROTOCOL: - if (!s) - return -1; - - debug(LOG_API, "Closed API session: %s, runs=%d", api_session_name(s), s->runs); - - ret = api_session_destroy(s); - if (ret) - return -1; - - if (w->api->sessions.state == STATE_INITIALIZED) - list_remove(&w->api->sessions, s); - - return 1; - - case LWS_CALLBACK_HTTP_BODY: - buffer_append(&s->request.buffer, in, len); - - break; - - case LWS_CALLBACK_HTTP_BODY_COMPLETION: - ret = buffer_parse_json(&s->request.buffer, &req); - if (ret) - break; - - buffer_clear(&s->request.buffer); - - pushed = queue_push(&s->request.queue, req); - if (pushed != 1) - warn("Queue overrun for API session: %s", api_session_name(s)); - - pushed = queue_signalled_push(&w->api->pending, s); - if (pushed != 1) - warn("Queue overrun for API"); - - break; - - case LWS_CALLBACK_HTTP_WRITEABLE: - pulled = queue_pull(&s->response.queue, (void **) &resp); - if (pulled) { - buffer_clear(&s->response.buffer); - buffer_append_json(&s->response.buffer, resp); - - json_decref(resp); - } - - char headers[1024]; - snprintf(headers, sizeof(headers), - "HTTP/1.1 200 OK\r\n" - "Content-type: application/json\r\n" - "User-agent: " USER_AGENT "\r\n" - "Access-Control-Allow-Origin: *\r\n" - "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n" - "Access-Control-Allow-Headers: Content-Type\r\n" - "Access-Control-Max-Age: 86400\r\n" - "Content-Length: %zu\r\n" - "\r\n", - s->response.buffer.len - ); - - ret = lws_write(wsi, (unsigned char *) headers, strlen(headers), LWS_WRITE_HTTP_HEADERS); - if (ret < 0) - return 1; - - ret = lws_write(wsi, (unsigned char *) s->response.buffer.buf, s->response.buffer.len, LWS_WRITE_HTTP); - if (ret < 0) - return 1; - - goto try_to_reuse; - - default: - break; - } - - return 0; - -try_to_reuse: - if (lws_http_transaction_completed(wsi)) - return -1; - - return 0; -} - -int api_init(struct api *a)//, struct super_node *sn) // @todo: port to C++ -{ - int ret; - - info("Initialize API sub-system"); - - ret = list_init(&a->sessions); - if (ret) - return ret; - - ret = queue_signalled_init(&a->pending, 1024, &memory_heap, 0); - if (ret) - return ret; - - a->super_node = NULL; //sn; // @todo: port to C++ - a->state = STATE_INITIALIZED; - - return 0; -} - -int api_destroy(struct api *a) -{ - int ret; - - assert(a->state != STATE_STARTED); - - ret = queue_signalled_destroy(&a->pending); - if (ret) - return ret; - - a->state = STATE_DESTROYED; - - return 0; -} - -int api_start(struct api *a) -{ - int ret; - - info("Starting API sub-system"); - - ret = pthread_create(&a->thread, NULL, api_worker, a); - if (ret) - error("Failed to start API worker thread"); - - a->state = STATE_STARTED; - - return 0; -} - -int api_stop(struct api *a) -{ - int ret; - - info("Stopping API sub-system"); - - if (a->state != STATE_STARTED) - return 0; - - for (int i = 0; i < list_length(&a->sessions); i++) { - struct api_session *s = (struct api_session *) list_at(&a->sessions, i); - - s->state = API_SESSION_STATE_SHUTDOWN; - - web_callback_on_writable(s->wsi); - } - - for (int i = 0; i < 10 && list_length(&a->sessions) > 0; i++) { - info("Wait for API sessions to close"); - usleep(1 * 1e6); - } - - ret = list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false); - if (ret) - return ret; - - ret = pthread_cancel(a->thread); - if (ret) - serror("Failed to cancel API worker thread"); - - ret = pthread_join(a->thread, NULL); - if (ret) - serror("Failed to join API worker thread"); - - a->state = STATE_STOPPED; - - return 0; -} - -static void * api_worker(void *ctx) -{ - int pulled; - - struct api *a = ctx; - struct api_session *s; - - json_t *req, *resp; - - for (;;) { - pulled = queue_signalled_pull(&a->pending, (void **) &s); - if (pulled != 1) - continue; - - queue_pull(&s->request.queue, (void **) &req); - - api_session_run_action(s, req, &resp); - - json_decref(req); - - queue_push(&s->response.queue, resp); - - web_callback_on_writable(s->wsi); - } - - return NULL; -} diff --git a/lib/api.cpp b/lib/api.cpp new file mode 100644 index 000000000..bdf5f6d1a --- /dev/null +++ b/lib/api.cpp @@ -0,0 +1,104 @@ +/** REST-API-releated functions. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include + +using namespace villas; +using namespace villas::node; +using namespace villas::node::api; + +Logger Api::logger = logging.get("api"); + +Api::Api(SuperNode *sn) : + state(STATE_INITIALIZED), + super_node(sn), + server(this) +{ } + +Api::~Api() +{ + assert(state != STATE_STARTED); +} + +void Api::start() +{ + assert(state != STATE_STARTED); + + logger->info("Starting sub-system"); + + server.start(); + + running = true; + thread = std::thread(&Api::worker, this); + + state = STATE_STARTED; +} + +void Api::stop() +{ + assert(state == STATE_STARTED); + + logger->info("Stopping sub-system"); + + for (Session *s : sessions) + s->shutdown(); + + for (int i = 0; i < 10 && sessions.size() > 0; i++) { + logger->info("Waiting for {} sessions to terminate", sessions.size()); + usleep(1 * 1e6); + } + + running = false; + pending.push(nullptr); /* unblock thread */ + thread.join(); + + server.stop(); + + state = STATE_STOPPED; +} + +void Api::run() +{ + if (pending.empty()) + return; + + /* Process pending actions */ + Session *s = pending.pop(); + if (s) + s->runPendingActions(); +} + +void Api::worker() +{ + while (running) { + run(); + server.run(); + } + + logger->info("Stopping worker"); +} diff --git a/lib/api/CMakeLists.txt b/lib/api/CMakeLists.txt index 2e093421d..664804a5c 100644 --- a/lib/api/CMakeLists.txt +++ b/lib/api/CMakeLists.txt @@ -22,16 +22,23 @@ ################################################################################### set(API_SRC - session.c - actions/capabiltities.c - actions/shutdown.c - actions/status.c -# @todo: port to C++ -# actions/node.c -# actions/config.c -# actions/nodes.c -# actions/paths.c -# actions/restart.c + session.cpp + action.cpp + server.cpp + + sessions/socket.cpp + sessions/wsi.cpp + sessions/http.cpp + sessions/websocket.cpp + + actions/capabiltities.cpp + actions/shutdown.cpp + actions/status.cpp + actions/config.cpp + actions/nodes.cpp + actions/paths.cpp + actions/restart.cpp + actions/node.cpp ) add_library(api STATIC ${API_SRC}) diff --git a/lib/api/actions/shutdown.c b/lib/api/action.cpp similarity index 71% rename from lib/api/actions/shutdown.c rename to lib/api/action.cpp index 0fff3e214..af643d78d 100644 --- a/lib/api/actions/shutdown.c +++ b/lib/api/action.cpp @@ -1,4 +1,4 @@ -/** The "shutdown" API action. +/** API session. * * @author Steffen Vogel * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC @@ -20,21 +20,9 @@ * along with this program. If not, see . *********************************************************************************/ -#include -#include +#include -static int api_shutdown(struct api_action *h, json_t *args, json_t **resp, struct api_session *s) -{ - killme(SIGTERM); +using namespace villas; +using namespace villas::node::api; - return 0; -} - -static struct plugin p = { - .name = "shutdown", - .description = "quit VILLASnode", - .type = PLUGIN_TYPE_API, - .api.cb = api_shutdown -}; - -REGISTER_PLUGIN(&p) +Logger Action::logger = logging.get("api:action"); diff --git a/lib/api/actions/capabiltities.c b/lib/api/actions/capabiltities.c deleted file mode 100644 index 10cd91826..000000000 --- a/lib/api/actions/capabiltities.c +++ /dev/null @@ -1,78 +0,0 @@ -/** The "capabiltities" API ressource. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include -#include - -static int api_capabilities(struct api_action *h, json_t *args, json_t **resp, struct api_session *s) -{ - json_t *json_hooks = json_array(); - json_t *json_apis = json_array(); - json_t *json_nodes = json_array(); - json_t *json_ios = json_array(); - json_t *json_name; - - for (size_t i = 0; i < list_length(&plugins); i++) { - struct plugin *p = (struct plugin *) list_at(&plugins, i); - - json_name = json_string(p->name); - - switch (p->type) { - case PLUGIN_TYPE_NODE: - json_array_append_new(json_nodes, json_name); - break; - - case PLUGIN_TYPE_HOOK: - json_array_append_new(json_hooks, json_name); - break; - - case PLUGIN_TYPE_API: - json_array_append_new(json_apis, json_name); - break; - - case PLUGIN_TYPE_FORMAT: - json_array_append_new(json_ios, json_name); - break; - - default: - json_decref(json_name); - } - } - - *resp = json_pack("{ s: s, s: o, s: o, s: o, s: o }", - "build", BUILDID, - "hooks", json_hooks, - "node-types", json_nodes, - "apis", json_apis, - "formats", json_ios); - - return 0; -} - -static struct plugin p = { - .name = "capabilities", - .description = "get capabiltities and details about this VILLASnode instance", - .type = PLUGIN_TYPE_API, - .api.cb = api_capabilities -}; - -REGISTER_PLUGIN(&p) diff --git a/lib/api/actions/capabiltities.cpp b/lib/api/actions/capabiltities.cpp new file mode 100644 index 000000000..55834bbaa --- /dev/null +++ b/lib/api/actions/capabiltities.cpp @@ -0,0 +1,87 @@ +/** The "capabiltities" API ressource. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +namespace villas { +namespace node { +namespace api { + +class CapabilitiesAction : public Action { + +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + json_t *json_hooks = json_array(); + json_t *json_apis = json_array(); + json_t *json_nodes = json_array(); + json_t *json_formats = json_array(); + json_t *json_name; + + for (auto f : plugin::Registry::lookup()) { + json_name = json_string(f->getName().c_str()); + + json_array_append_new(json_apis, json_name); + } + +#if 0 // @todo Port to C++ + for (auto f : NodeFactory::lookup()) { + json_name = json_string(f->getName().c_str()); + + json_array_append_new(json_nodes, json_name); + } + + for (auto f : HookFactory::lookup()) { + json_name = json_string(f->getName().c_str()); + + json_array_append_new(json_hooks, json_name); + } + + for (auto f : FormatFactory::lookup()) { + json_name = json_string(f->getName().c_str()); + + json_array_append_new(json_formats, json_name); + } +#endif + + *resp = json_pack("{ s: s, s: o, s: o, s: o, s: o }", + "build", PROJECT_BUILD_ID, + "hooks", json_hooks, + "node-types", json_nodes, + "apis", json_apis, + "formats", json_formats); + + return 0; + } +}; + +/* Register action */ +static ActionPlugin p( + "capabilities", + "get capabiltities and details about this VILLASnode instance" +); + +} // api +} // node +} // villas diff --git a/lib/api/actions/config.cpp b/lib/api/actions/config.cpp new file mode 100644 index 000000000..c79a7d34d --- /dev/null +++ b/lib/api/actions/config.cpp @@ -0,0 +1,55 @@ +/** The "config" API ressource. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include + +namespace villas { +namespace node { +namespace api { + +class ConfigAction : public Action { + +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + json_t *cfg = session->getSuperNode()->getConfig(); + + *resp = cfg + ? json_incref(cfg) + : json_object(); + + return 0; + } +}; + +/* Register action */ +static ActionPlugin p( + "config", + "get configuration of this VILLASnode instance" +); + +} // api +} // node +} // villas diff --git a/lib/api/actions/node.c b/lib/api/actions/node.c deleted file mode 100644 index 54f5b9355..000000000 --- a/lib/api/actions/node.c +++ /dev/null @@ -1,146 +0,0 @@ -/** The API ressource for start/stop/pause/resume nodes. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include - -#include -#include -#include -#include -#include - -#include - -enum api_control_action { - API_NODE_START, - API_NODE_STOP, - API_NODE_PAUSE, - API_NODE_RESUME, - API_NODE_RESTART -}; - -static int api_control(enum api_control_action act, struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - int ret; - json_error_t err; - const char *node_str; - - ret = json_unpack_ex(args, &err, 0, "{ s: s }", - "node", &node_str - ); - if (ret) - return ret; - - struct list *nodes = &s->api->super_node->nodes; - struct node *node = list_lookup(nodes, node_str); - - if (!node) - return -1; - - switch (act) { - case API_NODE_START: - return node_start(node); - - case API_NODE_STOP: - return node_stop(node); - - case API_NODE_PAUSE: - return node_pause(node); - - case API_NODE_RESUME: - return node_resume(node); - - case API_NODE_RESTART: - return node_restart(node); - - default: - return -1; - } - - return 0; -} - -static int api_control_start(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - return api_control(API_NODE_START, r, args, resp, s); -} - -static int api_control_stop(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - return api_control(API_NODE_STOP, r, args, resp, s); -} - -static int api_control_pause(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - return api_control(API_NODE_PAUSE, r, args, resp, s); -} - -static int api_control_resume(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - return api_control(API_NODE_RESUME, r, args, resp, s); -} - -static int api_control_restart(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - return api_control(API_NODE_RESTART, r, args, resp, s); -} - -static struct plugin p1 = { - .name = "node.start", - .description = "start a node", - .type = PLUGIN_TYPE_API, - .api.cb = api_control_start -}; - -static struct plugin p2 = { - .name = "node.stop", - .description = "stop a node", - .type = PLUGIN_TYPE_API, - .api.cb = api_control_stop -}; - -static struct plugin p3 = { - .name = "node.pause", - .description = "pause a node", - .type = PLUGIN_TYPE_API, - .api.cb = api_control_pause -}; - -static struct plugin p4 = { - .name = "node.resume", - .description = "resume a node", - .type = PLUGIN_TYPE_API, - .api.cb = api_control_resume -}; - -static struct plugin p5 = { - .name = "node.restart", - .description = "restart a node", - .type = PLUGIN_TYPE_API, - .api.cb = api_control_restart -}; - -REGISTER_PLUGIN(&p1) -REGISTER_PLUGIN(&p2) -REGISTER_PLUGIN(&p3) -REGISTER_PLUGIN(&p4) -REGISTER_PLUGIN(&p5) diff --git a/lib/api/actions/nodes.c b/lib/api/actions/nodes.c deleted file mode 100644 index d4321c995..000000000 --- a/lib/api/actions/nodes.c +++ /dev/null @@ -1,73 +0,0 @@ -/** The "nodes" API ressource. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include - -#include -#include -#include -#include -#include - -#include - -static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - json_t *json_nodes = json_array(); - - for (size_t i = 0; i < list_length(&s->api->super_node->nodes); i++) { - struct node *n = (struct node *) list_at(&s->api->super_node->nodes, i); - - json_t *json_node = json_pack("{ s: s, s: s, s: i, s: { s: i }, s: { s: i } }", - "name", node_name_short(n), - "state", state_print(n->state), - "affinity", n->affinity, - "in", - "vectorize", n->in.vectorize, - "out", - "vectorize", n->out.vectorize - - ); - - if (n->stats) - json_object_set_new(json_node, "stats", stats_json(n->stats)); - - /* Add all additional fields of node here. - * This can be used for metadata */ - json_object_update(json_node, n->cfg); - - json_array_append_new(json_nodes, json_node); - } - - *resp = json_nodes; - - return 0; -} - -static struct plugin p = { - .name = "nodes", - .description = "retrieve list of all known nodes", - .type = PLUGIN_TYPE_API, - .api.cb = api_nodes -}; - -REGISTER_PLUGIN(&p) diff --git a/lib/api/actions/nodes.cpp b/lib/api/actions/nodes.cpp new file mode 100644 index 000000000..b1fb1cc92 --- /dev/null +++ b/lib/api/actions/nodes.cpp @@ -0,0 +1,84 @@ +/** The "nodes" API ressource. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include +#include +#include +#include +#include +#include + +namespace villas { +namespace node { +namespace api { + +class NodesAction : public Action { + +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + json_t *json_nodes = json_array(); + + struct list *nodes = session->getSuperNode()->getNodes(); + + for (size_t i = 0; i < list_length(nodes); i++) { + struct node *n = (struct node *) list_at(nodes, i); + + json_t *json_node = json_pack("{ s: s, s: s, s: i, s: { s: i }, s: { s: i } }", + "name", node_name_short(n), + "state", state_print(n->state), + "affinity", n->affinity, + "in", + "vectorize", n->in.vectorize, + "out", + "vectorize", n->out.vectorize + ); + + if (n->stats) + json_object_set_new(json_node, "stats", stats_json(n->stats)); + + /* Add all additional fields of node here. + * This can be used for metadata */ + json_object_update(json_node, n->cfg); + + json_array_append_new(json_nodes, json_node); + } + + *resp = json_nodes; + + return 0; + } +}; + +/* Register action */ +static ActionPlugin p( + "nodes", + "retrieve list of all known nodes" +); + +} // api +} // node +} // villas + diff --git a/lib/api/actions/paths.c b/lib/api/actions/paths.cpp similarity index 53% rename from lib/api/actions/paths.c rename to lib/api/actions/paths.cpp index ab3b15e6c..af5a2ee8c 100644 --- a/lib/api/actions/paths.c +++ b/lib/api/actions/paths.cpp @@ -22,40 +22,52 @@ #include -#include +#include #include #include -#include -#include +#include +#include -static int api_paths(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - json_t *json_paths = json_array(); +namespace villas { +namespace node { +namespace api { - for (size_t i = 0; i < list_length(&s->api->super_node->paths); i++) { - struct path *p = (struct path *) list_at(&s->api->super_node->paths, i); +class PathsAction : public Action { - json_t *json_path = json_pack("{ s: i }", - "state", p->state - ); +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + json_t *json_paths = json_array(); - /* Add all additional fields of node here. - * This can be used for metadata */ - json_object_update(json_path, p->cfg); + struct list *paths = session->getSuperNode()->getPaths(); - json_array_append_new(json_paths, json_path); + for (size_t i = 0; i < list_length(paths); i++) { + struct path *p = (struct path *) list_at(paths, i); + + json_t *json_path = json_pack("{ s: i }", + "state", p->state + ); + + /* Add all additional fields of node here. + * This can be used for metadata */ + json_object_update(json_path, p->cfg); + + json_array_append_new(json_paths, json_path); + } + + *resp = json_paths; + + return 0; } - - *resp = json_paths; - - return 0; -} - -static struct plugin p = { - .name = "paths", - .description = "retrieve list of all paths with details", - .type = PLUGIN_TYPE_API, - .api.cb = api_paths }; -REGISTER_PLUGIN(&p) +/* Register action */ +static ActionPlugin p( + "paths", + "retrieve list of all paths with details" +); + +} // api +} // node +} // villas diff --git a/lib/api/actions/restart.c b/lib/api/actions/restart.c deleted file mode 100644 index f4415dd65..000000000 --- a/lib/api/actions/restart.c +++ /dev/null @@ -1,99 +0,0 @@ -/** The "restart" API action. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include - -#include -#include -#include - -#include - -static char *config; - -void api_restart_handler() -{ - int ret; - - char *argv[] = { "villas-node", config, NULL }; - - ret = execvp("/proc/self/exe", argv); - if (ret) - serror("Failed to restart"); -} - -static int api_restart(struct api_action *h, json_t *args, json_t **resp, struct api_session *s) -{ - int ret; - json_error_t err; - - char *cfg = NULL; - - if (args) { - ret = json_unpack_ex(args, &err, 0, "{ s?: s }", "config", &cfg); - if (ret < 0) { - *resp = json_string("failed to parse request"); - return -1; - } - } - - /* If no config is provided via request, we will use the previous one */ - if (!cfg) - cfg = s->api->super_node->uri; - - config = strdup(cfg); - - info("restarting to %s", config); - - /* Increment API restart counter */ - char *scnt = getenv("VILLAS_API_RESTART_COUNT"); - int cnt = scnt ? atoi(scnt) : 0; - char buf[32]; - snprintf(buf, sizeof(buf), "%d", cnt + 1); - - /* We pass some env variables to the new process */ - setenv("VILLAS_API_RESTART_COUNT", buf, 1); - - *resp = json_pack("{ s: i, s: s }", - "restarts", cnt, - "config", config - ); - - /* Register exit handler */ - ret = atexit(api_restart_handler); - if (ret) - return 0; - - /* Properly terminate current instance */ - killme(SIGTERM); - - return 0; -} - -static struct plugin p = { - .name = "restart", - .description = "restart VILLASnode with new configuration", - .type = PLUGIN_TYPE_API, - .api.cb = api_restart -}; - -REGISTER_PLUGIN(&p) diff --git a/lib/api/actions/restart.cpp b/lib/api/actions/restart.cpp new file mode 100644 index 000000000..65bda6d01 --- /dev/null +++ b/lib/api/actions/restart.cpp @@ -0,0 +1,117 @@ +/** The "restart" API action. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include +#include +#include +#include + +namespace villas { +namespace node { +namespace api { + +class RestartAction : public Action { + +protected: + static std::string configUri; + + static void handler() + { + int ret; + const char *cfg = !configUri.empty() + ? configUri.c_str() + : nullptr; + + const char *argv[] = { "villas-node", cfg, nullptr }; + + logger->info("Restart instance: config={}", cfg); + ret = execvp("/proc/self/exe", (char **) argv); + if (ret) + throw new SystemError("Failed to restart"); + } + +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + int ret; + json_error_t err; + + const char *cfg = nullptr; + + if (args) { + ret = json_unpack_ex(args, &err, 0, "{ s?: s }", "config", &cfg); + if (ret < 0) { + *resp = json_string("failed to parse request"); + return -1; + } + } + + /* If no config is provided via request, we will use the previous one */ + configUri = cfg + ? cfg + : session->getSuperNode()->getConfigUri(); + + logger->info("Restarting to %s", configUri.c_str()); + + /* Increment API restart counter */ + char *scnt = getenv("VILLAS_API_RESTART_COUNT"); + int cnt = scnt ? atoi(scnt) : 0; + char buf[32]; + snprintf(buf, sizeof(buf), "%d", cnt + 1); + + /* We pass some env variables to the new process */ + setenv("VILLAS_API_RESTART_COUNT", buf, 1); + + *resp = json_pack("{ s: i, s: o }", + "restarts", cnt, + "config", configUri.empty() + ? json_null() + : json_string(configUri.c_str()) + ); + + /* Register exit handler */ + ret = atexit(handler); + if (ret) + return 0; + + /* Properly terminate current instance */ + killme(SIGTERM); + + return 0; + } +}; + +std::string RestartAction::configUri; + +/* Register action */ +static ActionPlugin p( + "restart", + "restart VILLASnode with new configuration" +); + +} // api +} // node +} // villas + diff --git a/lib/api/actions/config.c b/lib/api/actions/shutdown.cpp similarity index 68% rename from lib/api/actions/config.c rename to lib/api/actions/shutdown.cpp index 278b10963..55f3b6666 100644 --- a/lib/api/actions/config.c +++ b/lib/api/actions/shutdown.cpp @@ -1,4 +1,4 @@ -/** The "config" API ressource. +/** The "shutdown" API action. * * @author Steffen Vogel * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC @@ -20,23 +20,33 @@ * along with this program. If not, see . *********************************************************************************/ -#include +#include + #include -#include -#include +#include -static int api_config(struct api_action *h, json_t *args, json_t **resp, struct api_session *s) -{ - *resp = json_incref(s->api->super_node->cfg); +namespace villas { +namespace node { +namespace api { - return 0; -} +class ShutdownAction : public Action { -static struct plugin p = { - .name = "config", - .description = "retrieve current VILLASnode configuration", - .type = PLUGIN_TYPE_API, - .api.cb = api_config +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + killme(SIGTERM); + + return 0; + } }; -REGISTER_PLUGIN(&p) +/* Register action */ +static ActionPlugin p( + "shutdown", + "quit VILLASnode" +); + +} // api +} // node +} // villas diff --git a/lib/api/actions/status.c b/lib/api/actions/status.cpp similarity index 63% rename from lib/api/actions/status.c rename to lib/api/actions/status.cpp index a694c4202..16abeea2c 100644 --- a/lib/api/actions/status.c +++ b/lib/api/actions/status.cpp @@ -24,33 +24,38 @@ #include -#include -#include -#include -#include -#include -#include +#include -static int api_status(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) -{ - int ret; - struct lws_context *ctx = lws_get_context(s->wsi); - char buf[4096]; +namespace villas { +namespace node { +namespace api { - ret = lws_json_dump_context(ctx, buf, sizeof(buf), 0); +class StatusAction : public Action { - *resp = json_loads(buf, 0, NULL); +public: + using Action::Action; + virtual int execute(json_t *args, json_t **resp) + { + int ret; + struct lws_context *ctx = lws_get_context(s->wsi); + char buf[4096]; - return ret; -} + ret = lws_json_dump_context(ctx, buf, sizeof(buf), 0); -static struct plugin p = { - .name = "status", - .description = "get status and statistics of web server", - .type = PLUGIN_TYPE_API, - .api.cb = api_status + *resp = json_loads(buf, 0, nullptr); + + return ret; + } }; -REGISTER_PLUGIN(&p) +/* Register action */ +static ActionPlugin p( + "status", + "get status and statistics of web server" +); + +} // api +} // node +} // villas #endif /* LWS_WITH_SERVER_STATUS */ diff --git a/lib/api/server.cpp b/lib/api/server.cpp new file mode 100644 index 000000000..c65169e57 --- /dev/null +++ b/lib/api/server.cpp @@ -0,0 +1,163 @@ +/** Socket API endpoint. + * + * @file + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace villas::node::api; + +Server::Server(Api *a) : + state(STATE_INITIALIZED), + api(a) +{ + +} + +Server::~Server() +{ + assert(state != STATE_STARTED); +} + +void Server::start() +{ + int ret; + + assert(state != STATE_STARTED); + + sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sd < 0) + throw new SystemError("Failed to create Api socket"); + + pollfd pfd = { + .fd = sd, + .events = POLLIN + }; + + pfds.push_back(pfd); + sessions.push_back(nullptr); + + std::string path = PREFIX "/var/lib/villas/node-" + api->getSuperNode()->getName() + ".sock"; + + struct sockaddr_un sun = { .sun_family = AF_UNIX }; + strncpy(sun.sun_path, path.c_str(), sizeof(sun.sun_path) - 1); + + ret = unlink(sun.sun_path); + if (ret && errno != ENOENT && errno != ENOTDIR) + throw new SystemError("Failed to delete API socket"); + + ret = bind(sd, (struct sockaddr *) &sun, sizeof(struct sockaddr_un)); + if (ret) + throw new SystemError("Failed to bind API socket"); + + ret = listen(sd, 5); + if (ret) + throw new SystemError("Failed to listen on API socket"); + + state = STATE_STARTED; +} + +void Server::stop() +{ + int ret; + + assert(state == STATE_STARTED); + + ret = close(sd); + if (ret) + throw new SystemError("Failed to close API socket");; + + state = STATE_STOPPED; +} + +void Server::run(int timeout) +{ + int ret; + + assert(state == STATE_STARTED); + + ret = poll(pfds.data(), pfds.size(), timeout); + if (ret < 0) + throw new SystemError("Failed to poll on API socket");; + + for (unsigned i = 0; i < pfds.size(); i++) { + auto &pfd = pfds[i]; + auto s = sessions[i]; + + if (pfd.revents & POLLOUT) { + if (s) + s->write(); + } + + if (pfd.revents & POLLIN) { + /* New connection */ + if (s) { + ret = s->read(); + if (ret < 0) + closeSession(s); + } + else + acceptNewSession(); + } + } +} + +void Server::acceptNewSession() { + int fd = ::accept(sd, nullptr, nullptr); + + auto s = new sessions::Socket(api, fd); + + pollfd pfd = { + .fd = fd, + .events = POLLIN | POLLOUT + }; + + pfds.push_back(pfd); + sessions.push_back(s); + + api->sessions.push_back(s); +} + +void Server::closeSession(sessions::Socket *s) +{ + api->sessions.remove(s); + + ptrdiff_t pos = std::find(sessions.begin(), sessions.end(), s) - sessions.begin(); + + if (pos < (ptrdiff_t) sessions.size()) { + pfds.erase(pfds.begin() + pos); + sessions.erase(sessions.begin() + pos); + + delete s; + } +} diff --git a/lib/api/session.c b/lib/api/session.c deleted file mode 100644 index 06392943d..000000000 --- a/lib/api/session.c +++ /dev/null @@ -1,172 +0,0 @@ -/** API session. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include - -#include -#include -#include -#include - -int api_session_init(struct api_session *s, enum api_mode m) -{ - int ret; - - s->runs = 0; - s->mode = m; - - ret = buffer_init(&s->request.buffer, 0); - if (ret) - return ret; - - ret = buffer_init(&s->response.buffer, 0); - if (ret) - return ret; - - ret = queue_init(&s->request.queue, 128, &memory_heap); - if (ret) - return ret; - - ret = queue_init(&s->response.queue, 128, &memory_heap); - if (ret) - return ret; - - s->_name = NULL; - - return 0; -} - -int api_session_destroy(struct api_session *s) -{ - int ret; - - ret = buffer_destroy(&s->request.buffer); - if (ret) - return ret; - - ret = buffer_destroy(&s->response.buffer); - if (ret) - return ret; - - ret = queue_destroy(&s->request.queue); - if (ret) - return ret; - - ret = queue_destroy(&s->response.queue); - if (ret) - return ret; - - if (s->_name) - free(s->_name); - - return 0; -} - -int api_session_run_action(struct api_session *s, json_t *json_in, json_t **json_out) -{ - int ret; - const char *action; - char *id; - struct plugin *p; - - json_t *json_args = NULL; - json_t *json_resp = NULL; - - ret = json_unpack(json_in, "{ s: s, s: s, s?: o }", - "action", &action, - "id", &id, - "request", &json_args); - if (ret) { - ret = -100; - *json_out = json_pack("{ s: s, s: i }", - "error", "invalid request", - "code", ret); - goto out; - } - - p = plugin_lookup(PLUGIN_TYPE_API, action); - if (!p) { - ret = -101; - *json_out = json_pack("{ s: s, s: s, s: i, s: s }", - "action", action, - "id", id, - "code", ret, - "error", "action not found"); - goto out; - } - - debug(LOG_API, "Running API request: action=%s, id=%s", action, id); - - ret = p->api.cb(&p->api, json_args, &json_resp, s); - if (ret) - *json_out = json_pack("{ s: s, s: s, s: i, s: s }", - "action", action, - "id", id, - "code", ret, - "error", "action failed"); - else - *json_out = json_pack("{ s: s, s: s }", - "action", action, - "id", id); - - if (json_resp) - json_object_set_new(*json_out, "response", json_resp); - -out: debug(LOG_API, "Completed API request: action=%s, id=%s, code=%d", action, id, ret); - - s->runs++; - - return 0; -} - -char * api_session_name(struct api_session *s) -{ - if (!s->_name) { - char *mode; - - switch (s->mode) { - case API_MODE_WS: - mode = "ws"; - break; - - case API_MODE_HTTP: - mode = "http"; - break; - - default: - mode = "unknown"; - } - - strcatf(&s->_name, "version=%d, mode=%s", s->version, mode); - - if (s->wsi) { - char name[128]; - char ip[128]; - - lws_get_peer_addresses(s->wsi, lws_get_socket_fd(s->wsi), name, sizeof(name), ip, sizeof(ip)); - - strcatf(&s->_name, ", remote.name=%s, remote.ip=%s", name, ip); - } - } - - return s->_name; -} diff --git a/lib/api/session.cpp b/lib/api/session.cpp new file mode 100644 index 000000000..7399605e4 --- /dev/null +++ b/lib/api/session.cpp @@ -0,0 +1,140 @@ +/** API session. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include + +#include +#include +#include + +#include +#include + +using namespace villas; +using namespace villas::node::api; + +Logger Session::logger = logging.get("api:session"); + +Session::Session(Api *a) : + runs(0), + api(a) +{ + queue_init(&request.queue, 128, &memory_heap); + queue_init(&response.queue, 128, &memory_heap); + + logger->debug("Initiated API session: {}", getName()); +} + +Session::~Session() +{ + queue_destroy(&request.queue); + queue_destroy(&response.queue); +} + + +void Session::runPendingActions() +{ + json_t *req, *resp; + + while (queue_available(&request.queue) > 0) { + queue_pull(&request.queue, (void **) &req); + + runAction(req, &resp); + + json_decref(req); + + queue_push(&response.queue, resp); + } +} + +int Session::runAction(json_t *json_in, json_t **json_out) +{ + int ret; + const char *action; + char *id; + + json_t *json_args = nullptr; + json_t *json_resp = nullptr; + + ret = json_unpack(json_in, "{ s: s, s: s, s?: o }", + "action", &action, + "id", &id, + "request", &json_args); + if (ret) { + ret = -100; + *json_out = json_pack("{ s: s, s: i }", + "error", "invalid request", + "code", ret); + + logger->debug("Completed API request: action={}, id={}, code={}", action, id, ret); + return 0; + } + + auto acf = plugin::Registry::lookup(action); + if (!acf) { + ret = -101; + *json_out = json_pack("{ s: s, s: s, s: i, s: s }", + "action", action, + "id", id, + "code", ret, + "error", "action not found"); + + logger->debug("Completed API request: action={}, id={}, code={}", action, id, ret); + return 0; + } + + logger->debug("Running API request: action={}, id={}", action, id); + + Action *act = acf->make(this); + + ret = act->execute(json_args, &json_resp); + if (ret) + *json_out = json_pack("{ s: s, s: s, s: i, s: s }", + "action", action, + "id", id, + "code", ret, + "error", "action failed"); + else + *json_out = json_pack("{ s: s, s: s }", + "action", action, + "id", id); + + if (json_resp) + json_object_set_new(*json_out, "response", json_resp); + + logger->debug("Completed API request: action={}, id={}, code={}", action, id, ret); + + runs++; + + return 0; +} + +std::string Session::getName() +{ + std::stringstream ss; + + ss << "version=" << version << ", runs=" << runs; + + return ss.str(); +} diff --git a/lib/api/sessions/http.cpp b/lib/api/sessions/http.cpp new file mode 100644 index 000000000..820689e2e --- /dev/null +++ b/lib/api/sessions/http.cpp @@ -0,0 +1,198 @@ +/** HTTP Api session. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include + +#include +#include +#include +#include + +using namespace villas::node; +using namespace villas::node::api::sessions; + +Http::Http(Api *a, lws *w) : + Wsi(a, w) +{ + int hdrlen, options = -1, version; + char *uri; + + if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_OPTIONS_URI))) + options = 1; + else if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_POST_URI))) + options = 0; + + uri = new char[hdrlen + 1]; + lws_hdr_copy(wsi, uri, hdrlen + 1, options ? WSI_TOKEN_OPTIONS_URI : WSI_TOKEN_POST_URI); + + /* Parse request URI */ + sscanf(uri, "/api/v%d", (int *) &version); + + /** @todo Check version */ + + /* This is an OPTIONS request. + * + * We immediatly send headers and close the connection + * without waiting for a POST body */ + if (options) + lws_callback_on_writable(wsi); + + delete uri; +} + +void Http::read(void *in, size_t len) +{ + request.buffer.append((const char *) in, len); +} + +int Http::complete() +{ + int pushed; + json_t *req; + + req = request.buffer.decode(); + if (!req) + return 0; + + request.buffer.clear(); + + pushed = queue_push(&request.queue, req); + if (pushed != 1) + logger->warn("Queue overrun for API session: {}", getName()); + + return 1; +} + +int Http::write() +{ + int ret, pulled; + json_t *resp; + + pulled = queue_pull(&response.queue, (void **) &resp); + if (pulled) { + response.buffer.clear(); + response.buffer.encode(resp); + + json_decref(resp); + } + + std::stringstream headers; + + headers << "HTTP/1.1 200 OK\r\n" + << "Content-type: application/json\r\n" + << "User-agent: " USER_AGENT "\r\n" + << "Access-Control-Allow-Origin: *\r\n" + << "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n" + << "Access-Control-Allow-Headers: Content-Type\r\n" + << "Access-Control-Max-Age: 86400\r\n" + << "Content-Length: " << response.buffer.size() << "\r\n" + << "\r\n"; + + ret = lws_write(wsi, (unsigned char *) headers.str().data(), headers.str().size(), LWS_WRITE_HTTP_HEADERS); + if (ret < 0) + return 1; + + ret = lws_write(wsi, (unsigned char *) response.buffer.data(), response.buffer.size(), LWS_WRITE_HTTP); + if (ret < 0) + return 1; + + return 0; +} + +Http::~Http() +{ + logger->debug("Closed API session: {}", getName()); +} + +std::string Http::getName() +{ + std::stringstream ss; + + ss << Wsi::getName() << ", mode=http"; + + return ss.str(); +} + +int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + int ret; + + lws_context *ctx = lws_get_context(wsi); + void *user_ctx = lws_context_user(ctx); + + Web *w = static_cast(user_ctx); + Api *a = w->getApi(); + + Http *s = static_cast(user); + + switch (reason) { + case LWS_CALLBACK_HTTP_BIND_PROTOCOL: + if (a == nullptr) + return -1; + + new (s) Http(a, wsi); + + a->sessions.push_back(s); + + break; + + case LWS_CALLBACK_HTTP_DROP_PROTOCOL: + if (!s) + return -1; + + s->~Http(); + + a->sessions.remove(s); + + return 1; + + case LWS_CALLBACK_HTTP_BODY: + s->read(in, len); + + break; + + case LWS_CALLBACK_HTTP_BODY_COMPLETION: + ret = s->complete(); + if (ret) + a->pending.push(s); + + break; + + case LWS_CALLBACK_HTTP_WRITEABLE: + s->write(); + + goto try_to_reuse; + + default: + break; + } + + return 0; + +try_to_reuse: + if (lws_http_transaction_completed(wsi)) + return -1; + + return 0; +} diff --git a/lib/api/sessions/socket.cpp b/lib/api/sessions/socket.cpp new file mode 100644 index 000000000..e42c2ef2c --- /dev/null +++ b/lib/api/sessions/socket.cpp @@ -0,0 +1,85 @@ +/** Unix domain socket Api session. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include +#include + +using namespace villas::node::api::sessions; + +Socket::Socket(Api *a, int s) : + Session(a), + sd(s) +{ + +} + +Socket::~Socket() +{ + close(sd); +} + +std::string Socket::getName() +{ + std::stringstream ss; + + ss << Session::getName() << ", mode=socket"; + + return ss.str(); +} + +int Socket::read() +{ + int ret; + json_t *j; + json_error_t err; + + j = json_loadfd(sd, JSON_DISABLE_EOF_CHECK, &err); + if (!j) + return -1; + + ret = queue_push(&request.queue, (json_t *) j); + if (ret != 1) + return -1; + + api->pending.push(this); + + return 0; +} + +int Socket::write() +{ + int ret; + json_t *j; + + while (queue_pull(&response.queue, (void **) &j)) { + ret = json_dumpfd(j, sd, 0); + if (ret) + return ret; + + char nl = '\n'; + send(sd, &nl, 1, 0); + } + + return 0; +} diff --git a/lib/api/sessions/websocket.cpp b/lib/api/sessions/websocket.cpp new file mode 100644 index 000000000..f0b9c3e98 --- /dev/null +++ b/lib/api/sessions/websocket.cpp @@ -0,0 +1,165 @@ +/** WebSockets Api session. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include + +#include +#include +#include + +using namespace villas::node; +using namespace villas::node::api::sessions; + +WebSocket::WebSocket(Api *a, lws *w) : + Wsi(a, w) +{ + /* Parse request URI */ + int version; + char uri[64]; + lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); + + sscanf(uri, "/v%d", (int *) &version); + + /** @todo Check version */ + + logger->debug("Initiated API session: {}", getName()); +} + +WebSocket::~WebSocket() +{ + logger->debug("Closed API session: {}", getName()); +} + +int WebSocket::read(void *in, size_t len) +{ + int pushed; + json_t *req; + + if (lws_is_first_fragment(wsi)) + request.buffer.clear(); + + request.buffer.append((const char *) in, len); + + if (lws_is_final_fragment(wsi)) { + req = request.buffer.decode(); + if (!req) + return 0; + + pushed = queue_push(&request.queue, req); + if (pushed != 1) + logger->warn("Queue overun in API session"); + + return 1; + } + + return 0; +} + +int WebSocket::write() +{ + int pulled; + json_t *resp; + + if (state == State::SHUTDOWN) + return -1; + + pulled = queue_pull(&response.queue, (void **) &resp); + if (pulled) { + char pad[LWS_PRE]; + + response.buffer.clear(); + response.buffer.append(pad, sizeof(pad)); + response.buffer.encode(resp); + + json_decref(resp); + + lws_write(wsi, (unsigned char *) response.buffer.data() + LWS_PRE, response.buffer.size() - LWS_PRE, LWS_WRITE_TEXT); + } + + return 0; +} + +std::string WebSocket::getName() +{ + std::stringstream ss; + + ss << Wsi::getName() << ", mode=ws"; + + return ss.str(); +} + +int api_ws_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + int ret; + + lws_context *ctx = lws_get_context(wsi); + void *user_ctx = lws_context_user(ctx); + + Web *w = static_cast(user_ctx); + Api *a = w->getApi(); + + WebSocket *s = static_cast(user); + + switch (reason) { + case LWS_CALLBACK_ESTABLISHED: + if (a == nullptr) { + std::string err = "API disabled"; + lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) err.data(), err.length()); + return -1; + } + + new (s) WebSocket(a, wsi); + + a->sessions.push_back(s); + + break; + + case LWS_CALLBACK_CLOSED: + + s->~WebSocket(); + + a->sessions.remove(s); + + break; + + case LWS_CALLBACK_RECEIVE: + ret = s->read(in, len); + if (ret) + a->pending.push(s); + + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + ret = s->write(); + if (ret) + return ret; + + break; + + default: + break; + } + + return 0; +} diff --git a/lib/api/sessions/wsi.cpp b/lib/api/sessions/wsi.cpp new file mode 100644 index 000000000..9f9eeede2 --- /dev/null +++ b/lib/api/sessions/wsi.cpp @@ -0,0 +1,72 @@ +/** LWS Api session. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include + +using namespace villas::node::api::sessions; + +void Wsi::runPendingActions() +{ + Session::runPendingActions(); + + web->callbackOnWritable(wsi); +} + +void Wsi::shutdown() +{ + state = State::SHUTDOWN; + + web->callbackOnWritable(wsi); +} + +std::string Wsi::getName() +{ + std::stringstream ss; + + ss << Session::getName(); + + if (wsi) { + char name[128]; + char ip[128]; + + lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), name, sizeof(name), ip, sizeof(ip)); + + ss << ", remote.name=" << name << ", remote.ip=" << ip; + } + + return ss.str(); + +} + +Wsi::Wsi(Api *a, lws *w) : + Session(a), + wsi(w) +{ + state = Session::State::ESTABLISHED; + + lws_context *ctx = lws_get_context(wsi); + void *ctx = lws_context_user(ctx); + + web = static_cast(ctx); +} diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 43f938ed1..25fddc543 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -28,7 +28,7 @@ #include -#include +#include #include #include #include diff --git a/lib/super_node.cpp b/lib/super_node.cpp index 538bb0368..94b965d38 100644 --- a/lib/super_node.cpp +++ b/lib/super_node.cpp @@ -25,38 +25,42 @@ #include #include -#include +#include #include #include #include #include #include #include -#include -#include -#include #include #include -#include #include +#include +#include +#include -#include - +using namespace villas; using namespace villas::node; +Logger SuperNode::logger = logging.get("super_node"); + SuperNode::SuperNode() : state(STATE_INITIALIZED), priority(0), affinity(0), hugepages(DEFAULT_NR_HUGEPAGES), - stats(0) + stats(0), + api(this), + web(&api) { list_init(&nodes); list_init(&paths); list_init(&plugins); - name = (char *) alloc(128); - gethostname(name, 128); + char hname[128]; + gethostname(hname, 128); + + name = hname; init(); } @@ -65,11 +69,7 @@ int SuperNode::init() { int ret; - ret = log_init(&log, name, 2, LOG_ALL); - if (ret) - return ret; - - ret = rt_init(priority, affinity); + ret = kernel::rt::init(priority, affinity); if (ret) return ret; @@ -77,114 +77,94 @@ int SuperNode::init() if (ret) return ret; -#ifdef WITH_API - ret = api_init(&api);//, this); // @todo: port to C++ - if (ret) - return ret; -#endif /* WITH_API */ - -#ifdef WITH_WEB - ret = web_init(&web, &api); - if (ret) - return ret; -#endif /* WITH_WEB */ - return 0; } -int SuperNode::parseUri(const char *u) +int SuperNode::parseUri(const std::string &u) { json_error_t err; - info("Parsing configuration"); + logger->info("Parsing configuration"); - if (u) { - FILE *f; - AFILE *af; + FILE *f; + AFILE *af; - /* Via stdin */ - if (!strcmp("-", u)) { - info("Reading configuration from stdin"); + /* Via stdin */ + if (u == "-") { + logger->info("Reading configuration from standard input"); - af = NULL; - f = stdin; - } - else { - info("Reading configuration from URI: %s", u); + af = nullptr; + f = stdin; + } + else { + logger->info("Reading configuration from URI: {}", u); - af = afopen(u, "r"); - if (!af) - error("Failed to open configuration from: %s", u); + af = afopen(u.c_str(), "r"); + if (!af) + throw new RuntimeError("Failed to open configuration from: {}", u); - f = af->file; - } + f = af->file; + } - /* Parse config */ - json = json_loadf(f, 0, &err); - if (json == NULL) { + /* Parse config */ + json = json_loadf(f, 0, &err); + if (json == nullptr) { #ifdef LIBCONFIG_FOUND - int ret; + int ret; - config_t cfg; - config_setting_t *json_root = NULL; + config_t cfg; + config_setting_t *json_root = nullptr; - warn("Failed to parse JSON configuration. Re-trying with old libconfig format."); - { - warn("Please consider migrating to the new format using the 'conf2json' command."); - } + logger->warn("Failed to parse JSON configuration. Re-trying with old libconfig format."); + logger->warn(" Please consider migrating to the new format using the 'conf2json' command."); - config_init(&cfg); - config_set_auto_convert(&cfg, 1); + config_init(&cfg); + config_set_auto_convert(&cfg, 1); - /* Setup libconfig include path. - * This is only supported for local files */ - if (access(uri, F_OK) != -1) { - char *cpy = strdup(uri); + /* Setup libconfig include path. + * This is only supported for local files */ + if (access(u.c_str(), F_OK) != -1) { + char *cpy = strdup(u.c_str()); - config_set_include_dir(&cfg, dirname(cpy)); + config_set_include_dir(&cfg, dirname(cpy)); - free(cpy); - } + free(cpy); + } - if (af) - arewind(af); - else - rewind(f); + if (af) + arewind(af); + else + rewind(f); - ret = config_read(&cfg, f); - if (ret != CONFIG_TRUE) { - { - warn("conf: %s in %s:%d", config_error_text(&cfg), uri, config_error_line(&cfg)); - warn("json: %s in %s:%d:%d", err.text, err.source, err.line, err.column); - } - error("Failed to parse configuration"); - } + ret = config_read(&cfg, f); + if (ret != CONFIG_TRUE) { + logger->warn("conf: {} in {}:{}", config_error_text(&cfg), u.c_str(), config_error_line(&cfg)); + logger->warn("json: {} in {}:{}:{}", err.text, err.source, err.line, err.column); + logger->error("Failed to parse configuration"); + killme(SIGABRT); + } - json_root = config_root_setting(&cfg); + json_root = config_root_setting(&cfg); - json = config_to_json(json_root); - if (json == NULL) - error("Failed to convert JSON to configuration file"); + json = config_to_json(json_root); + if (json == nullptr) + throw new RuntimeError("Failed to convert JSON to configuration file"); - config_destroy(&cfg); + config_destroy(&cfg); #else - jerror(&err, "Failed to parse configuration file"); + throw new JsonError(err, "Failed to parse configuration file"); #endif /* LIBCONFIG_FOUND */ } - /* Close configuration file */ - if (af) - afclose(af); - else if (f != stdin) - fclose(f); + /* Close configuration file */ + if (af) + afclose(af); + else if (f != stdin) + fclose(f); - uri = strdup(u); + uri = u; - return parseJson(json); - } - else { - warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance."); - } + return parseJson(json); return 0; } @@ -217,38 +197,36 @@ int SuperNode::parseJson(json_t *j) "name", &nme ); if (ret) - jerror(&err, "Failed to parse global configuration"); + throw new JsonError(err, "Failed to parse global configuration"); - if (nme) { - name = (char *) realloc(name, strlen(nme)+1); - sprintf(name, "%s", nme); - } + if (nme) + name = nme; #ifdef WITH_WEB if (json_web) - web_parse(&web, json_web); + web.parse(json_web); #endif /* WITH_WEB */ if (json_logging) - log_parse(&log, json_logging); + logging.parse(json_logging); /* Parse plugins */ if (json_plugins) { if (!json_is_array(json_plugins)) - error("Setting 'plugins' must be a list of strings"); + throw new ConfigError(json_plugins, "node-config-plugins", "Setting 'plugins' must be a list of strings"); size_t i; json_t *json_plugin; json_array_foreach(json_plugins, i, json_plugin) { - struct plugin *p = (struct plugin *) alloc(sizeof(struct plugin)); + auto *p = (plugin *) alloc(sizeof(plugin)); ret = plugin_init(p); if (ret) - error("Failed to initialize plugin"); + throw new RuntimeError("Failed to initialize plugin"); ret = plugin_parse(p, json_plugin); if (ret) - error("Failed to parse plugin"); + throw new RuntimeError("Failed to parse plugin"); list_push(&plugins, p); } @@ -257,7 +235,7 @@ int SuperNode::parseJson(json_t *j) /* Parse nodes */ if (json_nodes) { if (!json_is_object(json_nodes)) - error("Setting 'nodes' must be a group with node name => group mappings."); + throw new ConfigError(json_nodes, "node-config-nodes", "Setting 'nodes' must be a group with node name => group mappings."); const char *name; json_t *json_node; @@ -267,21 +245,21 @@ int SuperNode::parseJson(json_t *j) ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type); if (ret) - jerror(&err, "Failed to parse node"); + throw new JsonError(err, "Failed to parse node"); nt = node_type_lookup(type); if (!nt) - error("Invalid node type: %s", type); + throw new RuntimeError("Invalid node type: {}", type); - struct node *n = (struct node *) alloc(sizeof(struct node)); + auto *n = (struct node *) alloc(sizeof(struct node)); ret = node_init(n, nt); if (ret) - error("Failed to initialize node"); + throw new RuntimeError("Failed to initialize node"); ret = node_parse(n, json_node, name); if (ret) - error("Failed to parse node"); + throw new RuntimeError("Failed to parse node"); list_push(&nodes, n); } @@ -290,33 +268,33 @@ int SuperNode::parseJson(json_t *j) /* Parse paths */ if (json_paths) { if (!json_is_array(json_paths)) - warn("Setting 'paths' must be a list."); + logger->warn("Setting 'paths' must be a list of objects"); size_t i; json_t *json_path; json_array_foreach(json_paths, i, json_path) { - struct path *p = (struct path *) alloc(sizeof(struct path)); + path *p = (path *) alloc(sizeof(path)); ret = path_init(p); if (ret) - error("Failed to initialize path"); + throw new RuntimeError("Failed to initialize path"); ret = path_parse(p, json_path, &nodes); if (ret) - error("Failed to parse path"); + throw new RuntimeError("Failed to parse path"); list_push(&paths, p); if (p->reverse) { - struct path *r = (struct path *) alloc(sizeof(struct path)); + path *r = (path *) alloc(sizeof(path)); ret = path_init(r); if (ret) - error("Failed to init path"); + throw new RuntimeError("Failed to init path"); ret = path_reverse(p, r); if (ret) - error("Failed to reverse path %s", path_name(p)); + throw new RuntimeError("Failed to reverse path {}", path_name(p)); list_push(&paths, r); } @@ -334,22 +312,22 @@ int SuperNode::check() { int ret; - assert(state == STATE_PARSED || state == STATE_PARSED || state == STATE_CHECKED); + assert(state == STATE_INITIALIZED || state == STATE_PARSED || state == STATE_CHECKED); for (size_t i = 0; i < list_length(&nodes); i++) { - struct node *n = (struct node *) list_at(&nodes, i); + auto *n = (struct node *) list_at(&nodes, i); ret = node_check(n); if (ret) - error("Invalid configuration for node %s", node_name(n)); + throw new RuntimeError("Invalid configuration for node {}", node_name(n)); } for (size_t i = 0; i < list_length(&paths); i++) { - struct path *p = (struct path *) list_at(&paths, i); + auto *p = (struct path *) list_at(&paths, i); ret = path_check(p); if (ret) - error("Invalid configuration for path %s", path_name(p)); + throw new RuntimeError("Invalid configuration for path {}", path_name(p)); } state = STATE_CHECKED; @@ -364,60 +342,70 @@ int SuperNode::start() assert(state == STATE_CHECKED); memory_init(hugepages); - rt_init(priority, affinity); + kernel::rt::init(priority, affinity); - log_open(&log); #ifdef WITH_API - api_start(&api); -#endif -#ifdef WITH_WEB - web_start(&web); + api.start(); #endif - info("Starting node-types"); +#ifdef WITH_WEB + web.start(); +#endif + + logger->info("Starting node-types"); for (size_t i = 0; i < list_length(&nodes); i++) { - struct node *n = (struct node *) list_at(&nodes, i); + auto *n = (struct node *) list_at(&nodes, i); ret = node_type_start(n->_vt);//, this); // @todo: port to C++ if (ret) - error("Failed to start node-type: %s", node_type_name(n->_vt)); + throw new RuntimeError("Failed to start node-type: {}", node_type_name(n->_vt)); } - info("Starting nodes"); + logger->info("Starting nodes"); for (size_t i = 0; i < list_length(&nodes); i++) { - struct node *n = (struct node *) list_at(&nodes, i); + auto *n = (struct node *) list_at(&nodes, i); ret = node_init2(n); if (ret) - error("Failed to prepare node: %s", node_name(n)); + throw new RuntimeError("Failed to prepare node: {}", node_name(n)); int refs = list_count(&paths, (cmp_cb_t) path_uses_node, n); if (refs > 0) { ret = node_start(n); if (ret) - error("Failed to start node: %s", node_name(n)); + throw new RuntimeError("Failed to start node: {}", node_name(n)); } else - warn("No path is using the node %s. Skipping...", node_name(n)); + logger->warn("No path is using the node {}. Skipping...", node_name(n)); } - info("Starting paths"); + logger->info("Starting paths"); for (size_t i = 0; i < list_length(&paths); i++) { - struct path *p = (struct path *) list_at(&paths, i); + auto *p = (struct path *) list_at(&paths, i); if (p->enabled) { ret = path_init2(p); if (ret) - error("Failed to prepare path: %s", path_name(p)); + throw new RuntimeError("Failed to prepare path: {}", path_name(p)); ret = path_start(p); if (ret) - error("Failed to start path: %s", path_name(p)); + throw new RuntimeError("Failed to start path: {}", path_name(p)); } else - warn("Path %s is disabled. Skipping...", path_name(p)); + logger->warn("Path {} is disabled. Skipping...", path_name(p)); } +#ifdef WITH_HOOKS + if (stats > 0) { + stats_print_header(STATS_FORMAT_HUMAN); + + ret = task_init(&task, 1.0 / stats, CLOCK_REALTIME); + if (ret) + throw new RuntimeError("Failed to create stats timer"); + } +#endif /* WITH_HOOKS */ + state = STATE_STARTED; return 0; @@ -427,45 +415,51 @@ int SuperNode::stop() { int ret; - if (stats > 0) +#ifdef WITH_HOOKS + if (stats > 0) { stats_print_footer(STATS_FORMAT_HUMAN); - info("Stopping paths"); + ret = task_destroy(&task); + if (ret) + throw new RuntimeError("Failed to stop stats timer"); + } +#endif /* WITH_HOOKS */ + + logger->info("Stopping paths"); for (size_t i = 0; i < list_length(&paths); i++) { - struct path *p = (struct path *) list_at(&paths, i); + auto *p = (struct path *) list_at(&paths, i); ret = path_stop(p); if (ret) - error("Failed to stop path: %s", path_name(p)); + throw new RuntimeError("Failed to stop path: {}", path_name(p)); } - info("Stopping nodes"); + logger->info("Stopping nodes"); for (size_t i = 0; i < list_length(&nodes); i++) { - struct node *n = (struct node *) list_at(&nodes, i); + auto *n = (struct node *) list_at(&nodes, i); ret = node_stop(n); if (ret) - error("Failed to stop node: %s", node_name(n)); + throw new RuntimeError("Failed to stop node: {}", node_name(n)); } - info("Stopping node-types"); + logger->info("Stopping node-types"); for (size_t i = 0; i < list_length(&plugins); i++) { - struct plugin *p = (struct plugin *) list_at(&plugins, i); + auto *p = (struct plugin *) list_at(&plugins, i); if (p->type == PLUGIN_TYPE_NODE) { ret = node_type_stop(&p->node); if (ret) - error("Failed to stop node-type: %s", node_type_name(&p->node)); + throw new RuntimeError("Failed to stop node-type: {}", node_type_name(&p->node)); } } #ifdef WITH_API - api_stop(&api); + api.stop(); #endif #ifdef WITH_WEB - web_stop(&web); + web.stop(); #endif - log_close(&log); state = STATE_STOPPED; @@ -475,49 +469,22 @@ int SuperNode::stop() void SuperNode::run() { #ifdef WITH_HOOKS - int ret; - - if (stats > 0) { - stats_print_header(STATS_FORMAT_HUMAN); - - struct task t; - - ret = task_init(&t, 1.0 / stats, CLOCK_REALTIME); - if (ret) - error("Failed to create stats timer"); - - for (;;) { - task_wait(&t); - - periodic(); - } - } - else + task_wait(&task); + periodic(); +#else + pause(); #endif /* WITH_HOOKS */ - for (;;) pause(); } SuperNode::~SuperNode() { - assert(state == STATE_STOPPED); + assert(state != STATE_STARTED); list_destroy(&plugins, (dtor_cb_t) plugin_destroy, false); list_destroy(&paths, (dtor_cb_t) path_destroy, true); list_destroy(&nodes, (dtor_cb_t) node_destroy, true); -#ifdef WITH_WEB - web_destroy(&web); -#endif /* WITH_WEB */ - -#ifdef WITH_API - api_destroy(&api); -#endif /* WITH_API */ - json_decref(json); - log_destroy(&log); - - if (name) - free(name); } int SuperNode::periodic() @@ -526,13 +493,13 @@ int SuperNode::periodic() int ret; for (size_t i = 0; i < list_length(&paths); i++) { - struct path *p = (struct path *) list_at(&paths, i); + auto *p = (struct path *) list_at(&paths, i); if (p->state != STATE_STARTED) continue; for (size_t j = 0; j < list_length(&p->hooks); j++) { - struct hook *h = (struct hook *) list_at(&p->hooks, j); + hook *h = (struct hook *) list_at(&p->hooks, j); ret = hook_periodic(h); if (ret) @@ -541,13 +508,13 @@ int SuperNode::periodic() } for (size_t i = 0; i < list_length(&nodes); i++) { - struct node *n = (struct node *) list_at(&nodes, i); + auto *n = (struct node *) list_at(&nodes, i); if (n->state != STATE_STARTED) continue; for (size_t j = 0; j < list_length(&n->in.hooks); j++) { - struct hook *h = (struct hook *) list_at(&n->in.hooks, j); + auto *h = (struct hook *) list_at(&n->in.hooks, j); ret = hook_periodic(h); if (ret) @@ -555,7 +522,7 @@ int SuperNode::periodic() } for (size_t j = 0; j < list_length(&n->out.hooks); j++) { - struct hook *h = (struct hook *) list_at(&n->out.hooks, j); + auto *h = (struct hook *) list_at(&n->out.hooks, j); ret = hook_periodic(h); if (ret) diff --git a/lib/web.c b/lib/web.c deleted file mode 100644 index 2d84ddc16..000000000 --- a/lib/web.c +++ /dev/null @@ -1,359 +0,0 @@ -/** LWS-releated functions. - * - * @author Steffen Vogel - * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include -#include - -#include -#include -#include -#include -#include - -#include - -/* Forward declarations */ -lws_callback_function api_ws_protocol_cb; -lws_callback_function api_http_protocol_cb; -lws_callback_function websocket_protocol_cb; - -/** List of libwebsockets protocols. */ -struct lws_protocols protocols[] = { - { - .name = "http", - .callback = lws_callback_http_dummy, - .per_session_data_size = 0, - .rx_buffer_size = 1024 - }, -#ifdef WITH_API - { - .name = "http-api", - .callback = api_http_protocol_cb, - .per_session_data_size = sizeof(struct api_session), - .rx_buffer_size = 1024 - }, - { - .name = "api", - .callback = api_ws_protocol_cb, - .per_session_data_size = sizeof(struct api_session), - .rx_buffer_size = 0 - }, -#endif /* WITH_API */ -#ifdef LIBWEBSOCKETS_FOUND - { - .name = "live", - .callback = websocket_protocol_cb, - .per_session_data_size = sizeof(struct websocket_connection), - .rx_buffer_size = 0 - }, -#endif /* LIBWEBSOCKETS_FOUND */ -#if 0 /* not supported yet */ - { - .name = "log", - .callback = log_ws_protocol_cb, - .per_session_data_size = 0, - .rx_buffer_size = 0 - }, - { - .name = "stats", - .callback = stats_ws_protocol_cb, - .per_session_data_size = sizeof(struct api_session), - .rx_buffer_size = 0 - }, -#endif - { NULL /* terminator */ } -}; - -/** List of libwebsockets mounts. */ -static struct lws_http_mount mounts[] = { - { - .mountpoint = "/", - .origin = NULL, - .def = "/index.html", - .cgienv = NULL, - .cgi_timeout = 0, - .cache_max_age = 0, - .cache_reusable = 0, - .cache_revalidate = 0, - .cache_intermediaries = 0, - .origin_protocol = LWSMPRO_FILE, - .mountpoint_len = 1, -#ifdef WITH_API - .mount_next = &mounts[1] - }, - { - .mountpoint = "/api/v1", - .origin = "http-api", - .def = NULL, - .cgienv = NULL, - .cgi_timeout = 0, - .cache_max_age = 0, - .cache_reusable = 0, - .cache_revalidate = 0, - .cache_intermediaries = 0, - .origin_protocol = LWSMPRO_CALLBACK, - .mountpoint_len = 7, -#endif /* WITH_API */ - .mount_next = NULL - } -}; - -/** List of libwebsockets extensions. */ -static const struct lws_extension extensions[] = { - { - "permessage-deflate", - lws_extension_callback_pm_deflate, - "permessage-deflate" - }, - { - "deflate-frame", - lws_extension_callback_pm_deflate, - "deflate_frame" - }, - { NULL /* terminator */ } -}; - -extern struct log *global_log; - -static void logger(int level, const char *msg) { - int len = strlen(msg); - if (strchr(msg, '\n')) - len -= 1; - - /* Decrease severity for some errors. */ - if (strstr(msg, "Unable to open") == msg) - level = LLL_WARN; - - switch (level) { - case LLL_ERR: - log_print(global_log, CLR_RED("Web "), "%.*s", len, msg); - break; - - case LLL_WARN: - log_print(global_log, CLR_YEL("Web "), "%.*s", len, msg); - break; - - case LLL_NOTICE: - log_print(global_log, CLR_WHT("Web "), "%.*s", len, msg); - break; - - case LLL_INFO: - log_print(global_log, "Web ", "%.*s", len, msg); - break; - - default: /* Everything else is debug */ - log_print(global_log, CLR_GRY("Web "), "%.*s", len, msg); break; - } -} - -void web_callback_on_writable(struct lws *wsi) -{ - struct lws_context *ctx = lws_get_context(wsi); - struct web *w = lws_context_user(ctx); - - queue_push(&w->writables, (void *) wsi); -} - -static void * web_worker(void *ctx) -{ - struct lws *wsi; - struct web *w = ctx; - - for (;;) { - lws_service(w->context, 100); - - while (queue_available(&w->writables)) { - queue_pull(&w->writables, (void **) &wsi); - lws_callback_on_writable(wsi); - } - } - - return NULL; -} - -int web_init(struct web *w, struct api *a) -{ - int ret, lvl = LLL_ERR | LLL_WARN | LLL_NOTICE; - - if (global_log->level >=10 && global_log->facilities & LOG_WEB) - lvl |= (1 << LLL_COUNT) - 1; - - lws_set_log_level(lvl, logger); - - w->api = a; - - /* Default values */ - w->port = getuid() > 0 ? 8080 : 80; /**< @todo Use libcap to check if user can bind to ports < 1024 */ - w->htdocs = strdup(WEB_PATH); - - ret = queue_init(&w->writables, 128, &memory_heap); - if (ret) - return ret; - - w->state = STATE_INITIALIZED; - - return 0; -} - -int web_parse(struct web *w, json_t *cfg) -{ - int ret, enabled = 1; - const char *ssl_cert = NULL; - const char *ssl_private_key = NULL; - const char *htdocs = NULL; - json_error_t err; - - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, s?: b }", - "ssl_cert", &ssl_cert, - "ssl_private_key", &ssl_private_key, - "htdocs", &htdocs, - "port", &w->port, - "enabled", &enabled - ); - if (ret) - jerror(&err, "Failed to http section of configuration file"); - - if (ssl_cert) - w->ssl_cert = strdup(ssl_cert); - - if (ssl_private_key) - w->ssl_private_key = strdup(ssl_private_key); - - if (htdocs) { - if (w->htdocs) - free(w->htdocs); - - w->htdocs = strdup(htdocs); - } - - if (!enabled) - w->port = CONTEXT_PORT_NO_LISTEN; - - w->state = STATE_PARSED; - - return 0; -} - -int web_start(struct web *w) -{ - int ret; - - /* Start server */ - struct lws_context_creation_info ctx_info = { - .options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT, - .gid = -1, - .uid = -1, - .user = (void *) w, - .protocols = protocols, - .mounts = mounts, - .extensions = extensions, - .port = w->port, - .ssl_cert_filepath = w->ssl_cert, - .ssl_private_key_filepath = w->ssl_private_key - }; - - info("Starting Web sub-system: webroot=%s", w->htdocs); - - /* update web root of mount point */ - mounts[0].origin = w->htdocs; - - w->context = lws_create_context(&ctx_info); - if (w->context == NULL) - error("WebSocket: failed to initialize server context"); - - for (int tries = 10; tries > 0; tries--) { - w->vhost = lws_create_vhost(w->context, &ctx_info); - if (w->vhost) - break; - - ctx_info.port++; - warn("WebSocket: failed to setup vhost. Trying another port: %d", ctx_info.port); - } - - if (w->vhost == NULL) - error("WebSocket: failed to initialize virtual host"); - - ret = pthread_create(&w->thread, NULL, web_worker, w); - if (ret) - error("Failed to start Web worker thread"); - - w->state = STATE_STARTED; - - return ret; -} - -int web_stop(struct web *w) -{ - int ret; - - if (w->state != STATE_STARTED) - return 0; - - info("Stopping Web sub-system"); - - { - lws_cancel_service(w->context); - - /** @todo Wait for all connections to be closed */ - - ret = pthread_cancel(w->thread); - if (ret) - serror("Failed to cancel Web worker thread"); - - ret = pthread_join(w->thread, NULL); - if (ret) - serror("Failed to join Web worker thread"); - } - - w->state = STATE_STOPPED; - - return 0; -} - -int web_destroy(struct web *w) -{ - int ret; - - if (w->state == STATE_DESTROYED) - return 0; - - if (w->context) { - lws_context_destroy(w->context); - } - - if (w->ssl_cert) - free(w->ssl_cert); - - if (w->ssl_private_key) - free(w->ssl_private_key); - - if (w->htdocs) - free(w->htdocs); - - ret = queue_destroy(&w->writables); - if (ret) - return ret; - - w->state = STATE_DESTROYED; - - return 0; -} diff --git a/lib/web.cpp b/lib/web.cpp new file mode 100644 index 000000000..c5735d13d --- /dev/null +++ b/lib/web.cpp @@ -0,0 +1,286 @@ +/** LWS-releated functions. + * + * @author Steffen Vogel + * @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +using namespace villas; +using namespace villas::node; + +/* Forward declarations */ +lws_callback_function websocket_protocol_cb; + +Logger Web::logger = logging.get("web"); + +/** List of libwebsockets protocols. */ +lws_protocols protocols[] = { + { + .name = "http", + .callback = lws_callback_http_dummy, + .per_session_data_size = 0, + .rx_buffer_size = 1024 + }, +#ifdef WITH_API + { + .name = "http-api", + .callback = api_http_protocol_cb, + .per_session_data_size = sizeof(api::sessions::Http), + .rx_buffer_size = 1024 + }, + { + .name = "api", + .callback = api_ws_protocol_cb, + .per_session_data_size = sizeof(api::sessions::WebSocket), + .rx_buffer_size = 0 + }, +#endif /* WITH_API */ +#ifdef __LIBWEBSOCKETS_FOUND /** @todo: Port to C++ */ + { + .name = "live", + .callback = websocket_protocol_cb, + .per_session_data_size = sizeof(websocket_connection), + .rx_buffer_size = 0 + }, +#endif /* LIBWEBSOCKETS_FOUND */ + { nullptr /* terminator */ } +}; + +/** List of libwebsockets mounts. */ +static lws_http_mount mounts[] = { +#ifdef WITH_API + { + .mount_next = &mounts[1], + .mountpoint = "/api/v1", + .origin = "http-api", + .def = nullptr, + .cgienv = nullptr, + .cgi_timeout = 0, + .cache_max_age = 0, + .cache_reusable = 0, + .cache_revalidate = 0, + .cache_intermediaries = 0, + .origin_protocol = LWSMPRO_CALLBACK, + .mountpoint_len = 7 + }, +#endif /* WITH_API */ + { + .mount_next = nullptr, + .mountpoint = "/", + .origin = nullptr, + .def = "/index.html", + .cgienv = nullptr, + .cgi_timeout = 0, + .cache_max_age = 0, + .cache_reusable = 0, + .cache_revalidate = 0, + .cache_intermediaries = 0, + .origin_protocol = LWSMPRO_FILE, + .mountpoint_len = 1 + } +}; + +/** List of libwebsockets extensions. */ +static const lws_extension extensions[] = { + { + "permessage-deflate", + lws_extension_callback_pm_deflate, + "permessage-deflate" + }, + { + "deflate-frame", + lws_extension_callback_pm_deflate, + "deflate_frame" + }, + { nullptr /* terminator */ } +}; + +void Web::lwsLogger(int lws_lvl, const char *msg) { + char *nl; + + nl = (char *) strchr(msg, '\n'); + if (nl) + *nl = 0; + + /* Decrease severity for some errors. */ + if (strstr(msg, "Unable to open") == msg) + lws_lvl = LLL_WARN; + + Logger logger = logging.get("lws"); + + switch (lws_lvl) { + case LLL_ERR: + logger->error("{}", msg); + break; + + case LLL_WARN: + logger->warn("{}", msg); + break; + + case LLL_NOTICE: + case LLL_INFO: + logger->info("{}", msg); + break; + + default: /* Everything else is debug */ + logger->debug("{}", msg); + } +} + +void Web::worker() +{ + lws *wsi; + + while (running) { + lws_service(context, 100); + + while (!writables.empty()) { + wsi = writables.pull(); + + lws_callback_on_writable(wsi); + } + } + + logger->info("Stopping worker"); +} + +Web::Web(Api *a) : + state(STATE_INITIALIZED), + htdocs(WEB_PATH), + api(a) +{ + int lvl = LLL_ERR | LLL_WARN | LLL_NOTICE; + + /** @todo: Port to C++: add LLL_DEBUG and others if trace log level is activated */ + + lws_set_log_level(lvl, lwsLogger); + + /* Default values */ + port = getuid() > 0 ? 8080 : 80; +} + +int Web::parse(json_t *cfg) +{ + int ret, enabled = 1; + const char *cert = nullptr; + const char *pkey = nullptr; + const char *htd = nullptr; + json_error_t err; + + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, s?: b }", + "ssl_cert", &cert, + "ssl_private_key", &pkey, + "htdocs", &htd, + "port", &port, + "enabled", &enabled + ); + if (ret) + throw new JsonError(err, "Failed to http section of configuration file"); + + if (cert) + ssl_cert = cert; + + if (pkey) + ssl_private_key = pkey; + + if (htd) + htdocs = htd; + + if (!enabled) + port = CONTEXT_PORT_NO_LISTEN; + + state = STATE_PARSED; + + return 0; +} + +void Web::start() +{ + /* Start server */ + lws_context_creation_info ctx_info = { + .port = port, + .protocols = protocols, + .extensions = extensions, + .ssl_cert_filepath = ssl_cert.empty() ? nullptr : ssl_cert.c_str(), + .ssl_private_key_filepath = ssl_private_key.empty() ? nullptr : ssl_private_key.c_str(), + .gid = -1, + .uid = -1, + .options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT, + .user = (void *) this, + .mounts = mounts, + }; + + logger->info("Starting sub-system: htdocs={}", htdocs.c_str()); + /* update web root of mount point */ + mounts[1].origin = htdocs.c_str(); + + context = lws_create_context(&ctx_info); + if (context == nullptr) + throw new RuntimeError("Failed to initialize server context"); + + for (int tries = 10; tries > 0; tries--) { + w->vhost = lws_create_vhost(w->context, &ctx_info); + if (w->vhost) + break; + + ctx_info.port++; + warn("WebSocket: failed to setup vhost. Trying another port: %d", ctx_info.port); + } + + if (w->vhost == NULL) + throw new RuntimeError("Failed to initialize virtual host"); + + /* Start thread */ + running = true; + thread = std::thread(&Web::worker, this); + + state = STATE_STARTED; +} + +void Web::stop() +{ + assert(state == STATE_STARTED); + + logger->info("Stopping sub-system"); + + running = false; + thread.join(); + + lws_context_destroy(context); + + state = STATE_STOPPED; +} + +void Web::callbackOnWritable(lws *wsi) +{ + writables.push(wsi); +}