diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e78ee3e48..6d1e1ee67 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -178,6 +178,7 @@ test:integration: services: - eclipse-mosquitto - rabbitmq + - redis tags: - docker needs: diff --git a/CMakeLists.txt b/CMakeLists.txt index d641d24ac..8e66120f3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,6 +112,8 @@ pkg_check_modules(LIBIEC61850 IMPORTED_TARGET libiec61850>=1.2) pkg_check_modules(LIBCONFIG IMPORTED_TARGET libconfig>=1.4.9) pkg_check_modules(MOSQUITTO IMPORTED_TARGET libmosquitto>=1.6.9) pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka>=1.5.0) +pkg_check_modules(HIREDIS IMPORTED_TARGET hiredis>=1.0.0) +pkg_check_modules(REDISPP IMPORTED_TARGET redis++>=1.2.0) pkg_check_modules(RABBITMQ_C IMPORTED_TARGET librabbitmq>=0.8.0) pkg_check_modules(COMEDILIB IMPORTED_TARGET comedilib>=0.11.0) pkg_check_modules(LIBZMQ IMPORTED_TARGET libzmq>=2.2.0) @@ -169,6 +171,7 @@ cmake_dependent_option(WITH_NODE_MQTT "Build with mqtt node-type" cmake_dependent_option(WITH_NODE_NANOMSG "Build with nanomsg node-type" ON "NANOMSG_FOUND" OFF) cmake_dependent_option(WITH_NODE_NGSI "Build with ngsi node-type" ON "" OFF) cmake_dependent_option(WITH_NODE_OPAL "Build with opal node-type" ON "Opal_FOUND" OFF) +cmake_dependent_option(WITH_NODE_REDIS "Build with redis node-type" ON "HIREDIS_FOUND; REDISPP_FOUND" OFF) cmake_dependent_option(WITH_NODE_RTP "Build with rtp node-type" ON "RE_FOUND" OFF) cmake_dependent_option(WITH_NODE_SHMEM "Build with shmem node-type" ON "HAS_SEMAPHORE; HAS_MMAN" OFF) cmake_dependent_option(WITH_NODE_SIGNAL "Build with signal node-type" ON "" OFF) @@ -267,6 +270,7 @@ add_feature_info(NODE_TEST_RTT WITH_NODE_TEST_RTT "Build with add_feature_info(NODE_ULDAQ WITH_NODE_ULDAQ "Build with uldaq node-type") add_feature_info(NODE_WEBSOCKET WITH_NODE_WEBSOCKET "Build with websocket node-type") add_feature_info(NODE_ZEROMQ WITH_NODE_ZEROMQ "Build with zeromq node-type") +add_feature_info(NODE_REDIS WITH_NODE_REDIS "Build with redis node-type") if(TOPLEVEL_PROJECT) feature_summary(WHAT ALL VAR FEATURES) diff --git a/cmake/VILLASnodePackaging.cmake b/cmake/VILLASnodePackaging.cmake index 198b3ad16..90926e14b 100644 --- a/cmake/VILLASnodePackaging.cmake +++ b/cmake/VILLASnodePackaging.cmake @@ -61,8 +61,8 @@ set(CPACK_RPM_PLUGINS_FILE_NAME "${CPACK_RPM_PLUGINS_PACKAGE_NAME}-${SUFFIX}") set(CPACK_RPM_TOOLS_FILE_NAME "${CPACK_RPM_TOOLS_PACKAGE_NAME}-${SUFFIX}") set(CPACK_RPM_DOC_FILE_NAME "${CPACK_RPM_DOC_PACKAGE_NAME}-${SUFFIX}") -set(CPACK_RPM_DEVEL_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION} fmt-devel >= 5.2.0, spdlog-devel >= 1.3.1, openssl-devel >= 1.0.0, libuuid-devel, protobuf-devel >= 2.6.0, protobuf-c-devel >= 1.1.0, libconfig-devel >= 1.4.9, libnl3-devel >= 3.2.27, libcurl-devel >= 7.29.0, jansson-devel >= 2.7, libwebsockets-devel >= 2.3.0, zeromq-devel >= 2.2.0, nanomsg-devel >= 1.0.0, librabbitmq-devel >= 0.8.0, mosquitto-devel >= 1.4.15, libibverbs-devel >= 16.2, librdmacm-devel >= 16.2, libusb-devel >= 0.1.5, lua-devel >= 5.1, librdkafka-deve; >= 1.5.0") -set(CPACK_RPM_LIB_PACKAGE_REQUIRES " fmt >= 5.2.0, spdlog >= 1.3.1, openssl-libs >= 1.0.0, libuuid, protobuf >= 2.6.0, protobuf-c >= 1.1.0, libconfig >= 1.4.9, libnl3 >= 3.2.27, libcurl >= 7.29.0, jansson >= 2.7, libwebsockets >= 2.3.0, zeromq >= 2.2.0, nanomsg >= 1.0.0, librabbitmq >= 0.8.0, mosquitto >= 1.4.15, libibverbs >= 16.2, librdmacm >= 16.2, libusb >= 0.1.5, lua >= 5.1, librdkafka >= 1.5.0") +set(CPACK_RPM_DEVEL_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION} fmt-devel >= 5.2.0, spdlog-devel >= 1.3.1, openssl-devel >= 1.0.0, libuuid-devel, protobuf-devel >= 2.6.0, protobuf-c-devel >= 1.1.0, libconfig-devel >= 1.4.9, libnl3-devel >= 3.2.27, libcurl-devel >= 7.29.0, jansson-devel >= 2.7, libwebsockets-devel >= 2.3.0, zeromq-devel >= 2.2.0, nanomsg-devel >= 1.0.0, librabbitmq-devel >= 0.8.0, mosquitto-devel >= 1.4.15, libibverbs-devel >= 16.2, librdmacm-devel >= 16.2, libusb-devel >= 0.1.5, lua-devel >= 5.1, librdkafka-devel >= 1.5.0, hiredis-devel >= 1.0.0") +set(CPACK_RPM_LIB_PACKAGE_REQUIRES " fmt >= 5.2.0, spdlog >= 1.3.1, openssl-libs >= 1.0.0, libuuid, protobuf >= 2.6.0, protobuf-c >= 1.1.0, libconfig >= 1.4.9, libnl3 >= 3.2.27, libcurl >= 7.29.0, jansson >= 2.7, libwebsockets >= 2.3.0, zeromq >= 2.2.0, nanomsg >= 1.0.0, librabbitmq >= 0.8.0, mosquitto >= 1.4.15, libibverbs >= 16.2, librdmacm >= 16.2, libusb >= 0.1.5, lua >= 5.1, librdkafka >= 1.5.0, hiredis >= 1.0.0") set(CPACK_RPM_BIN_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION}") set(CPACK_RPM_PLUGINS_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION}") set(CPACK_RPM_TOOLS_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION}") diff --git a/etc/examples/nodes/redis.conf b/etc/examples/nodes/redis.conf new file mode 100644 index 000000000..dc0288ba3 --- /dev/null +++ b/etc/examples/nodes/redis.conf @@ -0,0 +1,39 @@ +nodes = { + redis_node = { + type = "redis", + + format = "json", # only valid for mode = 'channel' and 'key' + # With mode = 'hash' we will use a simple human readable format + + key = "my_key" # The Redis key to be used for mode = 'key' or 'hash' (default is the node name) + channel = "my_channel" # the Redis channel tp be used for mode = 'channel' (default is the node name) + + mode = "key", # one of: + # - 'channel' (publish/subscribe) + # - 'key' (set/get) + # - 'hash' (hmset/hgetall) + + notify = false # Whether or not to use Redis keyspace event notifications to get notified about updates + + rate = 1.0 # The polling rate when notify = false + + uri = "tcp://localhost:6379/0", # The Redis connection URI + + # host = "localhost" # Alternatively the connection options can be specified independently + # port = 6379 # Note: options here will overwrite the respective part of the URI if both are given. + # db = 0 + + # path = "/var/run/redis.sock" + + # user = "guest", + # password = "guest" + + # ssl = { + # enabled: true + # cacert: "/etc/ssl/certs/ca-certificates.crt", + # cacertdir: "/etc/ssl/certs" + # cert: "./my_cert.crt", + # key, "./my_key.key" + # } + } +} diff --git a/include/villas/nodes/example.hpp b/include/villas/nodes/example.hpp index 7878cb037..ce9d0d5c2 100644 --- a/include/villas/nodes/example.hpp +++ b/include/villas/nodes/example.hpp @@ -22,7 +22,7 @@ *********************************************************************************/ /** - * @addtogroup example BSD example Node Type + * @addtogroup example Example node-type * @ingroup node * @{ */ diff --git a/include/villas/nodes/redis.hpp b/include/villas/nodes/redis.hpp new file mode 100644 index 000000000..267737e4f --- /dev/null +++ b/include/villas/nodes/redis.hpp @@ -0,0 +1,156 @@ +/** Redis node-type + * + * @file + * @author Steffen Vogel + * @copyright 2014-2021, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup redis BSD redis Node Type + * @ingroup node + * @{ + */ + +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +enum class RedisMode { + KEY, + HASH, + CHANNEL +}; + +class RedisConnection { + +public: + sw::redis::Redis context; + +protected: + enum State { + INITIALIZED, + RUNNING, + STOPPING + }; + + std::thread thread; + std::atomic state; + + void onMessage(std::string channel, std::string msg); + + void loop(); + + std::unordered_multimap subscriberMap; + + sw::redis::Subscriber subscriber; + + villas::Logger logger; + +public: + + RedisConnection(const sw::redis::ConnectionOptions &opts); + + static + RedisConnection * get(const sw::redis::ConnectionOptions &opts); + + void start(); + void stop(); + + void subscribe(struct vnode *n, const std::string &channel); + void unsubscribe(struct vnode *n, const std::string &channel); +}; + +struct redis { + sw::redis::ConnectionOptions options; + + RedisConnection *conn; + + enum RedisMode mode; + + std::string key; + + bool notify; /**< Use Redis Keyspace notifications to listen for updates. */ + + struct Task task; /**< Timer for periodic events. */ + double rate; /**< Rate for polling key updates if keyspace notifications are disabled. */ + + villas::node::Format *formatter; + + struct pool pool; + struct queue_signalled queue; +}; + +/** @see node_type::init */ +int redis_init(struct vnode *n); + +/** @see node_type::destroy */ +int redis_destroy(struct vnode *n); + +/** @see node_type::parse */ +int redis_parse(struct vnode *n, json_t *json); + +/** @see node_type::print */ +char * redis_print(struct vnode *n); + +/** @see node_type::check */ +int redis_check(); + +/** @see node_type::prepare */ +int redis_prepare(); + +/** @see node_type::start */ +int redis_start(struct vnode *n); + +/** @see node_type::stop */ +int redis_stop(struct vnode *n); + +/** @see node_type::pause */ +int redis_pause(struct vnode *n); + +/** @see node_type::resume */ +int redis_resume(struct vnode *n); + +/** @see node_type::write */ +int redis_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @see node_type::read */ +int redis_read(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @see node_type::reverse */ +int redis_reverse(struct vnode *n); + +/** @see node_type::poll_fds */ +int redis_poll_fds(struct vnode *n, int fds[]); + +/** @see node_type::netem_fds */ +int redis_netem_fds(struct vnode *n, int fds[]); + +/** @} */ diff --git a/include/villas/nodes/redis_helpers.hpp b/include/villas/nodes/redis_helpers.hpp new file mode 100644 index 000000000..be5a661a0 --- /dev/null +++ b/include/villas/nodes/redis_helpers.hpp @@ -0,0 +1,198 @@ +/** Redis node-type helpers + * + * @file + * @author Steffen Vogel + * @copyright 2014-2021, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup redis BSD redis Node Type + * @ingroup node + * @{ + */ + +#pragma once + +#include +#include +#include +#include + +namespace std { + +template +struct hash> +{ + std::size_t operator()(std::chrono::duration<_rep, ratio> const& s) const + { + return std::hash<_rep>{}(s.count()); + } +}; + +template <> +struct hash +{ + std::size_t operator()(const sw::redis::tls::TlsOptions& t) const + { + return hash()(t.enabled) ^ + hash()(t.cacert) ^ + hash()(t.cacertdir) ^ + hash()(t.cert) ^ + hash()(t.key) ^ + hash()(t.sni); + } +}; + +template <> +struct hash +{ + std::size_t operator()(const sw::redis::ConnectionOptions& o) const + { + return hash()(static_cast(o.type)) ^ + hash()(o.host) ^ + hash()(o.port) ^ + hash()(o.path) ^ + hash()(o.user) ^ + hash()(o.password) ^ + hash()(o.db) ^ + hash()(o.keep_alive) ^ + hash()(o.connect_timeout) ^ + hash()(o.socket_timeout) ^ + hash()(o.tls); + } +}; + +} /* namespace std */ + +namespace sw { +namespace redis { + +bool operator==(const tls::TlsOptions &o1, const tls::TlsOptions &o2) +{ + return o1.enabled == o2.enabled && + o1.cacert == o2.cacert && + o1.cacertdir == o2.cacertdir && + o1.cert == o2.cert && + o1.key == o2.key && + o1.sni == o2.sni; +} + +bool operator==(const ConnectionOptions &o1, const ConnectionOptions &o2) +{ + return o1.type == o2.type && + o1.host == o2.host && + o1.port == o2.port && + o1.path == o2.path && + o1.user == o2.user && + o1.password == o2.password && + o1.db == o2.db && + o1.keep_alive == o2.keep_alive && + o1.connect_timeout == o2.connect_timeout && + o1.socket_timeout == o2.socket_timeout && + o1.tls == o2.tls; +} + +template +OStream &operator<<(OStream &os, const tls::TlsOptions &t) +{ + os << "tls.enabled=" << (t.enabled ? "yes" : "no"); + + if (t.enabled) { + if (!t.cacert.empty()) + os << ", tls.cacert=" << t.cacert; + + if (!t.cacertdir.empty()) + os << ", tls.cacertdir=" << t.cacertdir; + + if (!t.cert.empty()) + os << ", tls.cert=" << t.cert; + + if (!t.key.empty()) + os << ", tls.key=" << t.key; + + if (!t.sni.empty()) + os << ", tls.sni=" << t.sni; + } + + return os; +} + +template +OStream &operator<<(OStream &os, const ConnectionType &t) +{ + switch (t) { + case ConnectionType::TCP: + os << "tcp"; + break; + + case ConnectionType::UNIX: + os << "unix"; + break; + } + + return os; +} + +template +OStream &operator<<(OStream &os, const ConnectionOptions &o) +{ + os << "type=" << o.type; + + switch (o.type) { + case ConnectionType::TCP: + os << ", host=" << o.host << ", port=" << o.port; + break; + + case ConnectionType::UNIX: + os << ", path=" << o.path; + break; + } + + os << ", db=" << o.db + << ", user=" << o.user + << ", keepalive=" << (o.keep_alive ? "yes" : "no") + << ", " << o.tls; + + return os; +} + +} /* namespace redis */ +} /* namespace sw */ + +template +OStream &operator<<(OStream &os, const enum RedisMode &m) +{ + switch (m) { + case RedisMode::KEY: + os << "key"; + break; + + case RedisMode::HASH: + os << "hash"; + break; + + case RedisMode::CHANNEL: + os << "channel"; + break; + } + + return os; +} + +/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 71367f106..77c603b67 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -179,6 +179,12 @@ if(WITH_NODE_TEMPER) list(APPEND LIBRARIES PkgConfig::LIBUSB) endif() +# Enable Redis support +if(WITH_NODE_REDIS) + list(APPEND NODE_SRC redis.cpp) + list(APPEND LIBRARIES PkgConfig::HIREDIS PkgConfig::REDISPP) +endif() + add_library(nodes STATIC ${NODE_SRC}) target_include_directories(nodes PUBLIC ${INCLUDE_DIRS}) target_link_libraries(nodes PUBLIC ${LIBRARIES}) diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp index 83cfd3db5..b1384fc5e 100644 --- a/lib/nodes/ngsi.cpp +++ b/lib/nodes/ngsi.cpp @@ -738,7 +738,7 @@ int ngsi_read(struct vnode *n, struct sample * const smps[], unsigned cnt) int ret; if (i->task.wait() == 0) - perror("Failed to wait for task"); + throw SystemError("Failed to wait for task"); json_t *json_rentity; json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0); diff --git a/lib/nodes/redis.cpp b/lib/nodes/redis.cpp new file mode 100644 index 000000000..fcfbbc294 --- /dev/null +++ b/lib/nodes/redis.cpp @@ -0,0 +1,674 @@ +/** Redis node-type + * + * @author Steffen Vogel + * @copyright 2014-2021, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +/* Forward declartions */ +static struct vnode_type p; +static void redis_on_message(struct vnode *n, std::string channel, std::string msg); + +using namespace villas; +using namespace villas::node; +using namespace villas::utils; +using namespace sw::redis; + +static std::unordered_map connections; + +RedisConnection::RedisConnection(const ConnectionOptions &opts) : + context(opts), + subscriber(context.subscriber()), + logger(logging.get("nodes:redis")) +{ + /* Enable keyspace notifications */ + context.command("config", "set", "notify-keyspace-events", "K$h"); + + subscriber.on_message([this](std::string channel, std::string msg) { + onMessage(channel, msg); + }); + + logger->info("New connection: {}", opts); + + state = State::INITIALIZED; +} + +RedisConnection * RedisConnection::get(const ConnectionOptions &opts) +{ + RedisConnection *conn; + + auto it = connections.find(opts); + if (it != connections.end()) + conn = it->second; + else { + try { + conn = new RedisConnection(opts); + } catch (IoError &e) { + throw RuntimeError(e.what()); + } + + connections[opts] = conn; + } + + return conn; +} + +void RedisConnection::onMessage(std::string channel, std::string msg) +{ + auto itp = subscriberMap.equal_range(channel); + for (auto it = itp.first; it != itp.second; ++it) { + struct vnode *n = it->second; + + redis_on_message(n, channel, msg); + } +} + +void RedisConnection::subscribe(struct vnode *n, const std::string &channel) +{ + subscriber.subscribe(channel); + + subscriberMap.emplace(channel, n); +} + +void RedisConnection::unsubscribe(struct vnode *n, const std::string &channel) +{ + auto itp = subscriberMap.equal_range(channel); + for (auto it = itp.first; it != itp.second; ++it) { + if (it->second == n) + subscriberMap.erase(it); + } + + if (subscriberMap.count(channel) == 0) + subscriber.subscribe(channel); +} + +void RedisConnection::start() +{ + if (state == State::RUNNING) + return; /* Already running */ + + state = State::RUNNING; + + thread = std::thread(&RedisConnection::loop, this); +} + +void RedisConnection::stop() +{ + state = State::STOPPING; + + thread.join(); + + state = State::INITIALIZED; +} + +void RedisConnection::loop() +{ + while (state == State::RUNNING) { + try { + subscriber.consume(); + } + catch (const TimeoutError &e) { + continue; + } + catch (const ReplyError &e) { + continue; + } + catch (const Error &e) { + /* Create a new subscriber */ + subscriber = context.subscriber(); + } + } +} + +static int redis_get(struct vnode *n, struct sample * const smps[], unsigned cnt) +{ + int ret; + struct redis *r = (struct redis *) n->_vd; + + switch (r->mode) { + case RedisMode::KEY: { + auto value = r->conn->context.get(r->key); + if (value) { + size_t rbytes, bytes = value->size(); + ret = r->formatter->sscan(value->c_str(), bytes, &rbytes, smps, cnt); + + if (rbytes != bytes) + return -1; + + return ret; + } + else + return -1; + } + + case RedisMode::HASH: { + struct sample *smp = smps[0]; + + for (unsigned j = 0; j < vlist_length(&n->in.signals); j++) { + struct signal *sig = (struct signal *) vlist_at(&n->in.signals, j); + union signal_data *data = &smp->data[j]; + + *data = sig->init; + } + + std::unordered_map kvs; + r->conn->context.hgetall(r->key, std::inserter(kvs, kvs.begin())); + + int max_idx = -1; + for (auto &it : kvs) { + auto &name = it.first; + auto &value = it.second; + + struct signal *sig = vlist_lookup_name(&n->in.signals, name); + if (!sig) + continue; + + auto idx = vlist_index(&n->in.signals, sig); + if (idx > max_idx) + max_idx = idx; + + char *end; + ret = signal_data_parse_str(&smp->data[idx], sig->type, value.c_str(), &end); + if (ret < 0) + continue; + } + + smp->flags = 0; + smp->length = max_idx + 1; + + if (smp->length > 0) + smp->flags |= (int) SampleFlags::HAS_DATA; + + return 1; + } + + default: + return -1; + } +} + +static void redis_on_message(struct vnode *n, std::string channel, std::string msg) +{ + struct redis *r = (struct redis *) n->_vd; + + n->logger->debug("Message: {}: {}", channel, msg); + + int alloc, scanned, pushed; + unsigned cnt = n->in.vectorize; + struct sample *smps[cnt]; + + alloc = sample_alloc_many(&r->pool, smps, cnt); + if (alloc < 0) { + n->logger->error("Failed to allocate samples"); + return; + } + else if ((unsigned) alloc < cnt) + n->logger->warn("Pool underrun"); + + switch (r->mode) { + case RedisMode::CHANNEL: { + size_t rbytes; + scanned = r->formatter->sscan(msg.c_str(), msg.size(), &rbytes, smps, alloc); + break; + } + + case RedisMode::HASH: + case RedisMode::KEY: + scanned = redis_get(n, smps, cnt); + break; + + default: + goto out; + } + + if (scanned < 0) { + n->logger->error("Failed to decode samples"); + pushed = 0; + goto out; + } + + pushed = queue_signalled_push_many(&r->queue, (void **) smps, scanned); + if (pushed < 0) { + n->logger->error("Failed to enqueue"); + pushed = 0; + } + else if (pushed != scanned) + n->logger->warn("Queue underrun"); + +out: sample_decref_many(smps + pushed, alloc - pushed); +} + +int redis_init(struct vnode *n) +{ + struct redis *r = (struct redis *) n->_vd; + + r->mode = RedisMode::KEY; + r->formatter = nullptr; + r->notify = true; + r->rate = 1.0; + + new (&r->options) ConnectionOptions; + new (&r->task) Task(CLOCK_REALTIME); + new (&r->key) std::string(); + + return 0; +} + +int redis_destroy(struct vnode *n) +{ + int ret; + struct redis *r = (struct redis *) n->_vd; + + if (r->formatter) + delete r->formatter; + + using string = std::string; + + r->options.~ConnectionOptions(); + r->key.~string(); + r->task.~Task(); + + ret = queue_signalled_destroy(&r->queue); + if (ret) + return ret; + + ret = pool_destroy(&r->pool); + if (ret) + return ret; + + return 0; +} + +int redis_parse(struct vnode *n, json_t *json) +{ + int ret; + struct redis *r = (struct redis *) n->_vd; + + json_error_t err; + json_t *json_ssl = nullptr; + json_t *json_format = nullptr; + + const char *host = nullptr; + const char *path = nullptr; + const char *user = nullptr; + const char *password = nullptr; + const char *mode = nullptr; + const char *uri = nullptr; + const char *key = nullptr; + const char *channel = nullptr; + int keepalive = -1; + int db = -1; + int notify = -1; + + double connect_timeout = -1; + double socket_timeout = -1; + + ret = json_unpack_ex(json, &err, 0, "{ s?: o, s?: s, s?: s, s?: i, s?: s, s?: s, s?: s, s?: i, s?: { s?: F, s?: F }, s?: o, s?: b, s?: s, s?: s, s?: s, s?: b, s?: F }", + "format", &json_format, + "uri", &uri, + "host", &host, + "port", &r->options.port, + "path", &path, + "user", &user, + "password", &password, + "db", &db, + "timeout", + "connect", &connect_timeout, + "socket", &socket_timeout, + "ssl", &json_ssl, + "keepalive", &keepalive, + "mode", &mode, + "key", &key, + "channel", &channel, + "notify", ¬ify, + "rate", &r->rate + ); + if (ret) + throw ConfigError(json, err, "node-config-node-redis", "Failed to parse node configuration"); + + if (json_ssl) { + const char *cacert; + const char *cacertdir; + const char *cert; + const char *key; + + int enabled = 1; + + ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: b, s?: s, s?: s, s?: s, s?: s }", + "enabled", &enabled, + "cacert", &cacert, + "cacertdir", &cacertdir, + "cert", &cert, + "key", &key + ); + if (ret) + throw ConfigError(json, err, "node-config-node-redis-ssl", "Failed to parse Redis SSL configuration"); + + r->options.tls.enabled = enabled != 0; + + r->options.tls.cacert = cacert; + r->options.tls.cacertdir = cacertdir; + r->options.tls.cert = cert; + r->options.tls.key = key; + + if (host) + r->options.tls.sni = host; + } + + /* Mode */ + if (mode) { + if (!strcmp(mode, "key") || !strcmp(mode, "set-get")) + r->mode = RedisMode::KEY; + else if (!strcmp(mode, "hash") || !strcmp(mode, "hset-hget")) + r->mode = RedisMode::HASH; + else if (!strcmp(mode, "channel") || !strcmp(mode, "pub-sub")) + r->mode = RedisMode::CHANNEL; + else + throw ConfigError(json, "node-config-node-redis-mode", "Invalid Redis mode: {}", mode); + } + + /* Format */ + r->formatter = json_format + ? FormatFactory::make(json_format) + : FormatFactory::make("json"); + if (!r->formatter) + throw ConfigError(json_format, "node-config-node-redis-format", "Invalid format configuration"); + + if (key && r->mode == RedisMode::KEY) + r->key = key; + if (channel && r->mode == RedisMode::CHANNEL) + r->key = channel; + + if (notify >= 0) + r->notify = notify != 0; + + /* Connection options */ + if (uri) + r->options = ConnectionOptions(uri); + + if (db >= 0) + r->options.db = db; + + if (user) + r->options.user = user; + + if (password) + r->options.password = password; + + if (host) + r->options.host = host; + + if (path) + r->options.path = path; + + if (keepalive >= 0) + r->options.keep_alive = keepalive != 0; + + if (socket_timeout >= 0) + r->options.socket_timeout = std::chrono::milliseconds((int) (1000 * socket_timeout)); + + if (connect_timeout >= 0) + r->options.connect_timeout = std::chrono::milliseconds((int) (1000 * connect_timeout)); + + return 0; +} + +int redis_check(struct vnode *n) +{ + struct redis *r = (struct redis *) n->_vd; + + if (!r->options.host.empty() && !r->options.path.empty()) + return -1; + + if (r->options.db < 0) + return -1; + + return 0; +} + +char * redis_print(struct vnode *n) +{ + struct redis *r = (struct redis *) n->_vd; + + std::stringstream ss; + + ss << "mode=" << r->mode + << ", key=" << r->key + << ", notify=" << (r->notify ? "yes" : "no"); + + if (!r->notify) + ss << ", rate=" << r->rate; + + ss << ", " << r->options; + + return strdup(ss.str().c_str()); +} + +int redis_prepare(struct vnode *n) +{ + int ret; + struct redis *r = (struct redis *) n->_vd; + + r->options.type = r->options.path.empty() + ? ConnectionType::TCP + : ConnectionType::UNIX; + + if (r->key.empty()) + r->key = node_name_short(n); + + ret = queue_signalled_init(&r->queue, 1024); + if (ret) + return ret; + + ret = pool_init(&r->pool, 1024, SAMPLE_LENGTH(64)); + if (ret) + return ret; + + r->conn = RedisConnection::get(r->options); + if (!r->conn) + return -1; + + return 0; +} + +int redis_start(struct vnode *n) +{ + struct redis *r = (struct redis *) n->_vd; + + r->formatter->start(&n->in.signals, ~(int) SampleFlags::HAS_OFFSET); + + if (!r->notify) + r->task.setRate(r->rate); + + switch (r->mode) { + case RedisMode::CHANNEL: + r->conn->subscribe(n, r->key); + break; + + case RedisMode::HASH: + case RedisMode::KEY: + if (r->notify) { + auto pattern = fmt::format("__keyspace@{}__:{}", r->options.db, r->key); + r->conn->subscribe(n, pattern); + } + break; + } + + r->conn->start(); + + return 0; +} + +int redis_stop(struct vnode *n) +{ + struct redis *r = (struct redis *) n->_vd; + + r->conn->stop(); + + if (!r->notify) + r->task.stop(); + + switch (r->mode) { + case RedisMode::CHANNEL: + r->conn->unsubscribe(n, r->key); + break; + + case RedisMode::HASH: + case RedisMode::KEY: + if (r->notify) { + auto pattern = fmt::format("__keyspace@{}__:{}", r->options.db, r->key); + r->conn->unsubscribe(n, pattern); + } + break; + } + + return 0; +} + +int redis_read(struct vnode *n, struct sample * const smps[], unsigned cnt) +{ + struct redis *r = (struct redis *) n->_vd; + + /* Wait for new data */ + if (r->notify || r->mode == RedisMode::CHANNEL) { + int pulled_cnt; + struct sample *pulled_smps[cnt]; + + pulled_cnt = queue_signalled_pull_many(&r->queue, (void **) pulled_smps, cnt); + + sample_copy_many(smps, pulled_smps, pulled_cnt); + sample_decref_many(pulled_smps, pulled_cnt); + + return pulled_cnt; + } + else { + if (r->task.wait() == 0) + throw SystemError("Failed to wait for task"); + + return redis_get(n, smps, cnt); + } +} + +int redis_write(struct vnode *n, struct sample * const smps[], unsigned cnt) +{ + int ret; + struct redis *r = (struct redis *) n->_vd; + + switch (r->mode) { + case RedisMode::CHANNEL: + for (unsigned i = 0; i < cnt; i++) { + char buf[1500]; + size_t wbytes; + + ret = r->formatter->sprint(buf, sizeof(buf), &wbytes, &smps[i], cnt); + if (ret < 0) + return ret; + + auto value = std::string_view(buf, wbytes); + + r->conn->context.publish(r->key, value); + } + break; + + case RedisMode::KEY: { + char buf[1500]; + size_t wbytes; + + ret = r->formatter->sprint(buf, sizeof(buf), &wbytes, smps, cnt); + if (ret < 0) + return ret; + + auto value = std::string_view(buf, wbytes); + + r->conn->context.set(r->key, value); + break; + } + + case RedisMode::HASH: { + /* We only update the signals with their latest value here. */ + struct sample *smp = smps[cnt - 1]; + + std::unordered_map kvs; + + for (unsigned j = 0; j < MIN(vlist_length(smp->signals), smp->length); j++) { + struct signal *sig = (struct signal *) vlist_at(smp->signals, j); + union signal_data *data = &smp->data[j]; + + char val[128]; + signal_data_print_str(data, sig->type, val, sizeof(val), 16); + + kvs[sig->name] = val; + } + + r->conn->context.hmset(r->key, kvs.begin(), kvs.end()); + break; + } + } + + return cnt; +} + +int redis_poll_fds(struct vnode *n, int fds[]) +{ + struct redis *r = (struct redis *) n->_vd; + + fds[0] = r->notify + ? queue_signalled_fd(&r->queue) + : r->task.getFD(); + + return 1; +} + +__attribute__((constructor(110))) +static void register_plugin() { + p.name = "redis"; + p.description = "Redis key-value store"; + p.vectorize = 0; + p.size = sizeof(struct redis); + p.init = redis_init; + p.destroy = redis_destroy; + p.parse = redis_parse; + p.check = redis_check; + p.print = redis_print; + p.prepare = redis_prepare; + p.start = redis_start; + p.stop = redis_stop; + p.read = redis_read; + p.write = redis_write; + p.poll_fds = redis_poll_fds; + + if (!node_types) + node_types = new NodeTypeList(); + + node_types->push_back(&p); +} diff --git a/packaging/deps.sh b/packaging/deps.sh index a1960245c..20860f4b5 100644 --- a/packaging/deps.sh +++ b/packaging/deps.sh @@ -133,6 +133,32 @@ if ! pkg-config "rdkafka>=1.5.0" && \ popd fi +# Build & Install hiredis +if ! pkg-config "hiredis>1.0.0" && \ + [ -z "${SKIP_HIREDIS}" ]; then + git clone --branch v1.0.0 --depth 1 https://github.com/redis/hiredis.git + mkdir -p hiredis/build + pushd hiredis/build + cmake -DDISABLE_TESTS=ON \ + -DENABLE_SSL=ON \ + ${CMAKE_OPTS} .. + make -j$(nproc) ${TARGET} + popd +fi + +# Build & Install redis++ +if [ -z "${SKIP_REDISPP}" ]; then + git clone --depth 1 https://github.com/sewenew/redis-plus-plus.git + mkdir -p redis-plus-plus/build + pushd redis-plus-plus/build + cmake -DREDIS_PLUS_PLUS_BUILD_STATIC=OFF \ + -DREDIS_PLUS_PLUS_USE_TLS=ON \ + -DREDIS_PLUS_PLUS_CXX_STANDARD=17 \ + ${CMAKE_OPTS} .. + make -j$(nproc) ${TARGET} + popd +fi + # Build & Install uldaq if ! pkg-config "libuldaq >= 1.0.0" && \ [ -z "${SKIP_ULDAQ}" ]; then diff --git a/packaging/docker/Dockerfile.alpine b/packaging/docker/Dockerfile.alpine index 339aa19b6..883aaca86 100644 --- a/packaging/docker/Dockerfile.alpine +++ b/packaging/docker/Dockerfile.alpine @@ -58,7 +58,8 @@ RUN apk update && \ mosquitto-dev \ librdkafka-dev \ libusb-dev \ - lua-dev + lua-dev \ + hiredis-dev RUN if [ "${ARCH}" != "armv6" -a "${ARCH}" != "armv7" ]; then \ apk add \ diff --git a/packaging/docker/Dockerfile.centos b/packaging/docker/Dockerfile.centos index 853441d5a..bc03cfb89 100644 --- a/packaging/docker/Dockerfile.centos +++ b/packaging/docker/Dockerfile.centos @@ -60,7 +60,8 @@ RUN dnf -y install \ libibverbs-devel \ librdmacm-devel \ libusb1-devel \ - lua-devel + lua-devel \ + hiredis-devel # Add local and 64-bit locations to linker paths ENV echo /usr/local/lib >> /etc/ld.so.conf && \ diff --git a/packaging/docker/Dockerfile.debian b/packaging/docker/Dockerfile.debian index 1a0a933e6..77d78d5c7 100644 --- a/packaging/docker/Dockerfile.debian +++ b/packaging/docker/Dockerfile.debian @@ -63,7 +63,8 @@ RUN apt-get update && \ libusb-1.0-0-dev \ libfmt-dev \ libspdlog-dev \ - liblua5.3-dev + liblua5.3-dev \ + libhiredis-dev # Add local and 64-bit locations to linker paths ENV echo /usr/local/lib >> /etc/ld.so.conf && \ diff --git a/packaging/docker/Dockerfile.debian-multiarch b/packaging/docker/Dockerfile.debian-multiarch index c7c02c1e0..92eb2cfed 100644 --- a/packaging/docker/Dockerfile.debian-multiarch +++ b/packaging/docker/Dockerfile.debian-multiarch @@ -67,7 +67,8 @@ RUN apt-get update && \ libibverbs-dev:${ARCH} \ librdmacm-dev:${ARCH} \ libre-dev:${ARCH} \ - liblua5.3-dev + liblua5.3-dev:${ARCH} \ + libhiredis-dev:${ARCH} # Add local and 64-bit locations to linker paths ENV echo /usr/local/lib >> /etc/ld.so.conf && \ diff --git a/packaging/docker/Dockerfile.fedora b/packaging/docker/Dockerfile.fedora index 2cd08f510..d4bfa105e 100644 --- a/packaging/docker/Dockerfile.fedora +++ b/packaging/docker/Dockerfile.fedora @@ -78,7 +78,8 @@ RUN dnf -y install \ libibverbs-devel \ librdmacm-devel \ libusb-devel \ - lua-devel + lua-devel \ + hiredis-devel # Add local and 64-bit locations to linker paths RUN echo /usr/local/lib >> /etc/ld.so.conf && \ diff --git a/packaging/docker/Dockerfile.ubuntu b/packaging/docker/Dockerfile.ubuntu index d6ad6b401..8e4526e0c 100644 --- a/packaging/docker/Dockerfile.ubuntu +++ b/packaging/docker/Dockerfile.ubuntu @@ -64,7 +64,8 @@ RUN apt-get update && \ libwebsockets-dev \ libfmt-dev \ libspdlog-dev \ - liblua5.3-dev + liblua5.3-dev \ + libhiredis-dev # Add local and 64-bit locations to linker paths ENV echo /usr/local/lib >> /etc/ld.so.conf && \