1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

redis: add new redis node-type

This commit is contained in:
Steffen Vogel 2021-06-16 10:36:19 -04:00 committed by Steffen Vogel
parent 5e62b3a35f
commit d72a133263
17 changed files with 1120 additions and 10 deletions

View file

@ -178,6 +178,7 @@ test:integration:
services:
- eclipse-mosquitto
- rabbitmq
- redis
tags:
- docker
needs:

View file

@ -112,6 +112,8 @@ pkg_check_modules(LIBIEC61850 IMPORTED_TARGET libiec61850>=1.2)
pkg_check_modules(LIBCONFIG IMPORTED_TARGET libconfig>=1.4.9)
pkg_check_modules(MOSQUITTO IMPORTED_TARGET libmosquitto>=1.6.9)
pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka>=1.5.0)
pkg_check_modules(HIREDIS IMPORTED_TARGET hiredis>=1.0.0)
pkg_check_modules(REDISPP IMPORTED_TARGET redis++>=1.2.0)
pkg_check_modules(RABBITMQ_C IMPORTED_TARGET librabbitmq>=0.8.0)
pkg_check_modules(COMEDILIB IMPORTED_TARGET comedilib>=0.11.0)
pkg_check_modules(LIBZMQ IMPORTED_TARGET libzmq>=2.2.0)
@ -169,6 +171,7 @@ cmake_dependent_option(WITH_NODE_MQTT "Build with mqtt node-type"
cmake_dependent_option(WITH_NODE_NANOMSG "Build with nanomsg node-type" ON "NANOMSG_FOUND" OFF)
cmake_dependent_option(WITH_NODE_NGSI "Build with ngsi node-type" ON "" OFF)
cmake_dependent_option(WITH_NODE_OPAL "Build with opal node-type" ON "Opal_FOUND" OFF)
cmake_dependent_option(WITH_NODE_REDIS "Build with redis node-type" ON "HIREDIS_FOUND; REDISPP_FOUND" OFF)
cmake_dependent_option(WITH_NODE_RTP "Build with rtp node-type" ON "RE_FOUND" OFF)
cmake_dependent_option(WITH_NODE_SHMEM "Build with shmem node-type" ON "HAS_SEMAPHORE; HAS_MMAN" OFF)
cmake_dependent_option(WITH_NODE_SIGNAL "Build with signal node-type" ON "" OFF)
@ -267,6 +270,7 @@ add_feature_info(NODE_TEST_RTT WITH_NODE_TEST_RTT "Build with
add_feature_info(NODE_ULDAQ WITH_NODE_ULDAQ "Build with uldaq node-type")
add_feature_info(NODE_WEBSOCKET WITH_NODE_WEBSOCKET "Build with websocket node-type")
add_feature_info(NODE_ZEROMQ WITH_NODE_ZEROMQ "Build with zeromq node-type")
add_feature_info(NODE_REDIS WITH_NODE_REDIS "Build with redis node-type")
if(TOPLEVEL_PROJECT)
feature_summary(WHAT ALL VAR FEATURES)

View file

@ -61,8 +61,8 @@ set(CPACK_RPM_PLUGINS_FILE_NAME "${CPACK_RPM_PLUGINS_PACKAGE_NAME}-${SUFFIX}")
set(CPACK_RPM_TOOLS_FILE_NAME "${CPACK_RPM_TOOLS_PACKAGE_NAME}-${SUFFIX}")
set(CPACK_RPM_DOC_FILE_NAME "${CPACK_RPM_DOC_PACKAGE_NAME}-${SUFFIX}")
set(CPACK_RPM_DEVEL_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION} fmt-devel >= 5.2.0, spdlog-devel >= 1.3.1, openssl-devel >= 1.0.0, libuuid-devel, protobuf-devel >= 2.6.0, protobuf-c-devel >= 1.1.0, libconfig-devel >= 1.4.9, libnl3-devel >= 3.2.27, libcurl-devel >= 7.29.0, jansson-devel >= 2.7, libwebsockets-devel >= 2.3.0, zeromq-devel >= 2.2.0, nanomsg-devel >= 1.0.0, librabbitmq-devel >= 0.8.0, mosquitto-devel >= 1.4.15, libibverbs-devel >= 16.2, librdmacm-devel >= 16.2, libusb-devel >= 0.1.5, lua-devel >= 5.1, librdkafka-deve; >= 1.5.0")
set(CPACK_RPM_LIB_PACKAGE_REQUIRES " fmt >= 5.2.0, spdlog >= 1.3.1, openssl-libs >= 1.0.0, libuuid, protobuf >= 2.6.0, protobuf-c >= 1.1.0, libconfig >= 1.4.9, libnl3 >= 3.2.27, libcurl >= 7.29.0, jansson >= 2.7, libwebsockets >= 2.3.0, zeromq >= 2.2.0, nanomsg >= 1.0.0, librabbitmq >= 0.8.0, mosquitto >= 1.4.15, libibverbs >= 16.2, librdmacm >= 16.2, libusb >= 0.1.5, lua >= 5.1, librdkafka >= 1.5.0")
set(CPACK_RPM_DEVEL_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION} fmt-devel >= 5.2.0, spdlog-devel >= 1.3.1, openssl-devel >= 1.0.0, libuuid-devel, protobuf-devel >= 2.6.0, protobuf-c-devel >= 1.1.0, libconfig-devel >= 1.4.9, libnl3-devel >= 3.2.27, libcurl-devel >= 7.29.0, jansson-devel >= 2.7, libwebsockets-devel >= 2.3.0, zeromq-devel >= 2.2.0, nanomsg-devel >= 1.0.0, librabbitmq-devel >= 0.8.0, mosquitto-devel >= 1.4.15, libibverbs-devel >= 16.2, librdmacm-devel >= 16.2, libusb-devel >= 0.1.5, lua-devel >= 5.1, librdkafka-devel >= 1.5.0, hiredis-devel >= 1.0.0")
set(CPACK_RPM_LIB_PACKAGE_REQUIRES " fmt >= 5.2.0, spdlog >= 1.3.1, openssl-libs >= 1.0.0, libuuid, protobuf >= 2.6.0, protobuf-c >= 1.1.0, libconfig >= 1.4.9, libnl3 >= 3.2.27, libcurl >= 7.29.0, jansson >= 2.7, libwebsockets >= 2.3.0, zeromq >= 2.2.0, nanomsg >= 1.0.0, librabbitmq >= 0.8.0, mosquitto >= 1.4.15, libibverbs >= 16.2, librdmacm >= 16.2, libusb >= 0.1.5, lua >= 5.1, librdkafka >= 1.5.0, hiredis >= 1.0.0")
set(CPACK_RPM_BIN_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION}")
set(CPACK_RPM_PLUGINS_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION}")
set(CPACK_RPM_TOOLS_PACKAGE_REQUIRES "${CPACK_RPM_LIB_PACKAGE_NAME} >= ${CPACK_PACKAGE_VERSION}")

View file

@ -0,0 +1,39 @@
nodes = {
redis_node = {
type = "redis",
format = "json", # only valid for mode = 'channel' and 'key'
# With mode = 'hash' we will use a simple human readable format
key = "my_key" # The Redis key to be used for mode = 'key' or 'hash' (default is the node name)
channel = "my_channel" # the Redis channel tp be used for mode = 'channel' (default is the node name)
mode = "key", # one of:
# - 'channel' (publish/subscribe)
# - 'key' (set/get)
# - 'hash' (hmset/hgetall)
notify = false # Whether or not to use Redis keyspace event notifications to get notified about updates
rate = 1.0 # The polling rate when notify = false
uri = "tcp://localhost:6379/0", # The Redis connection URI
# host = "localhost" # Alternatively the connection options can be specified independently
# port = 6379 # Note: options here will overwrite the respective part of the URI if both are given.
# db = 0
# path = "/var/run/redis.sock"
# user = "guest",
# password = "guest"
# ssl = {
# enabled: true
# cacert: "/etc/ssl/certs/ca-certificates.crt",
# cacertdir: "/etc/ssl/certs"
# cert: "./my_cert.crt",
# key, "./my_key.key"
# }
}
}

View file

@ -22,7 +22,7 @@
*********************************************************************************/
/**
* @addtogroup example BSD example Node Type
* @addtogroup example Example node-type
* @ingroup node
* @{
*/

View file

@ -0,0 +1,156 @@
/** Redis node-type
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2021, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/**
* @addtogroup redis BSD redis Node Type
* @ingroup node
* @{
*/
#pragma once
#include <atomic>
#include <thread>
#include <unordered_map>
#include <sw/redis++/redis++.h>
#include <villas/node/config.h>
#include <villas/node.h>
#include <villas/timing.h>
#include <villas/format.hpp>
#include <villas/task.hpp>
#include <villas/pool.h>
#include <villas/queue_signalled.h>
enum class RedisMode {
KEY,
HASH,
CHANNEL
};
class RedisConnection {
public:
sw::redis::Redis context;
protected:
enum State {
INITIALIZED,
RUNNING,
STOPPING
};
std::thread thread;
std::atomic<enum State> state;
void onMessage(std::string channel, std::string msg);
void loop();
std::unordered_multimap<std::string, struct vnode *> subscriberMap;
sw::redis::Subscriber subscriber;
villas::Logger logger;
public:
RedisConnection(const sw::redis::ConnectionOptions &opts);
static
RedisConnection * get(const sw::redis::ConnectionOptions &opts);
void start();
void stop();
void subscribe(struct vnode *n, const std::string &channel);
void unsubscribe(struct vnode *n, const std::string &channel);
};
struct redis {
sw::redis::ConnectionOptions options;
RedisConnection *conn;
enum RedisMode mode;
std::string key;
bool notify; /**< Use Redis Keyspace notifications to listen for updates. */
struct Task task; /**< Timer for periodic events. */
double rate; /**< Rate for polling key updates if keyspace notifications are disabled. */
villas::node::Format *formatter;
struct pool pool;
struct queue_signalled queue;
};
/** @see node_type::init */
int redis_init(struct vnode *n);
/** @see node_type::destroy */
int redis_destroy(struct vnode *n);
/** @see node_type::parse */
int redis_parse(struct vnode *n, json_t *json);
/** @see node_type::print */
char * redis_print(struct vnode *n);
/** @see node_type::check */
int redis_check();
/** @see node_type::prepare */
int redis_prepare();
/** @see node_type::start */
int redis_start(struct vnode *n);
/** @see node_type::stop */
int redis_stop(struct vnode *n);
/** @see node_type::pause */
int redis_pause(struct vnode *n);
/** @see node_type::resume */
int redis_resume(struct vnode *n);
/** @see node_type::write */
int redis_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::read */
int redis_read(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release);
/** @see node_type::reverse */
int redis_reverse(struct vnode *n);
/** @see node_type::poll_fds */
int redis_poll_fds(struct vnode *n, int fds[]);
/** @see node_type::netem_fds */
int redis_netem_fds(struct vnode *n, int fds[]);
/** @} */

View file

@ -0,0 +1,198 @@
/** Redis node-type helpers
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2021, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/**
* @addtogroup redis BSD redis Node Type
* @ingroup node
* @{
*/
#pragma once
#include <chrono>
#include <functional>
#include <sw/redis++/redis++.h>
#include <spdlog/fmt/ostr.h>
namespace std {
template<typename _rep, typename ratio>
struct hash<std::chrono::duration<_rep, ratio>>
{
std::size_t operator()(std::chrono::duration<_rep, ratio> const& s) const
{
return std::hash<_rep>{}(s.count());
}
};
template <>
struct hash<sw::redis::tls::TlsOptions>
{
std::size_t operator()(const sw::redis::tls::TlsOptions& t) const
{
return hash<bool>()(t.enabled) ^
hash<std::string>()(t.cacert) ^
hash<std::string>()(t.cacertdir) ^
hash<std::string>()(t.cert) ^
hash<std::string>()(t.key) ^
hash<std::string>()(t.sni);
}
};
template <>
struct hash<sw::redis::ConnectionOptions>
{
std::size_t operator()(const sw::redis::ConnectionOptions& o) const
{
return hash<int>()(static_cast<int>(o.type)) ^
hash<std::string>()(o.host) ^
hash<int>()(o.port) ^
hash<std::string>()(o.path) ^
hash<std::string>()(o.user) ^
hash<std::string>()(o.password) ^
hash<int>()(o.db) ^
hash<bool>()(o.keep_alive) ^
hash<std::chrono::milliseconds>()(o.connect_timeout) ^
hash<std::chrono::milliseconds>()(o.socket_timeout) ^
hash<sw::redis::tls::TlsOptions>()(o.tls);
}
};
} /* namespace std */
namespace sw {
namespace redis {
bool operator==(const tls::TlsOptions &o1, const tls::TlsOptions &o2)
{
return o1.enabled == o2.enabled &&
o1.cacert == o2.cacert &&
o1.cacertdir == o2.cacertdir &&
o1.cert == o2.cert &&
o1.key == o2.key &&
o1.sni == o2.sni;
}
bool operator==(const ConnectionOptions &o1, const ConnectionOptions &o2)
{
return o1.type == o2.type &&
o1.host == o2.host &&
o1.port == o2.port &&
o1.path == o2.path &&
o1.user == o2.user &&
o1.password == o2.password &&
o1.db == o2.db &&
o1.keep_alive == o2.keep_alive &&
o1.connect_timeout == o2.connect_timeout &&
o1.socket_timeout == o2.socket_timeout &&
o1.tls == o2.tls;
}
template<typename OStream>
OStream &operator<<(OStream &os, const tls::TlsOptions &t)
{
os << "tls.enabled=" << (t.enabled ? "yes" : "no");
if (t.enabled) {
if (!t.cacert.empty())
os << ", tls.cacert=" << t.cacert;
if (!t.cacertdir.empty())
os << ", tls.cacertdir=" << t.cacertdir;
if (!t.cert.empty())
os << ", tls.cert=" << t.cert;
if (!t.key.empty())
os << ", tls.key=" << t.key;
if (!t.sni.empty())
os << ", tls.sni=" << t.sni;
}
return os;
}
template<typename OStream>
OStream &operator<<(OStream &os, const ConnectionType &t)
{
switch (t) {
case ConnectionType::TCP:
os << "tcp";
break;
case ConnectionType::UNIX:
os << "unix";
break;
}
return os;
}
template<typename OStream>
OStream &operator<<(OStream &os, const ConnectionOptions &o)
{
os << "type=" << o.type;
switch (o.type) {
case ConnectionType::TCP:
os << ", host=" << o.host << ", port=" << o.port;
break;
case ConnectionType::UNIX:
os << ", path=" << o.path;
break;
}
os << ", db=" << o.db
<< ", user=" << o.user
<< ", keepalive=" << (o.keep_alive ? "yes" : "no")
<< ", " << o.tls;
return os;
}
} /* namespace redis */
} /* namespace sw */
template<typename OStream>
OStream &operator<<(OStream &os, const enum RedisMode &m)
{
switch (m) {
case RedisMode::KEY:
os << "key";
break;
case RedisMode::HASH:
os << "hash";
break;
case RedisMode::CHANNEL:
os << "channel";
break;
}
return os;
}
/** @} */

View file

@ -179,6 +179,12 @@ if(WITH_NODE_TEMPER)
list(APPEND LIBRARIES PkgConfig::LIBUSB)
endif()
# Enable Redis support
if(WITH_NODE_REDIS)
list(APPEND NODE_SRC redis.cpp)
list(APPEND LIBRARIES PkgConfig::HIREDIS PkgConfig::REDISPP)
endif()
add_library(nodes STATIC ${NODE_SRC})
target_include_directories(nodes PUBLIC ${INCLUDE_DIRS})
target_link_libraries(nodes PUBLIC ${LIBRARIES})

View file

@ -738,7 +738,7 @@ int ngsi_read(struct vnode *n, struct sample * const smps[], unsigned cnt)
int ret;
if (i->task.wait() == 0)
perror("Failed to wait for task");
throw SystemError("Failed to wait for task");
json_t *json_rentity;
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);

674
lib/nodes/redis.cpp Normal file
View file

@ -0,0 +1,674 @@
/** Redis node-type
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2021, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <functional>
#include <chrono>
#include <unordered_map>
#include <sys/time.h>
#include <villas/nodes/redis.hpp>
#include <villas/nodes/redis_helpers.hpp>
#include <villas/utils.hpp>
#include <villas/sample.h>
#include <villas/exceptions.hpp>
#include <villas/super_node.hpp>
#include <villas/exceptions.hpp>
#include <villas/timing.h>
/* Forward declartions */
static struct vnode_type p;
static void redis_on_message(struct vnode *n, std::string channel, std::string msg);
using namespace villas;
using namespace villas::node;
using namespace villas::utils;
using namespace sw::redis;
static std::unordered_map<ConnectionOptions, RedisConnection*> connections;
RedisConnection::RedisConnection(const ConnectionOptions &opts) :
context(opts),
subscriber(context.subscriber()),
logger(logging.get("nodes:redis"))
{
/* Enable keyspace notifications */
context.command("config", "set", "notify-keyspace-events", "K$h");
subscriber.on_message([this](std::string channel, std::string msg) {
onMessage(channel, msg);
});
logger->info("New connection: {}", opts);
state = State::INITIALIZED;
}
RedisConnection * RedisConnection::get(const ConnectionOptions &opts)
{
RedisConnection *conn;
auto it = connections.find(opts);
if (it != connections.end())
conn = it->second;
else {
try {
conn = new RedisConnection(opts);
} catch (IoError &e) {
throw RuntimeError(e.what());
}
connections[opts] = conn;
}
return conn;
}
void RedisConnection::onMessage(std::string channel, std::string msg)
{
auto itp = subscriberMap.equal_range(channel);
for (auto it = itp.first; it != itp.second; ++it) {
struct vnode *n = it->second;
redis_on_message(n, channel, msg);
}
}
void RedisConnection::subscribe(struct vnode *n, const std::string &channel)
{
subscriber.subscribe(channel);
subscriberMap.emplace(channel, n);
}
void RedisConnection::unsubscribe(struct vnode *n, const std::string &channel)
{
auto itp = subscriberMap.equal_range(channel);
for (auto it = itp.first; it != itp.second; ++it) {
if (it->second == n)
subscriberMap.erase(it);
}
if (subscriberMap.count(channel) == 0)
subscriber.subscribe(channel);
}
void RedisConnection::start()
{
if (state == State::RUNNING)
return; /* Already running */
state = State::RUNNING;
thread = std::thread(&RedisConnection::loop, this);
}
void RedisConnection::stop()
{
state = State::STOPPING;
thread.join();
state = State::INITIALIZED;
}
void RedisConnection::loop()
{
while (state == State::RUNNING) {
try {
subscriber.consume();
}
catch (const TimeoutError &e) {
continue;
}
catch (const ReplyError &e) {
continue;
}
catch (const Error &e) {
/* Create a new subscriber */
subscriber = context.subscriber();
}
}
}
static int redis_get(struct vnode *n, struct sample * const smps[], unsigned cnt)
{
int ret;
struct redis *r = (struct redis *) n->_vd;
switch (r->mode) {
case RedisMode::KEY: {
auto value = r->conn->context.get(r->key);
if (value) {
size_t rbytes, bytes = value->size();
ret = r->formatter->sscan(value->c_str(), bytes, &rbytes, smps, cnt);
if (rbytes != bytes)
return -1;
return ret;
}
else
return -1;
}
case RedisMode::HASH: {
struct sample *smp = smps[0];
for (unsigned j = 0; j < vlist_length(&n->in.signals); j++) {
struct signal *sig = (struct signal *) vlist_at(&n->in.signals, j);
union signal_data *data = &smp->data[j];
*data = sig->init;
}
std::unordered_map<std::string, std::string> kvs;
r->conn->context.hgetall(r->key, std::inserter(kvs, kvs.begin()));
int max_idx = -1;
for (auto &it : kvs) {
auto &name = it.first;
auto &value = it.second;
struct signal *sig = vlist_lookup_name<struct signal>(&n->in.signals, name);
if (!sig)
continue;
auto idx = vlist_index(&n->in.signals, sig);
if (idx > max_idx)
max_idx = idx;
char *end;
ret = signal_data_parse_str(&smp->data[idx], sig->type, value.c_str(), &end);
if (ret < 0)
continue;
}
smp->flags = 0;
smp->length = max_idx + 1;
if (smp->length > 0)
smp->flags |= (int) SampleFlags::HAS_DATA;
return 1;
}
default:
return -1;
}
}
static void redis_on_message(struct vnode *n, std::string channel, std::string msg)
{
struct redis *r = (struct redis *) n->_vd;
n->logger->debug("Message: {}: {}", channel, msg);
int alloc, scanned, pushed;
unsigned cnt = n->in.vectorize;
struct sample *smps[cnt];
alloc = sample_alloc_many(&r->pool, smps, cnt);
if (alloc < 0) {
n->logger->error("Failed to allocate samples");
return;
}
else if ((unsigned) alloc < cnt)
n->logger->warn("Pool underrun");
switch (r->mode) {
case RedisMode::CHANNEL: {
size_t rbytes;
scanned = r->formatter->sscan(msg.c_str(), msg.size(), &rbytes, smps, alloc);
break;
}
case RedisMode::HASH:
case RedisMode::KEY:
scanned = redis_get(n, smps, cnt);
break;
default:
goto out;
}
if (scanned < 0) {
n->logger->error("Failed to decode samples");
pushed = 0;
goto out;
}
pushed = queue_signalled_push_many(&r->queue, (void **) smps, scanned);
if (pushed < 0) {
n->logger->error("Failed to enqueue");
pushed = 0;
}
else if (pushed != scanned)
n->logger->warn("Queue underrun");
out: sample_decref_many(smps + pushed, alloc - pushed);
}
int redis_init(struct vnode *n)
{
struct redis *r = (struct redis *) n->_vd;
r->mode = RedisMode::KEY;
r->formatter = nullptr;
r->notify = true;
r->rate = 1.0;
new (&r->options) ConnectionOptions;
new (&r->task) Task(CLOCK_REALTIME);
new (&r->key) std::string();
return 0;
}
int redis_destroy(struct vnode *n)
{
int ret;
struct redis *r = (struct redis *) n->_vd;
if (r->formatter)
delete r->formatter;
using string = std::string;
r->options.~ConnectionOptions();
r->key.~string();
r->task.~Task();
ret = queue_signalled_destroy(&r->queue);
if (ret)
return ret;
ret = pool_destroy(&r->pool);
if (ret)
return ret;
return 0;
}
int redis_parse(struct vnode *n, json_t *json)
{
int ret;
struct redis *r = (struct redis *) n->_vd;
json_error_t err;
json_t *json_ssl = nullptr;
json_t *json_format = nullptr;
const char *host = nullptr;
const char *path = nullptr;
const char *user = nullptr;
const char *password = nullptr;
const char *mode = nullptr;
const char *uri = nullptr;
const char *key = nullptr;
const char *channel = nullptr;
int keepalive = -1;
int db = -1;
int notify = -1;
double connect_timeout = -1;
double socket_timeout = -1;
ret = json_unpack_ex(json, &err, 0, "{ s?: o, s?: s, s?: s, s?: i, s?: s, s?: s, s?: s, s?: i, s?: { s?: F, s?: F }, s?: o, s?: b, s?: s, s?: s, s?: s, s?: b, s?: F }",
"format", &json_format,
"uri", &uri,
"host", &host,
"port", &r->options.port,
"path", &path,
"user", &user,
"password", &password,
"db", &db,
"timeout",
"connect", &connect_timeout,
"socket", &socket_timeout,
"ssl", &json_ssl,
"keepalive", &keepalive,
"mode", &mode,
"key", &key,
"channel", &channel,
"notify", &notify,
"rate", &r->rate
);
if (ret)
throw ConfigError(json, err, "node-config-node-redis", "Failed to parse node configuration");
if (json_ssl) {
const char *cacert;
const char *cacertdir;
const char *cert;
const char *key;
int enabled = 1;
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: b, s?: s, s?: s, s?: s, s?: s }",
"enabled", &enabled,
"cacert", &cacert,
"cacertdir", &cacertdir,
"cert", &cert,
"key", &key
);
if (ret)
throw ConfigError(json, err, "node-config-node-redis-ssl", "Failed to parse Redis SSL configuration");
r->options.tls.enabled = enabled != 0;
r->options.tls.cacert = cacert;
r->options.tls.cacertdir = cacertdir;
r->options.tls.cert = cert;
r->options.tls.key = key;
if (host)
r->options.tls.sni = host;
}
/* Mode */
if (mode) {
if (!strcmp(mode, "key") || !strcmp(mode, "set-get"))
r->mode = RedisMode::KEY;
else if (!strcmp(mode, "hash") || !strcmp(mode, "hset-hget"))
r->mode = RedisMode::HASH;
else if (!strcmp(mode, "channel") || !strcmp(mode, "pub-sub"))
r->mode = RedisMode::CHANNEL;
else
throw ConfigError(json, "node-config-node-redis-mode", "Invalid Redis mode: {}", mode);
}
/* Format */
r->formatter = json_format
? FormatFactory::make(json_format)
: FormatFactory::make("json");
if (!r->formatter)
throw ConfigError(json_format, "node-config-node-redis-format", "Invalid format configuration");
if (key && r->mode == RedisMode::KEY)
r->key = key;
if (channel && r->mode == RedisMode::CHANNEL)
r->key = channel;
if (notify >= 0)
r->notify = notify != 0;
/* Connection options */
if (uri)
r->options = ConnectionOptions(uri);
if (db >= 0)
r->options.db = db;
if (user)
r->options.user = user;
if (password)
r->options.password = password;
if (host)
r->options.host = host;
if (path)
r->options.path = path;
if (keepalive >= 0)
r->options.keep_alive = keepalive != 0;
if (socket_timeout >= 0)
r->options.socket_timeout = std::chrono::milliseconds((int) (1000 * socket_timeout));
if (connect_timeout >= 0)
r->options.connect_timeout = std::chrono::milliseconds((int) (1000 * connect_timeout));
return 0;
}
int redis_check(struct vnode *n)
{
struct redis *r = (struct redis *) n->_vd;
if (!r->options.host.empty() && !r->options.path.empty())
return -1;
if (r->options.db < 0)
return -1;
return 0;
}
char * redis_print(struct vnode *n)
{
struct redis *r = (struct redis *) n->_vd;
std::stringstream ss;
ss << "mode=" << r->mode
<< ", key=" << r->key
<< ", notify=" << (r->notify ? "yes" : "no");
if (!r->notify)
ss << ", rate=" << r->rate;
ss << ", " << r->options;
return strdup(ss.str().c_str());
}
int redis_prepare(struct vnode *n)
{
int ret;
struct redis *r = (struct redis *) n->_vd;
r->options.type = r->options.path.empty()
? ConnectionType::TCP
: ConnectionType::UNIX;
if (r->key.empty())
r->key = node_name_short(n);
ret = queue_signalled_init(&r->queue, 1024);
if (ret)
return ret;
ret = pool_init(&r->pool, 1024, SAMPLE_LENGTH(64));
if (ret)
return ret;
r->conn = RedisConnection::get(r->options);
if (!r->conn)
return -1;
return 0;
}
int redis_start(struct vnode *n)
{
struct redis *r = (struct redis *) n->_vd;
r->formatter->start(&n->in.signals, ~(int) SampleFlags::HAS_OFFSET);
if (!r->notify)
r->task.setRate(r->rate);
switch (r->mode) {
case RedisMode::CHANNEL:
r->conn->subscribe(n, r->key);
break;
case RedisMode::HASH:
case RedisMode::KEY:
if (r->notify) {
auto pattern = fmt::format("__keyspace@{}__:{}", r->options.db, r->key);
r->conn->subscribe(n, pattern);
}
break;
}
r->conn->start();
return 0;
}
int redis_stop(struct vnode *n)
{
struct redis *r = (struct redis *) n->_vd;
r->conn->stop();
if (!r->notify)
r->task.stop();
switch (r->mode) {
case RedisMode::CHANNEL:
r->conn->unsubscribe(n, r->key);
break;
case RedisMode::HASH:
case RedisMode::KEY:
if (r->notify) {
auto pattern = fmt::format("__keyspace@{}__:{}", r->options.db, r->key);
r->conn->unsubscribe(n, pattern);
}
break;
}
return 0;
}
int redis_read(struct vnode *n, struct sample * const smps[], unsigned cnt)
{
struct redis *r = (struct redis *) n->_vd;
/* Wait for new data */
if (r->notify || r->mode == RedisMode::CHANNEL) {
int pulled_cnt;
struct sample *pulled_smps[cnt];
pulled_cnt = queue_signalled_pull_many(&r->queue, (void **) pulled_smps, cnt);
sample_copy_many(smps, pulled_smps, pulled_cnt);
sample_decref_many(pulled_smps, pulled_cnt);
return pulled_cnt;
}
else {
if (r->task.wait() == 0)
throw SystemError("Failed to wait for task");
return redis_get(n, smps, cnt);
}
}
int redis_write(struct vnode *n, struct sample * const smps[], unsigned cnt)
{
int ret;
struct redis *r = (struct redis *) n->_vd;
switch (r->mode) {
case RedisMode::CHANNEL:
for (unsigned i = 0; i < cnt; i++) {
char buf[1500];
size_t wbytes;
ret = r->formatter->sprint(buf, sizeof(buf), &wbytes, &smps[i], cnt);
if (ret < 0)
return ret;
auto value = std::string_view(buf, wbytes);
r->conn->context.publish(r->key, value);
}
break;
case RedisMode::KEY: {
char buf[1500];
size_t wbytes;
ret = r->formatter->sprint(buf, sizeof(buf), &wbytes, smps, cnt);
if (ret < 0)
return ret;
auto value = std::string_view(buf, wbytes);
r->conn->context.set(r->key, value);
break;
}
case RedisMode::HASH: {
/* We only update the signals with their latest value here. */
struct sample *smp = smps[cnt - 1];
std::unordered_map<std::string, std::string> kvs;
for (unsigned j = 0; j < MIN(vlist_length(smp->signals), smp->length); j++) {
struct signal *sig = (struct signal *) vlist_at(smp->signals, j);
union signal_data *data = &smp->data[j];
char val[128];
signal_data_print_str(data, sig->type, val, sizeof(val), 16);
kvs[sig->name] = val;
}
r->conn->context.hmset(r->key, kvs.begin(), kvs.end());
break;
}
}
return cnt;
}
int redis_poll_fds(struct vnode *n, int fds[])
{
struct redis *r = (struct redis *) n->_vd;
fds[0] = r->notify
? queue_signalled_fd(&r->queue)
: r->task.getFD();
return 1;
}
__attribute__((constructor(110)))
static void register_plugin() {
p.name = "redis";
p.description = "Redis key-value store";
p.vectorize = 0;
p.size = sizeof(struct redis);
p.init = redis_init;
p.destroy = redis_destroy;
p.parse = redis_parse;
p.check = redis_check;
p.print = redis_print;
p.prepare = redis_prepare;
p.start = redis_start;
p.stop = redis_stop;
p.read = redis_read;
p.write = redis_write;
p.poll_fds = redis_poll_fds;
if (!node_types)
node_types = new NodeTypeList();
node_types->push_back(&p);
}

View file

@ -133,6 +133,32 @@ if ! pkg-config "rdkafka>=1.5.0" && \
popd
fi
# Build & Install hiredis
if ! pkg-config "hiredis>1.0.0" && \
[ -z "${SKIP_HIREDIS}" ]; then
git clone --branch v1.0.0 --depth 1 https://github.com/redis/hiredis.git
mkdir -p hiredis/build
pushd hiredis/build
cmake -DDISABLE_TESTS=ON \
-DENABLE_SSL=ON \
${CMAKE_OPTS} ..
make -j$(nproc) ${TARGET}
popd
fi
# Build & Install redis++
if [ -z "${SKIP_REDISPP}" ]; then
git clone --depth 1 https://github.com/sewenew/redis-plus-plus.git
mkdir -p redis-plus-plus/build
pushd redis-plus-plus/build
cmake -DREDIS_PLUS_PLUS_BUILD_STATIC=OFF \
-DREDIS_PLUS_PLUS_USE_TLS=ON \
-DREDIS_PLUS_PLUS_CXX_STANDARD=17 \
${CMAKE_OPTS} ..
make -j$(nproc) ${TARGET}
popd
fi
# Build & Install uldaq
if ! pkg-config "libuldaq >= 1.0.0" && \
[ -z "${SKIP_ULDAQ}" ]; then

View file

@ -58,7 +58,8 @@ RUN apk update && \
mosquitto-dev \
librdkafka-dev \
libusb-dev \
lua-dev
lua-dev \
hiredis-dev
RUN if [ "${ARCH}" != "armv6" -a "${ARCH}" != "armv7" ]; then \
apk add \

View file

@ -60,7 +60,8 @@ RUN dnf -y install \
libibverbs-devel \
librdmacm-devel \
libusb1-devel \
lua-devel
lua-devel \
hiredis-devel
# Add local and 64-bit locations to linker paths
ENV echo /usr/local/lib >> /etc/ld.so.conf && \

View file

@ -63,7 +63,8 @@ RUN apt-get update && \
libusb-1.0-0-dev \
libfmt-dev \
libspdlog-dev \
liblua5.3-dev
liblua5.3-dev \
libhiredis-dev
# Add local and 64-bit locations to linker paths
ENV echo /usr/local/lib >> /etc/ld.so.conf && \

View file

@ -67,7 +67,8 @@ RUN apt-get update && \
libibverbs-dev:${ARCH} \
librdmacm-dev:${ARCH} \
libre-dev:${ARCH} \
liblua5.3-dev
liblua5.3-dev:${ARCH} \
libhiredis-dev:${ARCH}
# Add local and 64-bit locations to linker paths
ENV echo /usr/local/lib >> /etc/ld.so.conf && \

View file

@ -78,7 +78,8 @@ RUN dnf -y install \
libibverbs-devel \
librdmacm-devel \
libusb-devel \
lua-devel
lua-devel \
hiredis-devel
# Add local and 64-bit locations to linker paths
RUN echo /usr/local/lib >> /etc/ld.so.conf && \

View file

@ -64,7 +64,8 @@ RUN apt-get update && \
libwebsockets-dev \
libfmt-dev \
libspdlog-dev \
liblua5.3-dev
liblua5.3-dev \
libhiredis-dev
# Add local and 64-bit locations to linker paths
ENV echo /usr/local/lib >> /etc/ld.so.conf && \