1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

cpp: ported web and api

This commit is contained in:
Steffen Vogel 2018-10-20 14:20:06 +02:00
parent e625926d7d
commit 3c11acb8b5
39 changed files with 2455 additions and 1773 deletions

View file

@ -1,87 +0,0 @@
/** REST-API-releated functions.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <jansson.h>
#include <pthread.h>
#include <libwebsockets.h>
#include <villas/list.h>
#include <villas/common.h>
#include <villas/queue_signalled.h>
#include <villas/api/session.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations */
struct api;
struct api_action;
/** Callback type of command function
*
* @param[inout] c Command handle
* @param[in] args JSON command arguments.
* @param[out] resp JSON command response.
* @param[in] i Execution context.
*/
typedef int (*api_cb_t)(struct api_action *c, json_t *args, json_t **resp, struct api_session *s);
struct api {
enum state state;
struct list sessions; /**< List of currently active connections */
struct queue_signalled pending; /**< A queue of api_sessions which have pending requests. */
pthread_t thread;
struct super_node *super_node;
};
/** API action descriptor */
struct api_action {
api_cb_t cb;
};
/** Initalize the API.
*
* Save references to list of paths / nodes for command execution.
*/
int api_init(struct api *a);//, struct super_node *sn); // @todo: port to C++
int api_destroy(struct api *a);
int api_start(struct api *a);
int api_stop(struct api *a);
/** Libwebsockets callback for "api" endpoint */
int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
#ifdef __cplusplus
}
#endif

87
include/villas/api.hpp Normal file
View file

@ -0,0 +1,87 @@
/** REST-API-releated functions.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <jansson.h>
#include <libwebsockets.h>
#include <atomic>
#include <thread>
#include <list>
#include <villas/log.hpp>
#include <villas/common.h>
#include <villas/api/server.hpp>
#include <villas/queue_signalled.hpp>
namespace villas {
namespace node {
namespace api {
/* Forward declarations */
class Session;
} // namespace api
/* Forward declarations */
class SuperNode;
class Api {
protected:
static Logger logger;
enum state state;
std::thread thread;
std::atomic<bool> running; /**< Atomic flag for signalizing thread termination. */
SuperNode *super_node;
api::Server server;
void run();
void worker();
public:
/** Initalize the API.
*
* Save references to list of paths / nodes for command execution.
*/
Api(SuperNode *sn);
~Api();
void start();
void stop();
SuperNode * getSuperNode()
{
return super_node;
}
std::list<api::Session *> sessions; /**< List of currently active connections */
villas::QueueSignalled<api::Session *> pending; /**< A queue of api_sessions which have pending requests. */
};
} // node
} // villas

View file

@ -0,0 +1,75 @@
/** REST-API-releated functions.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <jansson.h>
#include <villas/log.hpp>
#include <villas/plugin.hpp>
namespace villas {
namespace node {
namespace api {
/* Forward declarations */
class Session;
/** API action descriptor */
class Action {
protected:
Session *session;
static Logger logger;
public:
Action(Session *s) :
session(s)
{ }
virtual int execute(json_t *args, json_t **resp) = 0;
};
class ActionFactory : public plugin::Plugin {
public:
using plugin::Plugin::Plugin;
virtual Action * make(Session *s) = 0;
};
template<typename T>
class ActionPlugin : public ActionFactory {
public:
using ActionFactory::ActionFactory;
virtual Action * make(Session *s) {
return new T(s);
};
};
} // api
} // node
} // villas

View file

@ -0,0 +1,74 @@
/** Socket API endpoint.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <string>
#include <vector>
#include <poll.h>
#include <villas/common.h>
namespace villas {
namespace node {
/* Forward declarations */
class Api;
namespace api {
namespace sessions {
/* Forward declarations */
class Socket;
} // namespace sessions
class Server {
protected:
enum state state;
Api *api;
int sd;
std::vector<pollfd> pfds;
std::vector<sessions::Socket *> sessions;
void acceptNewSession();
void closeSession(sessions::Socket *s);
public:
Server(Api *a);
~Server();
void start();
void stop();
void run(int timeout = 100);
};
} // namespace api
} // namespace node
} // namespace villas

View file

@ -26,55 +26,72 @@
#include <stdbool.h>
#include <jansson.h>
#include <villas/common.h>
#include <villas/queue.h>
#include <villas/buffer.h>
#include <villas/json_buffer.hpp>
#include <villas/api.hpp>
#ifdef __cplusplus
extern "C" {
#endif
namespace villas {
namespace node {
enum api_version {
API_VERSION_UNKOWN = 0,
API_VERSION_1 = 1
};
/* Forward declarations */
class SuperNode;
class Api;
enum api_mode {
API_MODE_WS, /**< This API session was established over a WebSocket connection. */
API_MODE_HTTP /**< This API session was established via a HTTP REST request. */
};
namespace api {
/** A connection via HTTP REST or WebSockets to issue API actions. */
struct api_session {
enum {
API_SESSION_STATE_ESTABLISHED,
API_SESSION_STATE_SHUTDOWN
} state;
class Session {
enum api_version version;
enum api_mode mode;
public:
enum State {
ESTABLISHED,
SHUTDOWN
};
enum Version {
UNKOWN = 0,
VERSION_1 = 1
};
protected:
enum State state;
enum Version version;
static Logger logger;
int runs;
struct {
struct buffer buffer;
JsonBuffer buffer;
struct queue queue;
} request, response;
struct lws *wsi;
struct api *api;
Api *api;
char *_name;
public:
Session(Api *a);
virtual ~Session();
int runAction(json_t *req, json_t **resp);
virtual void runPendingActions();
virtual std::string getName();
int getRuns()
{
return runs;
}
SuperNode * getSuperNode()
{
return api->getSuperNode();
}
virtual void shutdown()
{ }
};
int api_session_init(struct api_session *s, enum api_mode m);
int api_session_destroy(struct api_session *s);
int api_session_run_action(struct api_session *s, json_t *req, json_t **resp);
char * api_session_name(struct api_session *s);
#ifdef __cplusplus
}
#endif
} // api
} // node
} // villas

View file

@ -0,0 +1,59 @@
/** HTTP Api session.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <villas/api/sessions/wsi.hpp>
extern "C" int api_http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
namespace villas {
namespace node {
/* Forward declarations */
class SuperNode;
class Api;
namespace api {
namespace sessions {
class Http : public Wsi {
public:
Http(Api *s, lws *w);
void read(void *in, size_t len);
int complete();
int write();
virtual ~Http();
virtual std::string getName();
};
} // sessions
} // api
} // node
} // villas

View file

@ -0,0 +1,55 @@
/** Socket Api session.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <villas/api/session.hpp>
namespace villas {
namespace node {
/* Forward declarations */
class Api;
namespace api {
namespace sessions {
class Socket : public Session {
protected:
int sd;
public:
Socket(Api *a, int s);
~Socket();
int read();
int write();
virtual std::string getName();
};
} // sessions
} // api
} // node
} // villas

View file

@ -0,0 +1,56 @@
/** WebSockets API session.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <villas/api/sessions/wsi.hpp>
extern "C" int api_ws_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
namespace villas {
namespace node {
/* Forward declarations */
class Api;
namespace api {
namespace sessions {
class WebSocket : public Wsi {
public:
WebSocket(Api *a, lws *w);
virtual ~WebSocket();
virtual std::string getName();
int read(void *in, size_t len);
int write();
};
} // sessions
} // api
} // node
} // villas

View file

@ -0,0 +1,63 @@
/** LWS Api session.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <string>
#include <villas/api/session.hpp>
#include <villas/web.hpp>
/* Forward declarations */
struct lws;
namespace villas {
namespace node {
/* Forward declarations */
class Api;
namespace api {
namespace sessions {
class Wsi : public Session {
protected:
lws *wsi;
Web *web;
public:
Wsi(Api *a, lws *w);
virtual void runPendingActions();
virtual std::string getName();
virtual void shutdown();
};
} // sessions
} // api
} // node
} // villas

View file

@ -25,7 +25,6 @@
#include <villas/common.h>
#include <villas/utils.h>
#include <villas/api.h>
#include <villas/nodes/cbuilder.h>
#include <villas/hook_type.h>
#include <villas/node_type.h>
@ -83,13 +82,12 @@ struct plugin {
struct format_type format;
struct node_type node;
struct hook_type hook;
struct api_action api;
struct cbuilder_model cb;
};
};
/** Return a pointer to the plugin structure */
#define plugin(vt) ((struct plugin *) ((char *) (vt) - offsetof(struct plugin, api)))
#define plugin(vt) ((struct plugin *) ((char *) (vt) - offsetof(struct plugin, format)))
#define plugin_name(vt) plugin(vt)->name
#define plugin_description(vt) plugin(vt)->description

View file

@ -24,10 +24,11 @@
#pragma once
#include <villas/list.h>
#include <villas/api.h>
#include <villas/web.h>
#include <villas/log.h>
#include <villas/api.hpp>
#include <villas/web.hpp>
#include <villas/log.hpp>
#include <villas/node.h>
#include <villas/task.h>
#include <villas/common.h>
namespace villas {
@ -44,17 +45,24 @@ protected:
int hugepages; /**< Number of hugepages to reserve. */
double stats; /**< Interval for path statistics. Set to 0 to disable them. */
static Logger logger;
struct list nodes;
struct list paths;
struct list plugins;
struct log log;
struct api api;
struct web web;
#ifdef WITH_API
Api api;
#endif
char *name; /**< A name of this super node. Usually the hostname. */
#ifdef WITH_WEB
Web web;
#endif
char *uri; /**< URI of configuration */
struct task task; /**< Task for periodic stats output */
std::string name; /**< A name of this super node. Usually the hostname. */
std::string uri; /**< URI of configuration */
json_t *json; /**< JSON representation of the configuration. */
@ -65,7 +73,7 @@ public:
int init();
/** Wrapper for super_node_parse() */
int parseUri(const char *uri);
int parseUri(const std::string &name);
/** Parse super-node configuration.
*
@ -86,15 +94,48 @@ public:
/** Run periodic hooks of this super node. */
int periodic();
struct node * getNode(const char *name) { return (struct node *) list_lookup(&nodes, name); }
struct node * getNode(const std::string &name)
{
return (struct node *) list_lookup(&nodes, name.c_str());
}
struct list * getNodes() { return &nodes; }
struct list * getPaths() { return &paths; }
struct web * getWeb() { return &web; }
struct api * getApi() { return &api; }
struct log * getLog() { return &log; }
struct list * getNodes()
{
return &nodes;
}
/** Desctroy configuration object. */
struct list * getPaths() {
return &paths;
}
#ifdef WITH_API
Api * getApi() {
return &api;
}
#endif
#ifdef WITH_WEB
Web * getWeb() {
return &web;
}
#endif
json_t * getConfig()
{
return json;
}
std::string getConfigUri()
{
return uri;
}
std::string getName()
{
return name;
}
/** Destroy configuration object. */
~SuperNode();
};

View file

@ -23,55 +23,67 @@
#pragma once
#include <pthread.h>
#include <atomic>
#include <thread>
#include <jansson.h>
#include <villas/log.hpp>
#include <villas/common.h>
#include <villas/queue.h>
#include <villas/queue.hpp>
#ifdef __cplusplus
extern "C" {
#endif
namespace villas {
namespace node {
/* Forward declarations */
struct api;
class Api;
struct web {
struct api *api;
class Web {
protected:
enum state state;
struct lws_context *context; /**< The libwebsockets server context. */
struct lws_vhost *vhost; /**< The libwebsockets vhost. */
static Logger logger;
struct queue writables; /**< Queue of WSIs for which we will call lws_callback_on_writable() */
lws_context *context; /**< The libwebsockets server context. */
lws_vhost *vhost; /**< The libwebsockets vhost. */
Queue<lws *> writables; /**< Queue of WSIs for which we will call lws_callback_on_writable() */
int port; /**< Port of the build in HTTP / WebSocket server. */
char *htdocs; /**< The root directory for files served via HTTP. */
char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS. */
char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS. */
std::string htdocs; /**< The root directory for files served via HTTP. */
std::string ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS. */
std::string ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS. */
pthread_t thread;
std::thread thread;
std::atomic<bool> running; /**< Atomic flag for signalizing thread termination. */
Api *api;
void worker();
static void lwsLogger(int level, const char *msg);
public:
/** Initialize the web interface.
*
* The web interface is based on the libwebsockets library.
*/
Web(Api *a);
void start();
void stop();
/** Parse HTTPd and WebSocket related options */
int parse(json_t *cfg);
Api * getApi()
{
return api;
}
void callbackOnWritable(struct lws *wsi);
};
/** Initialize the web interface.
*
* The web interface is based on the libwebsockets library.
*/
int web_init(struct web *w, struct api *a);
int web_destroy(struct web *w);
int web_start(struct web *w);
int web_stop(struct web *w);
/** Parse HTTPd and WebSocket related options */
int web_parse(struct web *w, json_t *cfg);
void web_callback_on_writable(struct lws *wsi);
#ifdef __cplusplus
}
#endif
} // namespace node
} // namespace villas

View file

@ -55,11 +55,6 @@ set(LIB_SRC
queue_signalled.c
)
if(ARCH STREQUAL "x86_64")
list(APPEND LIB_SRC tsc.c)
endif()
if(IBVERBS_FOUND AND RDMACM_FOUND)
list(APPEND LIB_SRC memory/ib.c)
endif()
@ -94,7 +89,7 @@ endif()
if(WITH_WEB)
list(APPEND LIB_SRC
web.c
web.cpp
)
list(APPEND INCLUDE_DIRS ${LIBWEBSOCKETS_INCLUDE_DIRS})
@ -103,7 +98,7 @@ endif()
if(WITH_API AND WITH_WEB)
list(APPEND LIB_SRC
api.c
api.cpp
)
add_subdirectory(api)
@ -134,7 +129,6 @@ else()
target_link_libraries(villas PRIVATE -Wl,--whole-archive ${WHOLE_ARCHIVES} -Wl,--no-whole-archive)
endif()
set_target_properties(villas PROPERTIES
VERSION ${CMAKE_PROJECT_VERSION}
SOVERSION 1

380
lib/api.c
View file

@ -1,380 +0,0 @@
/** REST-API-releated functions.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <libwebsockets.h>
#include <string.h>
#include <assert.h>
#include <villas/api.h>
#include <villas/log.h>
#include <villas/web.h>
#include <villas/config.h>
#include <villas/memory.h>
#include <villas/compat.h>
/* Forward declarations */
static void * api_worker(void *ctx);
int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret, pulled, pushed;
json_t *req, *resp;
struct web *w = lws_context_user(lws_get_context(wsi));
struct api_session *s = (struct api_session *) user;
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
if (w->api == NULL) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "API disabled", strlen("API disabled"));
return -1;
}
/* Parse request URI */
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI);
ret = sscanf(uri, "/v%d", (int *) &s->version);
if (ret != 1)
return -1;
ret = api_session_init(s, API_MODE_WS);
if (ret)
return -1;
s->state = API_SESSION_STATE_ESTABLISHED;
s->wsi = wsi;
s->api = w->api;
list_push(&s->api->sessions, s);
debug(LOG_API, "Initiated API session: %s", api_session_name(s));
break;
case LWS_CALLBACK_CLOSED:
debug(LOG_API, "Closed API session: %s, runs=%d", api_session_name(s), s->runs);
ret = api_session_destroy(s);
if (ret)
return -1;
if (w->api->sessions.state == STATE_INITIALIZED)
list_remove(&w->api->sessions, s);
break;
case LWS_CALLBACK_RECEIVE:
if (lws_is_first_fragment(wsi))
buffer_clear(&s->request.buffer);
buffer_append(&s->request.buffer, in, len);
if (lws_is_final_fragment(wsi)) {
ret = buffer_parse_json(&s->request.buffer, &req);
if (ret)
break;
pushed = queue_push(&s->request.queue, req);
if (pushed != 1)
warn("Queue overun in API session");
pushed = queue_signalled_push(&w->api->pending, s);
if (pushed != 1)
warn("Queue overrun in API");
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
if (s->state == API_SESSION_STATE_SHUTDOWN)
return -1;
pulled = queue_pull(&s->response.queue, (void **) &resp);
if (pulled) {
char pad[LWS_PRE];
buffer_clear(&s->response.buffer);
buffer_append(&s->response.buffer, pad, sizeof(pad));
buffer_append_json(&s->response.buffer, resp);
json_decref(resp);
lws_write(wsi, (unsigned char *) s->response.buffer.buf + LWS_PRE, s->response.buffer.len - LWS_PRE, LWS_WRITE_TEXT);
}
break;
default:
return 0;
}
return 0;
}
int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret, pulled, pushed, hdrlen;
char *uri;
json_t *resp, *req;
struct web *w = lws_context_user(lws_get_context(wsi));
struct api_session *s = (struct api_session *) user;
switch (reason) {
case LWS_CALLBACK_HTTP_BIND_PROTOCOL:
if (w->api == NULL)
return -1;
int options;
if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_OPTIONS_URI)))
options = 1;
else if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_POST_URI)))
options = 0;
else
return -1;
uri = malloc(hdrlen + 1);
lws_hdr_copy(wsi, uri, hdrlen + 1, options ? WSI_TOKEN_OPTIONS_URI : WSI_TOKEN_POST_URI);
/* Parse request URI */
ret = sscanf(uri, "/api/v%d", (int *) &s->version);
if (ret != 1)
return -1;
free(uri);
ret = api_session_init(s, API_MODE_HTTP);
if (ret)
return -1;
s->state = API_SESSION_STATE_ESTABLISHED;
s->wsi = wsi;
s->api = w->api;
list_push(&s->api->sessions, s);
debug(LOG_API, "Initiated API session: %s", api_session_name(s));
if (options)
lws_callback_on_writable(wsi);
break;
case LWS_CALLBACK_HTTP_DROP_PROTOCOL:
if (!s)
return -1;
debug(LOG_API, "Closed API session: %s, runs=%d", api_session_name(s), s->runs);
ret = api_session_destroy(s);
if (ret)
return -1;
if (w->api->sessions.state == STATE_INITIALIZED)
list_remove(&w->api->sessions, s);
return 1;
case LWS_CALLBACK_HTTP_BODY:
buffer_append(&s->request.buffer, in, len);
break;
case LWS_CALLBACK_HTTP_BODY_COMPLETION:
ret = buffer_parse_json(&s->request.buffer, &req);
if (ret)
break;
buffer_clear(&s->request.buffer);
pushed = queue_push(&s->request.queue, req);
if (pushed != 1)
warn("Queue overrun for API session: %s", api_session_name(s));
pushed = queue_signalled_push(&w->api->pending, s);
if (pushed != 1)
warn("Queue overrun for API");
break;
case LWS_CALLBACK_HTTP_WRITEABLE:
pulled = queue_pull(&s->response.queue, (void **) &resp);
if (pulled) {
buffer_clear(&s->response.buffer);
buffer_append_json(&s->response.buffer, resp);
json_decref(resp);
}
char headers[1024];
snprintf(headers, sizeof(headers),
"HTTP/1.1 200 OK\r\n"
"Content-type: application/json\r\n"
"User-agent: " USER_AGENT "\r\n"
"Access-Control-Allow-Origin: *\r\n"
"Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
"Access-Control-Allow-Headers: Content-Type\r\n"
"Access-Control-Max-Age: 86400\r\n"
"Content-Length: %zu\r\n"
"\r\n",
s->response.buffer.len
);
ret = lws_write(wsi, (unsigned char *) headers, strlen(headers), LWS_WRITE_HTTP_HEADERS);
if (ret < 0)
return 1;
ret = lws_write(wsi, (unsigned char *) s->response.buffer.buf, s->response.buffer.len, LWS_WRITE_HTTP);
if (ret < 0)
return 1;
goto try_to_reuse;
default:
break;
}
return 0;
try_to_reuse:
if (lws_http_transaction_completed(wsi))
return -1;
return 0;
}
int api_init(struct api *a)//, struct super_node *sn) // @todo: port to C++
{
int ret;
info("Initialize API sub-system");
ret = list_init(&a->sessions);
if (ret)
return ret;
ret = queue_signalled_init(&a->pending, 1024, &memory_heap, 0);
if (ret)
return ret;
a->super_node = NULL; //sn; // @todo: port to C++
a->state = STATE_INITIALIZED;
return 0;
}
int api_destroy(struct api *a)
{
int ret;
assert(a->state != STATE_STARTED);
ret = queue_signalled_destroy(&a->pending);
if (ret)
return ret;
a->state = STATE_DESTROYED;
return 0;
}
int api_start(struct api *a)
{
int ret;
info("Starting API sub-system");
ret = pthread_create(&a->thread, NULL, api_worker, a);
if (ret)
error("Failed to start API worker thread");
a->state = STATE_STARTED;
return 0;
}
int api_stop(struct api *a)
{
int ret;
info("Stopping API sub-system");
if (a->state != STATE_STARTED)
return 0;
for (int i = 0; i < list_length(&a->sessions); i++) {
struct api_session *s = (struct api_session *) list_at(&a->sessions, i);
s->state = API_SESSION_STATE_SHUTDOWN;
web_callback_on_writable(s->wsi);
}
for (int i = 0; i < 10 && list_length(&a->sessions) > 0; i++) {
info("Wait for API sessions to close");
usleep(1 * 1e6);
}
ret = list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false);
if (ret)
return ret;
ret = pthread_cancel(a->thread);
if (ret)
serror("Failed to cancel API worker thread");
ret = pthread_join(a->thread, NULL);
if (ret)
serror("Failed to join API worker thread");
a->state = STATE_STOPPED;
return 0;
}
static void * api_worker(void *ctx)
{
int pulled;
struct api *a = ctx;
struct api_session *s;
json_t *req, *resp;
for (;;) {
pulled = queue_signalled_pull(&a->pending, (void **) &s);
if (pulled != 1)
continue;
queue_pull(&s->request.queue, (void **) &req);
api_session_run_action(s, req, &resp);
json_decref(req);
queue_push(&s->response.queue, resp);
web_callback_on_writable(s->wsi);
}
return NULL;
}

104
lib/api.cpp Normal file
View file

@ -0,0 +1,104 @@
/** REST-API-releated functions.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/api.hpp>
#include <villas/api/session.hpp>
#include <villas/super_node.hpp>
#include <villas/utils.h>
#include <villas/node/config.h>
#include <villas/memory.h>
#include <villas/compat.h>
using namespace villas;
using namespace villas::node;
using namespace villas::node::api;
Logger Api::logger = logging.get("api");
Api::Api(SuperNode *sn) :
state(STATE_INITIALIZED),
super_node(sn),
server(this)
{ }
Api::~Api()
{
assert(state != STATE_STARTED);
}
void Api::start()
{
assert(state != STATE_STARTED);
logger->info("Starting sub-system");
server.start();
running = true;
thread = std::thread(&Api::worker, this);
state = STATE_STARTED;
}
void Api::stop()
{
assert(state == STATE_STARTED);
logger->info("Stopping sub-system");
for (Session *s : sessions)
s->shutdown();
for (int i = 0; i < 10 && sessions.size() > 0; i++) {
logger->info("Waiting for {} sessions to terminate", sessions.size());
usleep(1 * 1e6);
}
running = false;
pending.push(nullptr); /* unblock thread */
thread.join();
server.stop();
state = STATE_STOPPED;
}
void Api::run()
{
if (pending.empty())
return;
/* Process pending actions */
Session *s = pending.pop();
if (s)
s->runPendingActions();
}
void Api::worker()
{
while (running) {
run();
server.run();
}
logger->info("Stopping worker");
}

View file

@ -22,16 +22,23 @@
###################################################################################
set(API_SRC
session.c
actions/capabiltities.c
actions/shutdown.c
actions/status.c
# @todo: port to C++
# actions/node.c
# actions/config.c
# actions/nodes.c
# actions/paths.c
# actions/restart.c
session.cpp
action.cpp
server.cpp
sessions/socket.cpp
sessions/wsi.cpp
sessions/http.cpp
sessions/websocket.cpp
actions/capabiltities.cpp
actions/shutdown.cpp
actions/status.cpp
actions/config.cpp
actions/nodes.cpp
actions/paths.cpp
actions/restart.cpp
actions/node.cpp
)
add_library(api STATIC ${API_SRC})

View file

@ -1,4 +1,4 @@
/** The "shutdown" API action.
/** API session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
@ -20,21 +20,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/plugin.h>
#include <villas/api.h>
#include <villas/api/action.hpp>
static int api_shutdown(struct api_action *h, json_t *args, json_t **resp, struct api_session *s)
{
killme(SIGTERM);
using namespace villas;
using namespace villas::node::api;
return 0;
}
static struct plugin p = {
.name = "shutdown",
.description = "quit VILLASnode",
.type = PLUGIN_TYPE_API,
.api.cb = api_shutdown
};
REGISTER_PLUGIN(&p)
Logger Action::logger = logging.get("api:action");

View file

@ -1,78 +0,0 @@
/** The "capabiltities" API ressource.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/plugin.h>
#include <villas/config.h>
static int api_capabilities(struct api_action *h, json_t *args, json_t **resp, struct api_session *s)
{
json_t *json_hooks = json_array();
json_t *json_apis = json_array();
json_t *json_nodes = json_array();
json_t *json_ios = json_array();
json_t *json_name;
for (size_t i = 0; i < list_length(&plugins); i++) {
struct plugin *p = (struct plugin *) list_at(&plugins, i);
json_name = json_string(p->name);
switch (p->type) {
case PLUGIN_TYPE_NODE:
json_array_append_new(json_nodes, json_name);
break;
case PLUGIN_TYPE_HOOK:
json_array_append_new(json_hooks, json_name);
break;
case PLUGIN_TYPE_API:
json_array_append_new(json_apis, json_name);
break;
case PLUGIN_TYPE_FORMAT:
json_array_append_new(json_ios, json_name);
break;
default:
json_decref(json_name);
}
}
*resp = json_pack("{ s: s, s: o, s: o, s: o, s: o }",
"build", BUILDID,
"hooks", json_hooks,
"node-types", json_nodes,
"apis", json_apis,
"formats", json_ios);
return 0;
}
static struct plugin p = {
.name = "capabilities",
.description = "get capabiltities and details about this VILLASnode instance",
.type = PLUGIN_TYPE_API,
.api.cb = api_capabilities
};
REGISTER_PLUGIN(&p)

View file

@ -0,0 +1,87 @@
/** The "capabiltities" API ressource.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/config.h>
#include <villas/api/action.hpp>
namespace villas {
namespace node {
namespace api {
class CapabilitiesAction : public Action {
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
json_t *json_hooks = json_array();
json_t *json_apis = json_array();
json_t *json_nodes = json_array();
json_t *json_formats = json_array();
json_t *json_name;
for (auto f : plugin::Registry::lookup<ActionFactory>()) {
json_name = json_string(f->getName().c_str());
json_array_append_new(json_apis, json_name);
}
#if 0 // @todo Port to C++
for (auto f : NodeFactory::lookup()) {
json_name = json_string(f->getName().c_str());
json_array_append_new(json_nodes, json_name);
}
for (auto f : HookFactory::lookup()) {
json_name = json_string(f->getName().c_str());
json_array_append_new(json_hooks, json_name);
}
for (auto f : FormatFactory::lookup()) {
json_name = json_string(f->getName().c_str());
json_array_append_new(json_formats, json_name);
}
#endif
*resp = json_pack("{ s: s, s: o, s: o, s: o, s: o }",
"build", PROJECT_BUILD_ID,
"hooks", json_hooks,
"node-types", json_nodes,
"apis", json_apis,
"formats", json_formats);
return 0;
}
};
/* Register action */
static ActionPlugin<CapabilitiesAction> p(
"capabilities",
"get capabiltities and details about this VILLASnode instance"
);
} // api
} // node
} // villas

View file

@ -0,0 +1,55 @@
/** The "config" API ressource.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/api/action.hpp>
#include <villas/api/session.hpp>
#include <villas/super_node.hpp>
namespace villas {
namespace node {
namespace api {
class ConfigAction : public Action {
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
json_t *cfg = session->getSuperNode()->getConfig();
*resp = cfg
? json_incref(cfg)
: json_object();
return 0;
}
};
/* Register action */
static ActionPlugin<ConfigAction> p(
"config",
"get configuration of this VILLASnode instance"
);
} // api
} // node
} // villas

View file

@ -1,146 +0,0 @@
/** The API ressource for start/stop/pause/resume nodes.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <jansson.h>
#include <villas/plugin.h>
#include <villas/node.h>
#include <villas/super_node.h>
#include <villas/utils.h>
#include <villas/stats.h>
#include <villas/api.h>
enum api_control_action {
API_NODE_START,
API_NODE_STOP,
API_NODE_PAUSE,
API_NODE_RESUME,
API_NODE_RESTART
};
static int api_control(enum api_control_action act, struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
int ret;
json_error_t err;
const char *node_str;
ret = json_unpack_ex(args, &err, 0, "{ s: s }",
"node", &node_str
);
if (ret)
return ret;
struct list *nodes = &s->api->super_node->nodes;
struct node *node = list_lookup(nodes, node_str);
if (!node)
return -1;
switch (act) {
case API_NODE_START:
return node_start(node);
case API_NODE_STOP:
return node_stop(node);
case API_NODE_PAUSE:
return node_pause(node);
case API_NODE_RESUME:
return node_resume(node);
case API_NODE_RESTART:
return node_restart(node);
default:
return -1;
}
return 0;
}
static int api_control_start(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
return api_control(API_NODE_START, r, args, resp, s);
}
static int api_control_stop(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
return api_control(API_NODE_STOP, r, args, resp, s);
}
static int api_control_pause(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
return api_control(API_NODE_PAUSE, r, args, resp, s);
}
static int api_control_resume(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
return api_control(API_NODE_RESUME, r, args, resp, s);
}
static int api_control_restart(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
return api_control(API_NODE_RESTART, r, args, resp, s);
}
static struct plugin p1 = {
.name = "node.start",
.description = "start a node",
.type = PLUGIN_TYPE_API,
.api.cb = api_control_start
};
static struct plugin p2 = {
.name = "node.stop",
.description = "stop a node",
.type = PLUGIN_TYPE_API,
.api.cb = api_control_stop
};
static struct plugin p3 = {
.name = "node.pause",
.description = "pause a node",
.type = PLUGIN_TYPE_API,
.api.cb = api_control_pause
};
static struct plugin p4 = {
.name = "node.resume",
.description = "resume a node",
.type = PLUGIN_TYPE_API,
.api.cb = api_control_resume
};
static struct plugin p5 = {
.name = "node.restart",
.description = "restart a node",
.type = PLUGIN_TYPE_API,
.api.cb = api_control_restart
};
REGISTER_PLUGIN(&p1)
REGISTER_PLUGIN(&p2)
REGISTER_PLUGIN(&p3)
REGISTER_PLUGIN(&p4)
REGISTER_PLUGIN(&p5)

View file

@ -1,73 +0,0 @@
/** The "nodes" API ressource.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <jansson.h>
#include <villas/plugin.h>
#include <villas/node.h>
#include <villas/super_node.h>
#include <villas/utils.h>
#include <villas/stats.h>
#include <villas/api.h>
static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
json_t *json_nodes = json_array();
for (size_t i = 0; i < list_length(&s->api->super_node->nodes); i++) {
struct node *n = (struct node *) list_at(&s->api->super_node->nodes, i);
json_t *json_node = json_pack("{ s: s, s: s, s: i, s: { s: i }, s: { s: i } }",
"name", node_name_short(n),
"state", state_print(n->state),
"affinity", n->affinity,
"in",
"vectorize", n->in.vectorize,
"out",
"vectorize", n->out.vectorize
);
if (n->stats)
json_object_set_new(json_node, "stats", stats_json(n->stats));
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_node, n->cfg);
json_array_append_new(json_nodes, json_node);
}
*resp = json_nodes;
return 0;
}
static struct plugin p = {
.name = "nodes",
.description = "retrieve list of all known nodes",
.type = PLUGIN_TYPE_API,
.api.cb = api_nodes
};
REGISTER_PLUGIN(&p)

84
lib/api/actions/nodes.cpp Normal file
View file

@ -0,0 +1,84 @@
/** The "nodes" API ressource.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <jansson.h>
#include <villas/super_node.hpp>
#include <villas/node.h>
#include <villas/utils.h>
#include <villas/stats.h>
#include <villas/api/action.hpp>
#include <villas/api/session.hpp>
namespace villas {
namespace node {
namespace api {
class NodesAction : public Action {
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
json_t *json_nodes = json_array();
struct list *nodes = session->getSuperNode()->getNodes();
for (size_t i = 0; i < list_length(nodes); i++) {
struct node *n = (struct node *) list_at(nodes, i);
json_t *json_node = json_pack("{ s: s, s: s, s: i, s: { s: i }, s: { s: i } }",
"name", node_name_short(n),
"state", state_print(n->state),
"affinity", n->affinity,
"in",
"vectorize", n->in.vectorize,
"out",
"vectorize", n->out.vectorize
);
if (n->stats)
json_object_set_new(json_node, "stats", stats_json(n->stats));
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_node, n->cfg);
json_array_append_new(json_nodes, json_node);
}
*resp = json_nodes;
return 0;
}
};
/* Register action */
static ActionPlugin<NodesAction> p(
"nodes",
"retrieve list of all known nodes"
);
} // api
} // node
} // villas

View file

@ -22,40 +22,52 @@
#include <jansson.h>
#include <villas/plugin.h>
#include <villas/super_node.hpp>
#include <villas/path.h>
#include <villas/utils.h>
#include <villas/super_node.h>
#include <villas/api.h>
#include <villas/api/action.hpp>
#include <villas/api/session.hpp>
static int api_paths(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
json_t *json_paths = json_array();
namespace villas {
namespace node {
namespace api {
for (size_t i = 0; i < list_length(&s->api->super_node->paths); i++) {
struct path *p = (struct path *) list_at(&s->api->super_node->paths, i);
class PathsAction : public Action {
json_t *json_path = json_pack("{ s: i }",
"state", p->state
);
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
json_t *json_paths = json_array();
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_path, p->cfg);
struct list *paths = session->getSuperNode()->getPaths();
json_array_append_new(json_paths, json_path);
for (size_t i = 0; i < list_length(paths); i++) {
struct path *p = (struct path *) list_at(paths, i);
json_t *json_path = json_pack("{ s: i }",
"state", p->state
);
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_path, p->cfg);
json_array_append_new(json_paths, json_path);
}
*resp = json_paths;
return 0;
}
*resp = json_paths;
return 0;
}
static struct plugin p = {
.name = "paths",
.description = "retrieve list of all paths with details",
.type = PLUGIN_TYPE_API,
.api.cb = api_paths
};
REGISTER_PLUGIN(&p)
/* Register action */
static ActionPlugin<PathsAction> p(
"paths",
"retrieve list of all paths with details"
);
} // api
} // node
} // villas

View file

@ -1,99 +0,0 @@
/** The "restart" API action.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <string.h>
#include <villas/plugin.h>
#include <villas/api.h>
#include <villas/super_node.h>
#include <villas/log.h>
static char *config;
void api_restart_handler()
{
int ret;
char *argv[] = { "villas-node", config, NULL };
ret = execvp("/proc/self/exe", argv);
if (ret)
serror("Failed to restart");
}
static int api_restart(struct api_action *h, json_t *args, json_t **resp, struct api_session *s)
{
int ret;
json_error_t err;
char *cfg = NULL;
if (args) {
ret = json_unpack_ex(args, &err, 0, "{ s?: s }", "config", &cfg);
if (ret < 0) {
*resp = json_string("failed to parse request");
return -1;
}
}
/* If no config is provided via request, we will use the previous one */
if (!cfg)
cfg = s->api->super_node->uri;
config = strdup(cfg);
info("restarting to %s", config);
/* Increment API restart counter */
char *scnt = getenv("VILLAS_API_RESTART_COUNT");
int cnt = scnt ? atoi(scnt) : 0;
char buf[32];
snprintf(buf, sizeof(buf), "%d", cnt + 1);
/* We pass some env variables to the new process */
setenv("VILLAS_API_RESTART_COUNT", buf, 1);
*resp = json_pack("{ s: i, s: s }",
"restarts", cnt,
"config", config
);
/* Register exit handler */
ret = atexit(api_restart_handler);
if (ret)
return 0;
/* Properly terminate current instance */
killme(SIGTERM);
return 0;
}
static struct plugin p = {
.name = "restart",
.description = "restart VILLASnode with new configuration",
.type = PLUGIN_TYPE_API,
.api.cb = api_restart
};
REGISTER_PLUGIN(&p)

117
lib/api/actions/restart.cpp Normal file
View file

@ -0,0 +1,117 @@
/** The "restart" API action.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/super_node.hpp>
#include <villas/api/action.hpp>
#include <villas/api/session.hpp>
#include <villas/log.hpp>
#include <villas/exceptions.hpp>
#include <villas/utils.h>
namespace villas {
namespace node {
namespace api {
class RestartAction : public Action {
protected:
static std::string configUri;
static void handler()
{
int ret;
const char *cfg = !configUri.empty()
? configUri.c_str()
: nullptr;
const char *argv[] = { "villas-node", cfg, nullptr };
logger->info("Restart instance: config={}", cfg);
ret = execvp("/proc/self/exe", (char **) argv);
if (ret)
throw new SystemError("Failed to restart");
}
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
int ret;
json_error_t err;
const char *cfg = nullptr;
if (args) {
ret = json_unpack_ex(args, &err, 0, "{ s?: s }", "config", &cfg);
if (ret < 0) {
*resp = json_string("failed to parse request");
return -1;
}
}
/* If no config is provided via request, we will use the previous one */
configUri = cfg
? cfg
: session->getSuperNode()->getConfigUri();
logger->info("Restarting to %s", configUri.c_str());
/* Increment API restart counter */
char *scnt = getenv("VILLAS_API_RESTART_COUNT");
int cnt = scnt ? atoi(scnt) : 0;
char buf[32];
snprintf(buf, sizeof(buf), "%d", cnt + 1);
/* We pass some env variables to the new process */
setenv("VILLAS_API_RESTART_COUNT", buf, 1);
*resp = json_pack("{ s: i, s: o }",
"restarts", cnt,
"config", configUri.empty()
? json_null()
: json_string(configUri.c_str())
);
/* Register exit handler */
ret = atexit(handler);
if (ret)
return 0;
/* Properly terminate current instance */
killme(SIGTERM);
return 0;
}
};
std::string RestartAction::configUri;
/* Register action */
static ActionPlugin<RestartAction> p(
"restart",
"restart VILLASnode with new configuration"
);
} // api
} // node
} // villas

View file

@ -1,4 +1,4 @@
/** The "config" API ressource.
/** The "shutdown" API action.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
@ -20,23 +20,33 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/api.h>
#include <signal.h>
#include <villas/utils.h>
#include <villas/plugin.h>
#include <villas/super_node.h>
#include <villas/api/action.hpp>
static int api_config(struct api_action *h, json_t *args, json_t **resp, struct api_session *s)
{
*resp = json_incref(s->api->super_node->cfg);
namespace villas {
namespace node {
namespace api {
return 0;
}
class ShutdownAction : public Action {
static struct plugin p = {
.name = "config",
.description = "retrieve current VILLASnode configuration",
.type = PLUGIN_TYPE_API,
.api.cb = api_config
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
killme(SIGTERM);
return 0;
}
};
REGISTER_PLUGIN(&p)
/* Register action */
static ActionPlugin<ShutdownAction> p(
"shutdown",
"quit VILLASnode"
);
} // api
} // node
} // villas

View file

@ -24,33 +24,38 @@
#include <jansson.h>
#include <villas/plugin.h>
#include <villas/node.h>
#include <villas/super_node.h>
#include <villas/api.h>
#include <villas/utils.h>
#include <villas/stats.h>
#include <villas/api/action.hpp>
static int api_status(struct api_action *r, json_t *args, json_t **resp, struct api_session *s)
{
int ret;
struct lws_context *ctx = lws_get_context(s->wsi);
char buf[4096];
namespace villas {
namespace node {
namespace api {
ret = lws_json_dump_context(ctx, buf, sizeof(buf), 0);
class StatusAction : public Action {
*resp = json_loads(buf, 0, NULL);
public:
using Action::Action;
virtual int execute(json_t *args, json_t **resp)
{
int ret;
struct lws_context *ctx = lws_get_context(s->wsi);
char buf[4096];
return ret;
}
ret = lws_json_dump_context(ctx, buf, sizeof(buf), 0);
static struct plugin p = {
.name = "status",
.description = "get status and statistics of web server",
.type = PLUGIN_TYPE_API,
.api.cb = api_status
*resp = json_loads(buf, 0, nullptr);
return ret;
}
};
REGISTER_PLUGIN(&p)
/* Register action */
static ActionPlugin<StatusAction> p(
"status",
"get status and statistics of web server"
);
} // api
} // node
} // villas
#endif /* LWS_WITH_SERVER_STATUS */

163
lib/api/server.cpp Normal file
View file

@ -0,0 +1,163 @@
/** Socket API endpoint.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <exception>
#include <algorithm>
#include <villas/config.h>
#include <villas/exceptions.hpp>
#include <villas/utils.h>
#include <villas/super_node.hpp>
#include <villas/api/server.hpp>
#include <villas/api/sessions/socket.hpp>
using namespace villas::node::api;
Server::Server(Api *a) :
state(STATE_INITIALIZED),
api(a)
{
}
Server::~Server()
{
assert(state != STATE_STARTED);
}
void Server::start()
{
int ret;
assert(state != STATE_STARTED);
sd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sd < 0)
throw new SystemError("Failed to create Api socket");
pollfd pfd = {
.fd = sd,
.events = POLLIN
};
pfds.push_back(pfd);
sessions.push_back(nullptr);
std::string path = PREFIX "/var/lib/villas/node-" + api->getSuperNode()->getName() + ".sock";
struct sockaddr_un sun = { .sun_family = AF_UNIX };
strncpy(sun.sun_path, path.c_str(), sizeof(sun.sun_path) - 1);
ret = unlink(sun.sun_path);
if (ret && errno != ENOENT && errno != ENOTDIR)
throw new SystemError("Failed to delete API socket");
ret = bind(sd, (struct sockaddr *) &sun, sizeof(struct sockaddr_un));
if (ret)
throw new SystemError("Failed to bind API socket");
ret = listen(sd, 5);
if (ret)
throw new SystemError("Failed to listen on API socket");
state = STATE_STARTED;
}
void Server::stop()
{
int ret;
assert(state == STATE_STARTED);
ret = close(sd);
if (ret)
throw new SystemError("Failed to close API socket");;
state = STATE_STOPPED;
}
void Server::run(int timeout)
{
int ret;
assert(state == STATE_STARTED);
ret = poll(pfds.data(), pfds.size(), timeout);
if (ret < 0)
throw new SystemError("Failed to poll on API socket");;
for (unsigned i = 0; i < pfds.size(); i++) {
auto &pfd = pfds[i];
auto s = sessions[i];
if (pfd.revents & POLLOUT) {
if (s)
s->write();
}
if (pfd.revents & POLLIN) {
/* New connection */
if (s) {
ret = s->read();
if (ret < 0)
closeSession(s);
}
else
acceptNewSession();
}
}
}
void Server::acceptNewSession() {
int fd = ::accept(sd, nullptr, nullptr);
auto s = new sessions::Socket(api, fd);
pollfd pfd = {
.fd = fd,
.events = POLLIN | POLLOUT
};
pfds.push_back(pfd);
sessions.push_back(s);
api->sessions.push_back(s);
}
void Server::closeSession(sessions::Socket *s)
{
api->sessions.remove(s);
ptrdiff_t pos = std::find(sessions.begin(), sessions.end(), s) - sessions.begin();
if (pos < (ptrdiff_t) sessions.size()) {
pfds.erase(pfds.begin() + pos);
sessions.erase(sessions.begin() + pos);
delete s;
}
}

View file

@ -1,172 +0,0 @@
/** API session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <libwebsockets.h>
#include <villas/api/session.h>
#include <villas/web.h>
#include <villas/plugin.h>
#include <villas/memory.h>
int api_session_init(struct api_session *s, enum api_mode m)
{
int ret;
s->runs = 0;
s->mode = m;
ret = buffer_init(&s->request.buffer, 0);
if (ret)
return ret;
ret = buffer_init(&s->response.buffer, 0);
if (ret)
return ret;
ret = queue_init(&s->request.queue, 128, &memory_heap);
if (ret)
return ret;
ret = queue_init(&s->response.queue, 128, &memory_heap);
if (ret)
return ret;
s->_name = NULL;
return 0;
}
int api_session_destroy(struct api_session *s)
{
int ret;
ret = buffer_destroy(&s->request.buffer);
if (ret)
return ret;
ret = buffer_destroy(&s->response.buffer);
if (ret)
return ret;
ret = queue_destroy(&s->request.queue);
if (ret)
return ret;
ret = queue_destroy(&s->response.queue);
if (ret)
return ret;
if (s->_name)
free(s->_name);
return 0;
}
int api_session_run_action(struct api_session *s, json_t *json_in, json_t **json_out)
{
int ret;
const char *action;
char *id;
struct plugin *p;
json_t *json_args = NULL;
json_t *json_resp = NULL;
ret = json_unpack(json_in, "{ s: s, s: s, s?: o }",
"action", &action,
"id", &id,
"request", &json_args);
if (ret) {
ret = -100;
*json_out = json_pack("{ s: s, s: i }",
"error", "invalid request",
"code", ret);
goto out;
}
p = plugin_lookup(PLUGIN_TYPE_API, action);
if (!p) {
ret = -101;
*json_out = json_pack("{ s: s, s: s, s: i, s: s }",
"action", action,
"id", id,
"code", ret,
"error", "action not found");
goto out;
}
debug(LOG_API, "Running API request: action=%s, id=%s", action, id);
ret = p->api.cb(&p->api, json_args, &json_resp, s);
if (ret)
*json_out = json_pack("{ s: s, s: s, s: i, s: s }",
"action", action,
"id", id,
"code", ret,
"error", "action failed");
else
*json_out = json_pack("{ s: s, s: s }",
"action", action,
"id", id);
if (json_resp)
json_object_set_new(*json_out, "response", json_resp);
out: debug(LOG_API, "Completed API request: action=%s, id=%s, code=%d", action, id, ret);
s->runs++;
return 0;
}
char * api_session_name(struct api_session *s)
{
if (!s->_name) {
char *mode;
switch (s->mode) {
case API_MODE_WS:
mode = "ws";
break;
case API_MODE_HTTP:
mode = "http";
break;
default:
mode = "unknown";
}
strcatf(&s->_name, "version=%d, mode=%s", s->version, mode);
if (s->wsi) {
char name[128];
char ip[128];
lws_get_peer_addresses(s->wsi, lws_get_socket_fd(s->wsi), name, sizeof(name), ip, sizeof(ip));
strcatf(&s->_name, ", remote.name=%s, remote.ip=%s", name, ip);
}
}
return s->_name;
}

140
lib/api/session.cpp Normal file
View file

@ -0,0 +1,140 @@
/** API session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <sstream>
#include <libwebsockets.h>
#include <villas/web.hpp>
#include <villas/plugin.h>
#include <villas/memory.h>
#include <villas/api/session.hpp>
#include <villas/api/action.hpp>
using namespace villas;
using namespace villas::node::api;
Logger Session::logger = logging.get("api:session");
Session::Session(Api *a) :
runs(0),
api(a)
{
queue_init(&request.queue, 128, &memory_heap);
queue_init(&response.queue, 128, &memory_heap);
logger->debug("Initiated API session: {}", getName());
}
Session::~Session()
{
queue_destroy(&request.queue);
queue_destroy(&response.queue);
}
void Session::runPendingActions()
{
json_t *req, *resp;
while (queue_available(&request.queue) > 0) {
queue_pull(&request.queue, (void **) &req);
runAction(req, &resp);
json_decref(req);
queue_push(&response.queue, resp);
}
}
int Session::runAction(json_t *json_in, json_t **json_out)
{
int ret;
const char *action;
char *id;
json_t *json_args = nullptr;
json_t *json_resp = nullptr;
ret = json_unpack(json_in, "{ s: s, s: s, s?: o }",
"action", &action,
"id", &id,
"request", &json_args);
if (ret) {
ret = -100;
*json_out = json_pack("{ s: s, s: i }",
"error", "invalid request",
"code", ret);
logger->debug("Completed API request: action={}, id={}, code={}", action, id, ret);
return 0;
}
auto acf = plugin::Registry::lookup<ActionFactory>(action);
if (!acf) {
ret = -101;
*json_out = json_pack("{ s: s, s: s, s: i, s: s }",
"action", action,
"id", id,
"code", ret,
"error", "action not found");
logger->debug("Completed API request: action={}, id={}, code={}", action, id, ret);
return 0;
}
logger->debug("Running API request: action={}, id={}", action, id);
Action *act = acf->make(this);
ret = act->execute(json_args, &json_resp);
if (ret)
*json_out = json_pack("{ s: s, s: s, s: i, s: s }",
"action", action,
"id", id,
"code", ret,
"error", "action failed");
else
*json_out = json_pack("{ s: s, s: s }",
"action", action,
"id", id);
if (json_resp)
json_object_set_new(*json_out, "response", json_resp);
logger->debug("Completed API request: action={}, id={}, code={}", action, id, ret);
runs++;
return 0;
}
std::string Session::getName()
{
std::stringstream ss;
ss << "version=" << version << ", runs=" << runs;
return ss.str();
}

198
lib/api/sessions/http.cpp Normal file
View file

@ -0,0 +1,198 @@
/** HTTP Api session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <sstream>
#include <libwebsockets.h>
#include <villas/web.hpp>
#include <villas/log.hpp>
#include <villas/config.h>
#include <villas/api/sessions/http.hpp>
using namespace villas::node;
using namespace villas::node::api::sessions;
Http::Http(Api *a, lws *w) :
Wsi(a, w)
{
int hdrlen, options = -1, version;
char *uri;
if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_OPTIONS_URI)))
options = 1;
else if ((hdrlen = lws_hdr_total_length(wsi, WSI_TOKEN_POST_URI)))
options = 0;
uri = new char[hdrlen + 1];
lws_hdr_copy(wsi, uri, hdrlen + 1, options ? WSI_TOKEN_OPTIONS_URI : WSI_TOKEN_POST_URI);
/* Parse request URI */
sscanf(uri, "/api/v%d", (int *) &version);
/** @todo Check version */
/* This is an OPTIONS request.
*
* We immediatly send headers and close the connection
* without waiting for a POST body */
if (options)
lws_callback_on_writable(wsi);
delete uri;
}
void Http::read(void *in, size_t len)
{
request.buffer.append((const char *) in, len);
}
int Http::complete()
{
int pushed;
json_t *req;
req = request.buffer.decode();
if (!req)
return 0;
request.buffer.clear();
pushed = queue_push(&request.queue, req);
if (pushed != 1)
logger->warn("Queue overrun for API session: {}", getName());
return 1;
}
int Http::write()
{
int ret, pulled;
json_t *resp;
pulled = queue_pull(&response.queue, (void **) &resp);
if (pulled) {
response.buffer.clear();
response.buffer.encode(resp);
json_decref(resp);
}
std::stringstream headers;
headers << "HTTP/1.1 200 OK\r\n"
<< "Content-type: application/json\r\n"
<< "User-agent: " USER_AGENT "\r\n"
<< "Access-Control-Allow-Origin: *\r\n"
<< "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
<< "Access-Control-Allow-Headers: Content-Type\r\n"
<< "Access-Control-Max-Age: 86400\r\n"
<< "Content-Length: " << response.buffer.size() << "\r\n"
<< "\r\n";
ret = lws_write(wsi, (unsigned char *) headers.str().data(), headers.str().size(), LWS_WRITE_HTTP_HEADERS);
if (ret < 0)
return 1;
ret = lws_write(wsi, (unsigned char *) response.buffer.data(), response.buffer.size(), LWS_WRITE_HTTP);
if (ret < 0)
return 1;
return 0;
}
Http::~Http()
{
logger->debug("Closed API session: {}", getName());
}
std::string Http::getName()
{
std::stringstream ss;
ss << Wsi::getName() << ", mode=http";
return ss.str();
}
int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
lws_context *ctx = lws_get_context(wsi);
void *user_ctx = lws_context_user(ctx);
Web *w = static_cast<Web *>(user_ctx);
Api *a = w->getApi();
Http *s = static_cast<Http *>(user);
switch (reason) {
case LWS_CALLBACK_HTTP_BIND_PROTOCOL:
if (a == nullptr)
return -1;
new (s) Http(a, wsi);
a->sessions.push_back(s);
break;
case LWS_CALLBACK_HTTP_DROP_PROTOCOL:
if (!s)
return -1;
s->~Http();
a->sessions.remove(s);
return 1;
case LWS_CALLBACK_HTTP_BODY:
s->read(in, len);
break;
case LWS_CALLBACK_HTTP_BODY_COMPLETION:
ret = s->complete();
if (ret)
a->pending.push(s);
break;
case LWS_CALLBACK_HTTP_WRITEABLE:
s->write();
goto try_to_reuse;
default:
break;
}
return 0;
try_to_reuse:
if (lws_http_transaction_completed(wsi))
return -1;
return 0;
}

View file

@ -0,0 +1,85 @@
/** Unix domain socket Api session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <sstream>
#include <villas/log.hpp>
#include <villas/api/sessions/socket.hpp>
using namespace villas::node::api::sessions;
Socket::Socket(Api *a, int s) :
Session(a),
sd(s)
{
}
Socket::~Socket()
{
close(sd);
}
std::string Socket::getName()
{
std::stringstream ss;
ss << Session::getName() << ", mode=socket";
return ss.str();
}
int Socket::read()
{
int ret;
json_t *j;
json_error_t err;
j = json_loadfd(sd, JSON_DISABLE_EOF_CHECK, &err);
if (!j)
return -1;
ret = queue_push(&request.queue, (json_t *) j);
if (ret != 1)
return -1;
api->pending.push(this);
return 0;
}
int Socket::write()
{
int ret;
json_t *j;
while (queue_pull(&response.queue, (void **) &j)) {
ret = json_dumpfd(j, sd, 0);
if (ret)
return ret;
char nl = '\n';
send(sd, &nl, 1, 0);
}
return 0;
}

View file

@ -0,0 +1,165 @@
/** WebSockets Api session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <sstream>
#include <libwebsockets.h>
#include <villas/web.hpp>
#include <villas/log.hpp>
#include <villas/api/sessions/websocket.hpp>
using namespace villas::node;
using namespace villas::node::api::sessions;
WebSocket::WebSocket(Api *a, lws *w) :
Wsi(a, w)
{
/* Parse request URI */
int version;
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI);
sscanf(uri, "/v%d", (int *) &version);
/** @todo Check version */
logger->debug("Initiated API session: {}", getName());
}
WebSocket::~WebSocket()
{
logger->debug("Closed API session: {}", getName());
}
int WebSocket::read(void *in, size_t len)
{
int pushed;
json_t *req;
if (lws_is_first_fragment(wsi))
request.buffer.clear();
request.buffer.append((const char *) in, len);
if (lws_is_final_fragment(wsi)) {
req = request.buffer.decode();
if (!req)
return 0;
pushed = queue_push(&request.queue, req);
if (pushed != 1)
logger->warn("Queue overun in API session");
return 1;
}
return 0;
}
int WebSocket::write()
{
int pulled;
json_t *resp;
if (state == State::SHUTDOWN)
return -1;
pulled = queue_pull(&response.queue, (void **) &resp);
if (pulled) {
char pad[LWS_PRE];
response.buffer.clear();
response.buffer.append(pad, sizeof(pad));
response.buffer.encode(resp);
json_decref(resp);
lws_write(wsi, (unsigned char *) response.buffer.data() + LWS_PRE, response.buffer.size() - LWS_PRE, LWS_WRITE_TEXT);
}
return 0;
}
std::string WebSocket::getName()
{
std::stringstream ss;
ss << Wsi::getName() << ", mode=ws";
return ss.str();
}
int api_ws_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
lws_context *ctx = lws_get_context(wsi);
void *user_ctx = lws_context_user(ctx);
Web *w = static_cast<Web*>(user_ctx);
Api *a = w->getApi();
WebSocket *s = static_cast<WebSocket *>(user);
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
if (a == nullptr) {
std::string err = "API disabled";
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) err.data(), err.length());
return -1;
}
new (s) WebSocket(a, wsi);
a->sessions.push_back(s);
break;
case LWS_CALLBACK_CLOSED:
s->~WebSocket();
a->sessions.remove(s);
break;
case LWS_CALLBACK_RECEIVE:
ret = s->read(in, len);
if (ret)
a->pending.push(s);
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
ret = s->write();
if (ret)
return ret;
break;
default:
break;
}
return 0;
}

72
lib/api/sessions/wsi.cpp Normal file
View file

@ -0,0 +1,72 @@
/** LWS Api session.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <sstream>
#include <villas/api/sessions/wsi.hpp>
using namespace villas::node::api::sessions;
void Wsi::runPendingActions()
{
Session::runPendingActions();
web->callbackOnWritable(wsi);
}
void Wsi::shutdown()
{
state = State::SHUTDOWN;
web->callbackOnWritable(wsi);
}
std::string Wsi::getName()
{
std::stringstream ss;
ss << Session::getName();
if (wsi) {
char name[128];
char ip[128];
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), name, sizeof(name), ip, sizeof(ip));
ss << ", remote.name=" << name << ", remote.ip=" << ip;
}
return ss.str();
}
Wsi::Wsi(Api *a, lws *w) :
Session(a),
wsi(w)
{
state = Session::State::ESTABLISHED;
lws_context *ctx = lws_get_context(wsi);
void *ctx = lws_context_user(ctx);
web = static_cast<Web*>(ctx);
}

View file

@ -28,7 +28,7 @@
#include <libwebsockets.h>
#include <villas/web.h>
#include <villas/web.hpp>
#include <villas/timing.h>
#include <villas/utils.h>
#include <villas/buffer.h>

View file

@ -25,38 +25,42 @@
#include <libgen.h>
#include <unistd.h>
#include <villas/super_node.h>
#include <villas/super_node.hpp>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/utils.h>
#include <villas/list.h>
#include <villas/hook.h>
#include <villas/advio.h>
#include <villas/web.h>
#include <villas/log.h>
#include <villas/api.h>
#include <villas/plugin.h>
#include <villas/memory.h>
#include <villas/node/config.h>
#include <villas/config_helper.h>
#include <villas/log.hpp>
#include <villas/exceptions.hpp>
#include <villas/kernel/rt.hpp>
#include <villas/kernel/rt.h>
using namespace villas;
using namespace villas::node;
Logger SuperNode::logger = logging.get("super_node");
SuperNode::SuperNode() :
state(STATE_INITIALIZED),
priority(0),
affinity(0),
hugepages(DEFAULT_NR_HUGEPAGES),
stats(0)
stats(0),
api(this),
web(&api)
{
list_init(&nodes);
list_init(&paths);
list_init(&plugins);
name = (char *) alloc(128);
gethostname(name, 128);
char hname[128];
gethostname(hname, 128);
name = hname;
init();
}
@ -65,11 +69,7 @@ int SuperNode::init()
{
int ret;
ret = log_init(&log, name, 2, LOG_ALL);
if (ret)
return ret;
ret = rt_init(priority, affinity);
ret = kernel::rt::init(priority, affinity);
if (ret)
return ret;
@ -77,114 +77,94 @@ int SuperNode::init()
if (ret)
return ret;
#ifdef WITH_API
ret = api_init(&api);//, this); // @todo: port to C++
if (ret)
return ret;
#endif /* WITH_API */
#ifdef WITH_WEB
ret = web_init(&web, &api);
if (ret)
return ret;
#endif /* WITH_WEB */
return 0;
}
int SuperNode::parseUri(const char *u)
int SuperNode::parseUri(const std::string &u)
{
json_error_t err;
info("Parsing configuration");
logger->info("Parsing configuration");
if (u) {
FILE *f;
AFILE *af;
FILE *f;
AFILE *af;
/* Via stdin */
if (!strcmp("-", u)) {
info("Reading configuration from stdin");
/* Via stdin */
if (u == "-") {
logger->info("Reading configuration from standard input");
af = NULL;
f = stdin;
}
else {
info("Reading configuration from URI: %s", u);
af = nullptr;
f = stdin;
}
else {
logger->info("Reading configuration from URI: {}", u);
af = afopen(u, "r");
if (!af)
error("Failed to open configuration from: %s", u);
af = afopen(u.c_str(), "r");
if (!af)
throw new RuntimeError("Failed to open configuration from: {}", u);
f = af->file;
}
f = af->file;
}
/* Parse config */
json = json_loadf(f, 0, &err);
if (json == NULL) {
/* Parse config */
json = json_loadf(f, 0, &err);
if (json == nullptr) {
#ifdef LIBCONFIG_FOUND
int ret;
int ret;
config_t cfg;
config_setting_t *json_root = NULL;
config_t cfg;
config_setting_t *json_root = nullptr;
warn("Failed to parse JSON configuration. Re-trying with old libconfig format.");
{
warn("Please consider migrating to the new format using the 'conf2json' command.");
}
logger->warn("Failed to parse JSON configuration. Re-trying with old libconfig format.");
logger->warn(" Please consider migrating to the new format using the 'conf2json' command.");
config_init(&cfg);
config_set_auto_convert(&cfg, 1);
config_init(&cfg);
config_set_auto_convert(&cfg, 1);
/* Setup libconfig include path.
* This is only supported for local files */
if (access(uri, F_OK) != -1) {
char *cpy = strdup(uri);
/* Setup libconfig include path.
* This is only supported for local files */
if (access(u.c_str(), F_OK) != -1) {
char *cpy = strdup(u.c_str());
config_set_include_dir(&cfg, dirname(cpy));
config_set_include_dir(&cfg, dirname(cpy));
free(cpy);
}
free(cpy);
}
if (af)
arewind(af);
else
rewind(f);
if (af)
arewind(af);
else
rewind(f);
ret = config_read(&cfg, f);
if (ret != CONFIG_TRUE) {
{
warn("conf: %s in %s:%d", config_error_text(&cfg), uri, config_error_line(&cfg));
warn("json: %s in %s:%d:%d", err.text, err.source, err.line, err.column);
}
error("Failed to parse configuration");
}
ret = config_read(&cfg, f);
if (ret != CONFIG_TRUE) {
logger->warn("conf: {} in {}:{}", config_error_text(&cfg), u.c_str(), config_error_line(&cfg));
logger->warn("json: {} in {}:{}:{}", err.text, err.source, err.line, err.column);
logger->error("Failed to parse configuration");
killme(SIGABRT);
}
json_root = config_root_setting(&cfg);
json_root = config_root_setting(&cfg);
json = config_to_json(json_root);
if (json == NULL)
error("Failed to convert JSON to configuration file");
json = config_to_json(json_root);
if (json == nullptr)
throw new RuntimeError("Failed to convert JSON to configuration file");
config_destroy(&cfg);
config_destroy(&cfg);
#else
jerror(&err, "Failed to parse configuration file");
throw new JsonError(err, "Failed to parse configuration file");
#endif /* LIBCONFIG_FOUND */
}
/* Close configuration file */
if (af)
afclose(af);
else if (f != stdin)
fclose(f);
/* Close configuration file */
if (af)
afclose(af);
else if (f != stdin)
fclose(f);
uri = strdup(u);
uri = u;
return parseJson(json);
}
else {
warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
}
return parseJson(json);
return 0;
}
@ -217,38 +197,36 @@ int SuperNode::parseJson(json_t *j)
"name", &nme
);
if (ret)
jerror(&err, "Failed to parse global configuration");
throw new JsonError(err, "Failed to parse global configuration");
if (nme) {
name = (char *) realloc(name, strlen(nme)+1);
sprintf(name, "%s", nme);
}
if (nme)
name = nme;
#ifdef WITH_WEB
if (json_web)
web_parse(&web, json_web);
web.parse(json_web);
#endif /* WITH_WEB */
if (json_logging)
log_parse(&log, json_logging);
logging.parse(json_logging);
/* Parse plugins */
if (json_plugins) {
if (!json_is_array(json_plugins))
error("Setting 'plugins' must be a list of strings");
throw new ConfigError(json_plugins, "node-config-plugins", "Setting 'plugins' must be a list of strings");
size_t i;
json_t *json_plugin;
json_array_foreach(json_plugins, i, json_plugin) {
struct plugin *p = (struct plugin *) alloc(sizeof(struct plugin));
auto *p = (plugin *) alloc(sizeof(plugin));
ret = plugin_init(p);
if (ret)
error("Failed to initialize plugin");
throw new RuntimeError("Failed to initialize plugin");
ret = plugin_parse(p, json_plugin);
if (ret)
error("Failed to parse plugin");
throw new RuntimeError("Failed to parse plugin");
list_push(&plugins, p);
}
@ -257,7 +235,7 @@ int SuperNode::parseJson(json_t *j)
/* Parse nodes */
if (json_nodes) {
if (!json_is_object(json_nodes))
error("Setting 'nodes' must be a group with node name => group mappings.");
throw new ConfigError(json_nodes, "node-config-nodes", "Setting 'nodes' must be a group with node name => group mappings.");
const char *name;
json_t *json_node;
@ -267,21 +245,21 @@ int SuperNode::parseJson(json_t *j)
ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type);
if (ret)
jerror(&err, "Failed to parse node");
throw new JsonError(err, "Failed to parse node");
nt = node_type_lookup(type);
if (!nt)
error("Invalid node type: %s", type);
throw new RuntimeError("Invalid node type: {}", type);
struct node *n = (struct node *) alloc(sizeof(struct node));
auto *n = (struct node *) alloc(sizeof(struct node));
ret = node_init(n, nt);
if (ret)
error("Failed to initialize node");
throw new RuntimeError("Failed to initialize node");
ret = node_parse(n, json_node, name);
if (ret)
error("Failed to parse node");
throw new RuntimeError("Failed to parse node");
list_push(&nodes, n);
}
@ -290,33 +268,33 @@ int SuperNode::parseJson(json_t *j)
/* Parse paths */
if (json_paths) {
if (!json_is_array(json_paths))
warn("Setting 'paths' must be a list.");
logger->warn("Setting 'paths' must be a list of objects");
size_t i;
json_t *json_path;
json_array_foreach(json_paths, i, json_path) {
struct path *p = (struct path *) alloc(sizeof(struct path));
path *p = (path *) alloc(sizeof(path));
ret = path_init(p);
if (ret)
error("Failed to initialize path");
throw new RuntimeError("Failed to initialize path");
ret = path_parse(p, json_path, &nodes);
if (ret)
error("Failed to parse path");
throw new RuntimeError("Failed to parse path");
list_push(&paths, p);
if (p->reverse) {
struct path *r = (struct path *) alloc(sizeof(struct path));
path *r = (path *) alloc(sizeof(path));
ret = path_init(r);
if (ret)
error("Failed to init path");
throw new RuntimeError("Failed to init path");
ret = path_reverse(p, r);
if (ret)
error("Failed to reverse path %s", path_name(p));
throw new RuntimeError("Failed to reverse path {}", path_name(p));
list_push(&paths, r);
}
@ -334,22 +312,22 @@ int SuperNode::check()
{
int ret;
assert(state == STATE_PARSED || state == STATE_PARSED || state == STATE_CHECKED);
assert(state == STATE_INITIALIZED || state == STATE_PARSED || state == STATE_CHECKED);
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
auto *n = (struct node *) list_at(&nodes, i);
ret = node_check(n);
if (ret)
error("Invalid configuration for node %s", node_name(n));
throw new RuntimeError("Invalid configuration for node {}", node_name(n));
}
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
auto *p = (struct path *) list_at(&paths, i);
ret = path_check(p);
if (ret)
error("Invalid configuration for path %s", path_name(p));
throw new RuntimeError("Invalid configuration for path {}", path_name(p));
}
state = STATE_CHECKED;
@ -364,60 +342,70 @@ int SuperNode::start()
assert(state == STATE_CHECKED);
memory_init(hugepages);
rt_init(priority, affinity);
kernel::rt::init(priority, affinity);
log_open(&log);
#ifdef WITH_API
api_start(&api);
#endif
#ifdef WITH_WEB
web_start(&web);
api.start();
#endif
info("Starting node-types");
#ifdef WITH_WEB
web.start();
#endif
logger->info("Starting node-types");
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
auto *n = (struct node *) list_at(&nodes, i);
ret = node_type_start(n->_vt);//, this); // @todo: port to C++
if (ret)
error("Failed to start node-type: %s", node_type_name(n->_vt));
throw new RuntimeError("Failed to start node-type: {}", node_type_name(n->_vt));
}
info("Starting nodes");
logger->info("Starting nodes");
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
auto *n = (struct node *) list_at(&nodes, i);
ret = node_init2(n);
if (ret)
error("Failed to prepare node: %s", node_name(n));
throw new RuntimeError("Failed to prepare node: {}", node_name(n));
int refs = list_count(&paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0) {
ret = node_start(n);
if (ret)
error("Failed to start node: %s", node_name(n));
throw new RuntimeError("Failed to start node: {}", node_name(n));
}
else
warn("No path is using the node %s. Skipping...", node_name(n));
logger->warn("No path is using the node {}. Skipping...", node_name(n));
}
info("Starting paths");
logger->info("Starting paths");
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
auto *p = (struct path *) list_at(&paths, i);
if (p->enabled) {
ret = path_init2(p);
if (ret)
error("Failed to prepare path: %s", path_name(p));
throw new RuntimeError("Failed to prepare path: {}", path_name(p));
ret = path_start(p);
if (ret)
error("Failed to start path: %s", path_name(p));
throw new RuntimeError("Failed to start path: {}", path_name(p));
}
else
warn("Path %s is disabled. Skipping...", path_name(p));
logger->warn("Path {} is disabled. Skipping...", path_name(p));
}
#ifdef WITH_HOOKS
if (stats > 0) {
stats_print_header(STATS_FORMAT_HUMAN);
ret = task_init(&task, 1.0 / stats, CLOCK_REALTIME);
if (ret)
throw new RuntimeError("Failed to create stats timer");
}
#endif /* WITH_HOOKS */
state = STATE_STARTED;
return 0;
@ -427,45 +415,51 @@ int SuperNode::stop()
{
int ret;
if (stats > 0)
#ifdef WITH_HOOKS
if (stats > 0) {
stats_print_footer(STATS_FORMAT_HUMAN);
info("Stopping paths");
ret = task_destroy(&task);
if (ret)
throw new RuntimeError("Failed to stop stats timer");
}
#endif /* WITH_HOOKS */
logger->info("Stopping paths");
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
auto *p = (struct path *) list_at(&paths, i);
ret = path_stop(p);
if (ret)
error("Failed to stop path: %s", path_name(p));
throw new RuntimeError("Failed to stop path: {}", path_name(p));
}
info("Stopping nodes");
logger->info("Stopping nodes");
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
auto *n = (struct node *) list_at(&nodes, i);
ret = node_stop(n);
if (ret)
error("Failed to stop node: %s", node_name(n));
throw new RuntimeError("Failed to stop node: {}", node_name(n));
}
info("Stopping node-types");
logger->info("Stopping node-types");
for (size_t i = 0; i < list_length(&plugins); i++) {
struct plugin *p = (struct plugin *) list_at(&plugins, i);
auto *p = (struct plugin *) list_at(&plugins, i);
if (p->type == PLUGIN_TYPE_NODE) {
ret = node_type_stop(&p->node);
if (ret)
error("Failed to stop node-type: %s", node_type_name(&p->node));
throw new RuntimeError("Failed to stop node-type: {}", node_type_name(&p->node));
}
}
#ifdef WITH_API
api_stop(&api);
api.stop();
#endif
#ifdef WITH_WEB
web_stop(&web);
web.stop();
#endif
log_close(&log);
state = STATE_STOPPED;
@ -475,49 +469,22 @@ int SuperNode::stop()
void SuperNode::run()
{
#ifdef WITH_HOOKS
int ret;
if (stats > 0) {
stats_print_header(STATS_FORMAT_HUMAN);
struct task t;
ret = task_init(&t, 1.0 / stats, CLOCK_REALTIME);
if (ret)
error("Failed to create stats timer");
for (;;) {
task_wait(&t);
periodic();
}
}
else
task_wait(&task);
periodic();
#else
pause();
#endif /* WITH_HOOKS */
for (;;) pause();
}
SuperNode::~SuperNode()
{
assert(state == STATE_STOPPED);
assert(state != STATE_STARTED);
list_destroy(&plugins, (dtor_cb_t) plugin_destroy, false);
list_destroy(&paths, (dtor_cb_t) path_destroy, true);
list_destroy(&nodes, (dtor_cb_t) node_destroy, true);
#ifdef WITH_WEB
web_destroy(&web);
#endif /* WITH_WEB */
#ifdef WITH_API
api_destroy(&api);
#endif /* WITH_API */
json_decref(json);
log_destroy(&log);
if (name)
free(name);
}
int SuperNode::periodic()
@ -526,13 +493,13 @@ int SuperNode::periodic()
int ret;
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
auto *p = (struct path *) list_at(&paths, i);
if (p->state != STATE_STARTED)
continue;
for (size_t j = 0; j < list_length(&p->hooks); j++) {
struct hook *h = (struct hook *) list_at(&p->hooks, j);
hook *h = (struct hook *) list_at(&p->hooks, j);
ret = hook_periodic(h);
if (ret)
@ -541,13 +508,13 @@ int SuperNode::periodic()
}
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
auto *n = (struct node *) list_at(&nodes, i);
if (n->state != STATE_STARTED)
continue;
for (size_t j = 0; j < list_length(&n->in.hooks); j++) {
struct hook *h = (struct hook *) list_at(&n->in.hooks, j);
auto *h = (struct hook *) list_at(&n->in.hooks, j);
ret = hook_periodic(h);
if (ret)
@ -555,7 +522,7 @@ int SuperNode::periodic()
}
for (size_t j = 0; j < list_length(&n->out.hooks); j++) {
struct hook *h = (struct hook *) list_at(&n->out.hooks, j);
auto *h = (struct hook *) list_at(&n->out.hooks, j);
ret = hook_periodic(h);
if (ret)

359
lib/web.c
View file

@ -1,359 +0,0 @@
/** LWS-releated functions.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <libwebsockets.h>
#include <string.h>
#include <villas/config.h>
#include <villas/utils.h>
#include <villas/log.h>
#include <villas/web.h>
#include <villas/api/session.h>
#include <villas/nodes/websocket.h>
/* Forward declarations */
lws_callback_function api_ws_protocol_cb;
lws_callback_function api_http_protocol_cb;
lws_callback_function websocket_protocol_cb;
/** List of libwebsockets protocols. */
struct lws_protocols protocols[] = {
{
.name = "http",
.callback = lws_callback_http_dummy,
.per_session_data_size = 0,
.rx_buffer_size = 1024
},
#ifdef WITH_API
{
.name = "http-api",
.callback = api_http_protocol_cb,
.per_session_data_size = sizeof(struct api_session),
.rx_buffer_size = 1024
},
{
.name = "api",
.callback = api_ws_protocol_cb,
.per_session_data_size = sizeof(struct api_session),
.rx_buffer_size = 0
},
#endif /* WITH_API */
#ifdef LIBWEBSOCKETS_FOUND
{
.name = "live",
.callback = websocket_protocol_cb,
.per_session_data_size = sizeof(struct websocket_connection),
.rx_buffer_size = 0
},
#endif /* LIBWEBSOCKETS_FOUND */
#if 0 /* not supported yet */
{
.name = "log",
.callback = log_ws_protocol_cb,
.per_session_data_size = 0,
.rx_buffer_size = 0
},
{
.name = "stats",
.callback = stats_ws_protocol_cb,
.per_session_data_size = sizeof(struct api_session),
.rx_buffer_size = 0
},
#endif
{ NULL /* terminator */ }
};
/** List of libwebsockets mounts. */
static struct lws_http_mount mounts[] = {
{
.mountpoint = "/",
.origin = NULL,
.def = "/index.html",
.cgienv = NULL,
.cgi_timeout = 0,
.cache_max_age = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_FILE,
.mountpoint_len = 1,
#ifdef WITH_API
.mount_next = &mounts[1]
},
{
.mountpoint = "/api/v1",
.origin = "http-api",
.def = NULL,
.cgienv = NULL,
.cgi_timeout = 0,
.cache_max_age = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_CALLBACK,
.mountpoint_len = 7,
#endif /* WITH_API */
.mount_next = NULL
}
};
/** List of libwebsockets extensions. */
static const struct lws_extension extensions[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
},
{
"deflate-frame",
lws_extension_callback_pm_deflate,
"deflate_frame"
},
{ NULL /* terminator */ }
};
extern struct log *global_log;
static void logger(int level, const char *msg) {
int len = strlen(msg);
if (strchr(msg, '\n'))
len -= 1;
/* Decrease severity for some errors. */
if (strstr(msg, "Unable to open") == msg)
level = LLL_WARN;
switch (level) {
case LLL_ERR:
log_print(global_log, CLR_RED("Web "), "%.*s", len, msg);
break;
case LLL_WARN:
log_print(global_log, CLR_YEL("Web "), "%.*s", len, msg);
break;
case LLL_NOTICE:
log_print(global_log, CLR_WHT("Web "), "%.*s", len, msg);
break;
case LLL_INFO:
log_print(global_log, "Web ", "%.*s", len, msg);
break;
default: /* Everything else is debug */
log_print(global_log, CLR_GRY("Web "), "%.*s", len, msg); break;
}
}
void web_callback_on_writable(struct lws *wsi)
{
struct lws_context *ctx = lws_get_context(wsi);
struct web *w = lws_context_user(ctx);
queue_push(&w->writables, (void *) wsi);
}
static void * web_worker(void *ctx)
{
struct lws *wsi;
struct web *w = ctx;
for (;;) {
lws_service(w->context, 100);
while (queue_available(&w->writables)) {
queue_pull(&w->writables, (void **) &wsi);
lws_callback_on_writable(wsi);
}
}
return NULL;
}
int web_init(struct web *w, struct api *a)
{
int ret, lvl = LLL_ERR | LLL_WARN | LLL_NOTICE;
if (global_log->level >=10 && global_log->facilities & LOG_WEB)
lvl |= (1 << LLL_COUNT) - 1;
lws_set_log_level(lvl, logger);
w->api = a;
/* Default values */
w->port = getuid() > 0 ? 8080 : 80; /**< @todo Use libcap to check if user can bind to ports < 1024 */
w->htdocs = strdup(WEB_PATH);
ret = queue_init(&w->writables, 128, &memory_heap);
if (ret)
return ret;
w->state = STATE_INITIALIZED;
return 0;
}
int web_parse(struct web *w, json_t *cfg)
{
int ret, enabled = 1;
const char *ssl_cert = NULL;
const char *ssl_private_key = NULL;
const char *htdocs = NULL;
json_error_t err;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, s?: b }",
"ssl_cert", &ssl_cert,
"ssl_private_key", &ssl_private_key,
"htdocs", &htdocs,
"port", &w->port,
"enabled", &enabled
);
if (ret)
jerror(&err, "Failed to http section of configuration file");
if (ssl_cert)
w->ssl_cert = strdup(ssl_cert);
if (ssl_private_key)
w->ssl_private_key = strdup(ssl_private_key);
if (htdocs) {
if (w->htdocs)
free(w->htdocs);
w->htdocs = strdup(htdocs);
}
if (!enabled)
w->port = CONTEXT_PORT_NO_LISTEN;
w->state = STATE_PARSED;
return 0;
}
int web_start(struct web *w)
{
int ret;
/* Start server */
struct lws_context_creation_info ctx_info = {
.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT,
.gid = -1,
.uid = -1,
.user = (void *) w,
.protocols = protocols,
.mounts = mounts,
.extensions = extensions,
.port = w->port,
.ssl_cert_filepath = w->ssl_cert,
.ssl_private_key_filepath = w->ssl_private_key
};
info("Starting Web sub-system: webroot=%s", w->htdocs);
/* update web root of mount point */
mounts[0].origin = w->htdocs;
w->context = lws_create_context(&ctx_info);
if (w->context == NULL)
error("WebSocket: failed to initialize server context");
for (int tries = 10; tries > 0; tries--) {
w->vhost = lws_create_vhost(w->context, &ctx_info);
if (w->vhost)
break;
ctx_info.port++;
warn("WebSocket: failed to setup vhost. Trying another port: %d", ctx_info.port);
}
if (w->vhost == NULL)
error("WebSocket: failed to initialize virtual host");
ret = pthread_create(&w->thread, NULL, web_worker, w);
if (ret)
error("Failed to start Web worker thread");
w->state = STATE_STARTED;
return ret;
}
int web_stop(struct web *w)
{
int ret;
if (w->state != STATE_STARTED)
return 0;
info("Stopping Web sub-system");
{
lws_cancel_service(w->context);
/** @todo Wait for all connections to be closed */
ret = pthread_cancel(w->thread);
if (ret)
serror("Failed to cancel Web worker thread");
ret = pthread_join(w->thread, NULL);
if (ret)
serror("Failed to join Web worker thread");
}
w->state = STATE_STOPPED;
return 0;
}
int web_destroy(struct web *w)
{
int ret;
if (w->state == STATE_DESTROYED)
return 0;
if (w->context) {
lws_context_destroy(w->context);
}
if (w->ssl_cert)
free(w->ssl_cert);
if (w->ssl_private_key)
free(w->ssl_private_key);
if (w->htdocs)
free(w->htdocs);
ret = queue_destroy(&w->writables);
if (ret)
return ret;
w->state = STATE_DESTROYED;
return 0;
}

286
lib/web.cpp Normal file
View file

@ -0,0 +1,286 @@
/** LWS-releated functions.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017-2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <libwebsockets.h>
#include <string.h>
#include <pthread.h>
#include <villas/node/config.h>
#include <villas/utils.h>
#include <villas/web.hpp>
#include <villas/api.hpp>
#include <villas/exceptions.hpp>
#include <villas/api/sessions/http.hpp>
#include <villas/api/sessions/websocket.hpp>
#include <villas/nodes/websocket.h>
using namespace villas;
using namespace villas::node;
/* Forward declarations */
lws_callback_function websocket_protocol_cb;
Logger Web::logger = logging.get("web");
/** List of libwebsockets protocols. */
lws_protocols protocols[] = {
{
.name = "http",
.callback = lws_callback_http_dummy,
.per_session_data_size = 0,
.rx_buffer_size = 1024
},
#ifdef WITH_API
{
.name = "http-api",
.callback = api_http_protocol_cb,
.per_session_data_size = sizeof(api::sessions::Http),
.rx_buffer_size = 1024
},
{
.name = "api",
.callback = api_ws_protocol_cb,
.per_session_data_size = sizeof(api::sessions::WebSocket),
.rx_buffer_size = 0
},
#endif /* WITH_API */
#ifdef __LIBWEBSOCKETS_FOUND /** @todo: Port to C++ */
{
.name = "live",
.callback = websocket_protocol_cb,
.per_session_data_size = sizeof(websocket_connection),
.rx_buffer_size = 0
},
#endif /* LIBWEBSOCKETS_FOUND */
{ nullptr /* terminator */ }
};
/** List of libwebsockets mounts. */
static lws_http_mount mounts[] = {
#ifdef WITH_API
{
.mount_next = &mounts[1],
.mountpoint = "/api/v1",
.origin = "http-api",
.def = nullptr,
.cgienv = nullptr,
.cgi_timeout = 0,
.cache_max_age = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_CALLBACK,
.mountpoint_len = 7
},
#endif /* WITH_API */
{
.mount_next = nullptr,
.mountpoint = "/",
.origin = nullptr,
.def = "/index.html",
.cgienv = nullptr,
.cgi_timeout = 0,
.cache_max_age = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_FILE,
.mountpoint_len = 1
}
};
/** List of libwebsockets extensions. */
static const lws_extension extensions[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
},
{
"deflate-frame",
lws_extension_callback_pm_deflate,
"deflate_frame"
},
{ nullptr /* terminator */ }
};
void Web::lwsLogger(int lws_lvl, const char *msg) {
char *nl;
nl = (char *) strchr(msg, '\n');
if (nl)
*nl = 0;
/* Decrease severity for some errors. */
if (strstr(msg, "Unable to open") == msg)
lws_lvl = LLL_WARN;
Logger logger = logging.get("lws");
switch (lws_lvl) {
case LLL_ERR:
logger->error("{}", msg);
break;
case LLL_WARN:
logger->warn("{}", msg);
break;
case LLL_NOTICE:
case LLL_INFO:
logger->info("{}", msg);
break;
default: /* Everything else is debug */
logger->debug("{}", msg);
}
}
void Web::worker()
{
lws *wsi;
while (running) {
lws_service(context, 100);
while (!writables.empty()) {
wsi = writables.pull();
lws_callback_on_writable(wsi);
}
}
logger->info("Stopping worker");
}
Web::Web(Api *a) :
state(STATE_INITIALIZED),
htdocs(WEB_PATH),
api(a)
{
int lvl = LLL_ERR | LLL_WARN | LLL_NOTICE;
/** @todo: Port to C++: add LLL_DEBUG and others if trace log level is activated */
lws_set_log_level(lvl, lwsLogger);
/* Default values */
port = getuid() > 0 ? 8080 : 80;
}
int Web::parse(json_t *cfg)
{
int ret, enabled = 1;
const char *cert = nullptr;
const char *pkey = nullptr;
const char *htd = nullptr;
json_error_t err;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, s?: b }",
"ssl_cert", &cert,
"ssl_private_key", &pkey,
"htdocs", &htd,
"port", &port,
"enabled", &enabled
);
if (ret)
throw new JsonError(err, "Failed to http section of configuration file");
if (cert)
ssl_cert = cert;
if (pkey)
ssl_private_key = pkey;
if (htd)
htdocs = htd;
if (!enabled)
port = CONTEXT_PORT_NO_LISTEN;
state = STATE_PARSED;
return 0;
}
void Web::start()
{
/* Start server */
lws_context_creation_info ctx_info = {
.port = port,
.protocols = protocols,
.extensions = extensions,
.ssl_cert_filepath = ssl_cert.empty() ? nullptr : ssl_cert.c_str(),
.ssl_private_key_filepath = ssl_private_key.empty() ? nullptr : ssl_private_key.c_str(),
.gid = -1,
.uid = -1,
.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT,
.user = (void *) this,
.mounts = mounts,
};
logger->info("Starting sub-system: htdocs={}", htdocs.c_str());
/* update web root of mount point */
mounts[1].origin = htdocs.c_str();
context = lws_create_context(&ctx_info);
if (context == nullptr)
throw new RuntimeError("Failed to initialize server context");
for (int tries = 10; tries > 0; tries--) {
w->vhost = lws_create_vhost(w->context, &ctx_info);
if (w->vhost)
break;
ctx_info.port++;
warn("WebSocket: failed to setup vhost. Trying another port: %d", ctx_info.port);
}
if (w->vhost == NULL)
throw new RuntimeError("Failed to initialize virtual host");
/* Start thread */
running = true;
thread = std::thread(&Web::worker, this);
state = STATE_STARTED;
}
void Web::stop()
{
assert(state == STATE_STARTED);
logger->info("Stopping sub-system");
running = false;
thread.join();
lws_context_destroy(context);
state = STATE_STOPPED;
}
void Web::callbackOnWritable(lws *wsi)
{
writables.push(wsi);
}