diff --git a/include/villas/nodes/api_client.hpp b/include/villas/nodes/api_client.hpp new file mode 100644 index 000000000..bef6f7b1f --- /dev/null +++ b/include/villas/nodes/api_client.hpp @@ -0,0 +1,130 @@ +/** Node type: Client for the Universal Data-exchange API (v2) + * + * @see https://github.com/ERIGrid2/JRA-3.1-api + * @file + * @author Andres Acosta + * @copyright 2014-2023, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace villas { +namespace node { + +/* Forward declarations */ +struct Sample; + +class APIClientNode : public APINode { + +public: + APIClientNode(const uuid_t &id = {}, const std::string &name = ""); + + // #ifdef CURL_SSL_REQUIRES_LOCKING + // pthread_mutex_t *mutex_buf; + // #endif /* CURL_SSL_REQUIRES_LOCKING */ + + struct ClientDirection : Direction { + CURL *curl; + }; + + ClientDirection read, write; + + virtual + ~APIClientNode(); + + // virtual + // int prepare(); + + /** Validate node configuration. */ + // virtual + // int check(); + + virtual + int start(); + + virtual + int stop(); + + virtual + int reverse(); + + // virtual + // std::vector getNetemFDs(); + + // virtual + // struct villas::node::memory::Type * getMemoryType(); + + // virtual + // const std::string & getDetails(); + +protected: + virtual + int parse(json_t *json); + + const char *endpoint; + const char *access_token; + + double timeout; + double rate; + + struct Task task; + int ssl_verify; + + struct curl_slist *headers; + + int parse_channel_response(json_t *json_response, int *code, char **reason, json_t **json_rvalue, Logger logger); + // int request(CURL *handle, const char *endpoint, const char *operation, json_t *json_request, json_t **json_response, Logger logger); + int request_channel_query(CURL *handle, const char *endpoint, const char *signal_name, json_t **json_rvalue, Logger logger); + // int request_channel_update(CURL *handle, const char *endpoint, const char *action, json_t *json_entity, Logger logger); + + virtual + int _read(struct Sample *smps[], unsigned cnt); + + virtual + int _write(struct Sample *smps[], unsigned cnt); +}; + +// struct api_client { +// const char *endpoint; /**< The REST API client context broker endpoint URL. */ +// const char *access_token; /**< An optional authentication token which will be sent as HTTP header. */ + +// double timeout; /**< HTTP timeout in seconds */ +// double rate; /**< Rate used for polling. */ + +// struct Task task; /**< Timer for periodic events. */ +// int ssl_verify; /**< Boolean flag whether SSL server certificates should be verified or not. */ + +// struct curl_slist *headers; /**< List of HTTP request headers for libcurl */ + +// struct { +// CURL *curl; /**< libcurl: handle */ +// struct List signals; /**< A mapping between indices of the VILLASnode samples and the attributes in api_client::context */ +// } in, out; +// }; + + +// int api_client_type_start(SuperNode *sn); + +// int api_client_type_stop(); + +// char * api_client_print(NodeCompat *n); + +// int api_client_init(NodeCompat *n); + +// int api_client_reverse(NodeCompat *n); + +// int api_client_poll_fds(NodeCompat *n, int fds[]); + +} /* namespace node */ +} /* namespace villas */ diff --git a/lib/nodes/api_client.cpp b/lib/nodes/api_client.cpp new file mode 100644 index 000000000..9da8c48b1 --- /dev/null +++ b/lib/nodes/api_client.cpp @@ -0,0 +1,296 @@ +/** Node type: Client for the Universal Data-exchange API (v2) + * + * @see https://github.com/ERIGrid2/JRA-3.1-api + * @author Andres Acosta + * @copyright 2014-2023, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace villas; +using namespace villas::node; +using namespace villas::utils; +using namespace villas::node::api; + +struct api_response { + char *data; + size_t len; +}; + +APIClientNode::APIClientNode(const uuid_t &id, const std::string &name) : + APINode(id, name) +{ + CURLcode ret = curl_global_init(CURL_GLOBAL_ALL); + + if (ret) + throw RuntimeError("failed to initialize CURL"); + +} + +APIClientNode::~APIClientNode() +{ + curl_global_cleanup(); +} + +int APIClientNode::start() +{ + APINode::start(); + read.curl = curl_easy_init(); + write.curl = curl_easy_init(); + headers = nullptr; + + if (access_token) { + char buf[128]; + snprintf(buf, sizeof(buf), "Auth-Token: %s", access_token); + headers = curl_slist_append(headers, buf); + } + + /* Create task */ + if (timeout > 1 / rate) + logger->warn("Timeout is to large for given rate: {}", rate); + + task.setRate(rate); + + headers = curl_slist_append(headers, "Accept: application/json"); + headers = curl_slist_append(headers, "Content-Type: application/json"); + + CURL *handles[] = { read.curl, write.curl }; + + for (unsigned p = 0; p < ARRAY_LEN(handles); p++) { + curl_easy_setopt(handles[p], CURLOPT_SSL_VERIFYPEER, ssl_verify); + curl_easy_setopt(handles[p], CURLOPT_TIMEOUT_MS, timeout * 1e3); + curl_easy_setopt(handles[p], CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(handles[p], CURLOPT_USERAGENT, HTTP_USER_AGENT); + } + + return 0; +} + +int APIClientNode::stop() +{ + // int ret; + + task.stop(); + + curl_easy_cleanup(read.curl); + curl_easy_cleanup(write.curl); + curl_slist_free_all(headers); + + return 0; +} + +int APIClientNode::reverse() +{ + swapSignals(); + SWAP(read.channels, write.channels); + + return 0; +} + +int APIClientNode::parse_channel_response(json_t *json_response, int *code, char **reason, json_t **json_rvalue, Logger logger) +{ + int ret; + + double timestamp = 0; + const char *validity = nullptr; + const char *source = nullptr; + const char *timesource = nullptr; + + json_error_t err; + ret = json_unpack_ex(json_response, &err, 0, "{ s: F, s: O, s?: s, s?: s, s?: s }", + "timestamp", ×tamp, + "value", json_rvalue, + "validity", &validity, + "timesource", ×ource, + "source", &source + ); + + if (ret) { + logger->warn("Failed to find API response code"); + return ret; + } + + if (validity) + logger->warn("Attribute 'validity' is not supported by VILLASnode"); + + if (source) + logger->warn("Attribute 'source' is not supported by VILLASnode"); + + if (timesource) + logger->warn("Attribute 'timesource' is not supported by VILLASnode"); + + return ret; +} + +static +size_t request_writer(void *contents, size_t size, size_t nmemb, void *userp) +{ + size_t realsize = size * nmemb; + struct api_response *mem = (struct api_response *) userp; + + mem->data = (char *) realloc(mem->data, mem->len + realsize + 1); + if (mem->data == nullptr) /* out of memory! */ + throw MemoryAllocationError(); + + memcpy(&(mem->data[mem->len]), contents, realsize); + mem->len += realsize; + mem->data[mem->len] = 0; + + return realsize; +} + +int APIClientNode::request_channel_query(CURL *handle, const char *endpoint, const char *signal_name, json_t **json_rvalue, Logger logger) +{ + int ret, code; + char *reason; + json_t *json_response; + + struct api_response chunk = { 0 }; + int old; + double time; + char url[128]; + json_error_t err; + + snprintf(url, sizeof(url), "%s/channel/%s/sample", endpoint, signal_name); + + curl_easy_setopt(handle, CURLOPT_URL, url); + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, request_writer); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, (void *) &chunk); + + logger->debug("Request to context broker: {}", url); + + /* We don't want to leave the handle in an invalid state */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old); + CURLcode curl_ret = curl_easy_perform(handle); + pthread_setcancelstate(old, nullptr); + + if (curl_ret) { + logger->warn("HTTP request failed: {}", curl_easy_strerror(curl_ret)); + goto out; + } + + curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &time); + + logger->debug("Request to context broker completed in {} seconds", time); + logger->debug("Response from context broker:\n{}", chunk.data); + + json_response = json_loads(chunk.data, 0, &err); + if (!json_response) + logger->warn("Received invalid JSON: {} in {}:{}:{}\n{}", err.text, err.source, err.line, err.column, chunk.data); + + ret = parse_channel_response(json_response, &code, &reason, json_rvalue, logger); + if (ret) + goto out2; + +out: free(chunk.data); +out2: json_decref(json_response); + + return ret; +} + +int APIClientNode::_read(struct Sample *smps[], unsigned cnt) +{ + int ret; + + if (task.wait() == 0) + throw SystemError("Failed to wait for task"); + + json_t *json_rvalue; + + for (size_t i = 0; i < MIN(getInputSignals()->size(), read.channels.size()); i++) { + auto sig = getInputSignals()->at(i); + + if (!sig) + return -11; + + auto ch = read.channels.at(i); + + ret = request_channel_query(read.curl, endpoint, sig->name.c_str(), &json_rvalue, logger); + if (ret) + goto out; + + struct Sample *smp = smps[0]; + + ret = smp->data[i].parseJson(sig->type, json_rvalue); + + if (ret) { + throw BadRequest("Malformed value"); + } + } + +out: json_decref(json_rvalue); + + return ret; +} + +int APIClientNode::_write(struct Sample *smps[], unsigned cnt) +{ + // assert(cnt == 1); + + // sample_copy(write.sample, smps[0]); + + // pthread_cond_signal(&write.cv); + + return 1; +} + +int APIClientNode::parse(json_t *json) +{ + int ret = Node::parse(json); + if (ret) + return ret; + + json_t *json_signals_in = nullptr; + json_t *json_signals_out = nullptr; + + json_error_t err; + ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: o }, s?: { s?: o } }", + "in", + "signals", &json_signals_in, + "out", + "signals", &json_signals_out + ); + if (ret) + throw ConfigError(json, err, "node-config-node-api"); + + if (json_signals_in) + read.channels.parse(json_signals_in, false, true); + + if (json_signals_out) + write.channels.parse(json_signals_out, true, false); + + ret = json_unpack_ex(json, &err, 0, "{ s?: s, s: s, s?: b, s?: F, s?: F }", + "access_token", &access_token, + "endpoint", &endpoint, + "ssl_verify", &ssl_verify, + "timeout", &timeout, + "rate", &rate + ); + if (ret) + throw ConfigError(json, err, "node-config-node-api-client"); + + return 0; +} + +static char n[] = "api-client"; +static char d[] = "Client for the HTTP REST interface"; +static NodePlugin p;