1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

hook-digest: Add hook for computing digests

Signed-off-by: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
This commit is contained in:
Philipp Jungkamp 2023-08-25 13:33:57 +02:00 committed by Steffen Vogel
parent c2eefabb93
commit 065de3ab6d
7 changed files with 319 additions and 0 deletions

View file

@ -17,6 +17,7 @@ discriminator:
average: hooks/_average.yaml
cast: hooks/_cast.yaml
decimate: hooks/_decimate.yaml
digest: hooks/_digest.yaml
dp: hooks/_dp.yaml
drop: hooks/_drop.yaml
dump: hooks/_dump.yaml

View file

@ -0,0 +1,7 @@
# yaml-language-server: $schema=http://json-schema.org/draft-07/schema
# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
# SPDX-License-Identifier: Apache-2.0
---
allOf:
- $ref: ../hook_obj.yaml
- $ref: digest.yaml

View file

@ -0,0 +1,24 @@
# yaml-language-server: $schema=http://json-schema.org/draft-07/schema
# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
# SPDX-License-Identifier: Apache-2.0
---
allOf:
- type: object
required:
- uri
- mode
- interval
- algorithm
properties:
uri:
description: The output file for digests.
example: digest.txt
type: string
format: uri
algorithm:
description: The algorithm used for calculating digests.
example: sha256
type: string
- $ref: ../hook.yaml

View file

@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
# SPDX-License-Identifier: Apache-2.0
@include "hook-nodes.conf"
paths = (
{
in = "signal_node"
out = "file_node"
hooks = (
# Use a frame hook to generate NEW_FRAME annotations.
"frame",
{
type = "digest"
# The algorithm used for digest calculation
algorithm = "sha256"
# The output file for digests
uri = "sequence.digest"
}
)
}
)

View file

@ -8,6 +8,7 @@ set(HOOK_SRC
average.cpp
cast.cpp
decimate.cpp
digest.cpp
dp.cpp
drop.cpp
dump.cpp

261
lib/hooks/digest.cpp Normal file
View file

@ -0,0 +1,261 @@
/* Digest hook.
*
* Author: Philipp Jungkamp <philipp.jungkamp@opal-rt.com>
* SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
* SPDX-License-Identifier: Apache-2.0
*/
#include <array>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <fmt/core.h>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <variant>
#include <fmt/format.h>
#include <openssl/evp.h>
#include <villas/exceptions.hpp>
#include <villas/hook.hpp>
#include <villas/sample.hpp>
#include <villas/signal_data.hpp>
#include <villas/signal_type.hpp>
#include <villas/timing.hpp>
#include <villas/utils.hpp>
namespace villas {
namespace node {
namespace digest {
template <typename T>
static std::array<std::uint8_t, sizeof(T)>
little_endian_bytes(T const &t) noexcept {
auto bytes = std::array<std::uint8_t, sizeof(T)>();
auto const ptr = reinterpret_cast<uint8_t const *>(std::addressof(t));
std::copy(ptr, ptr + sizeof(T), bytes.begin());
#if BYTE_ORDER == BIG_ENDIAN
std::reverse(bytes.begin(), bytes.end());
#endif
return bytes;
}
static void FILE_free(FILE *f) { fclose(f); }
class DigestHook : public Hook {
using md_ctx_ptr = std::unique_ptr<EVP_MD_CTX, decltype(&EVP_MD_CTX_free)>;
using file_ptr = std::unique_ptr<FILE, decltype(&FILE_free)>;
// Parameters
std::string algorithm;
std::string uri;
// Context
md_ctx_ptr md_ctx;
EVP_MD const *md;
file_ptr file;
std::optional<uint64_t> first_sequence;
std::optional<timespec> first_timestamp;
std::optional<uint64_t> last_sequence;
std::optional<timespec> last_timestamp;
std::vector<unsigned char> md_buffer;
std::string md_string_buffer;
void emitDigest() {
if (!first_sequence || !first_timestamp || !last_sequence ||
!last_timestamp)
return;
unsigned int md_size;
unsigned char md_value[EVP_MAX_MD_SIZE];
if (!EVP_DigestFinal_ex(md_ctx.get(), md_value, &md_size))
throw RuntimeError{"Could not finalize digest"};
if (!EVP_MD_CTX_reset(md_ctx.get()))
throw RuntimeError{"Could not reset digest context"};
if (!EVP_DigestInit_ex(md_ctx.get(), md, NULL))
throw RuntimeError{"Could not initialize digest"};
md_string_buffer.clear();
auto inserter = std::back_inserter(md_string_buffer);
fmt::format_to(inserter, "{}.{:09}-{} ", first_timestamp->tv_sec,
first_timestamp->tv_nsec, *first_sequence);
fmt::format_to(inserter, "{}.{:09}-{} ", last_timestamp->tv_sec,
last_timestamp->tv_nsec, *last_sequence);
fmt::format_to(inserter, "{} ", algorithm);
for (unsigned int i = 0; i < md_size; ++i)
fmt::format_to(inserter, "{:02X}", md_value[i]);
logger->debug("emit {}", md_string_buffer);
md_string_buffer.push_back('\n');
fputs(md_string_buffer.c_str(), file.get());
fflush(file.get());
}
void updateInterval(Sample const *smp) {
auto const next_sequence = smp->sequence;
auto const next_timestamp = smp->ts.origin;
if (smp->flags & (int)SampleFlags::NEW_FRAME) {
emitDigest();
first_sequence = next_sequence;
first_timestamp = next_timestamp;
}
last_sequence = next_sequence;
last_timestamp = next_timestamp;
}
void updateDigest(Sample const *smp) {
md_buffer.clear();
auto inserter = std::back_inserter(md_buffer);
if (smp->flags & (int)SampleFlags::HAS_TS_ORIGIN) {
auto const bytes_sec =
little_endian_bytes<uint64_t>(smp->ts.origin.tv_sec);
std::copy(std::cbegin(bytes_sec), std::cend(bytes_sec), inserter);
auto const bytes_nsec =
little_endian_bytes<uint64_t>(smp->ts.origin.tv_nsec);
std::copy(std::cbegin(bytes_nsec), std::cend(bytes_nsec), inserter);
}
if (smp->flags & (int)SampleFlags::HAS_SEQUENCE) {
auto const bytes = little_endian_bytes<uint64_t>(smp->sequence);
std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
}
if (smp->flags & (int)SampleFlags::HAS_DATA) {
if (signals->size() != smp->length)
throw RuntimeError{"Sample length does not match signal list."};
for (unsigned int i = 0; i < smp->length; ++i) {
auto const signal = signals->getByIndex(i);
auto const &data = smp->data[i];
switch (signal->type) {
case SignalType::BOOLEAN: {
auto const bytes = little_endian_bytes<uint8_t>(data.b ? 1 : 0);
std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
break;
}
case SignalType::INTEGER: {
auto const bytes = little_endian_bytes<uint64_t>(data.i);
std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
break;
}
case SignalType::FLOAT: {
auto const f = (data.f == data.f) ? data.f : std::nan("");
auto const bytes = little_endian_bytes<double>(f);
std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
break;
}
case SignalType::COMPLEX: {
auto real = data.z.real();
real = (real == real) ? real : std::nanf("");
auto const rbytes = little_endian_bytes<float>(real);
std::copy(std::cbegin(rbytes), std::cend(rbytes), inserter);
auto imag = data.z.imag();
imag = (imag == imag) ? imag : std::nanf("");
auto const ibytes = little_endian_bytes<float>(imag);
std::copy(std::cbegin(ibytes), std::cend(ibytes), inserter);
break;
}
case SignalType::INVALID:
default:
throw RuntimeError{"Can calculate digest for invalid sample."};
}
}
}
if (!EVP_DigestUpdate(md_ctx.get(), md_buffer.data(), md_buffer.size()))
throw RuntimeError{"Could not update digest"};
}
public:
DigestHook(Path *p, Node *n, int fl, int prio, bool en = true)
: Hook(p, n, fl, prio, en), algorithm(), uri(),
md_ctx(EVP_MD_CTX_new(), &EVP_MD_CTX_free), md(nullptr),
file(nullptr, &FILE_free), first_sequence(std::nullopt),
first_timestamp(std::nullopt), last_sequence(std::nullopt),
last_timestamp(std::nullopt), md_buffer(), md_string_buffer() {}
virtual void parse(json_t *json) {
Hook::parse(json);
char const *uri_str;
char const *mode_str;
char const *algorithm_str;
json_error_t err;
int ret =
json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: s }", "uri", &uri_str,
"mode", &mode_str, "algorithm", &algorithm_str);
if (ret)
throw ConfigError(json, err, "node-config-hook-digest");
uri = std::string(uri_str);
if (algorithm_str)
algorithm = std::string(algorithm_str);
}
virtual void prepare() {
Hook::prepare();
file = file_ptr(fopen(uri.c_str(), "w"), &FILE_free);
if (!file)
throw RuntimeError{"Could not open file {}: {}", uri, strerror(errno)};
md = EVP_get_digestbyname(algorithm.c_str());
if (!md)
throw RuntimeError{"Could not fetch algorithm {}", algorithm};
}
virtual void start() {
Hook::start();
if (!EVP_DigestInit_ex(md_ctx.get(), md, NULL))
throw RuntimeError{"Could not initialize digest"};
}
virtual Hook::Reason process(struct Sample *smp) {
assert(smp);
assert(state == State::STARTED);
updateInterval(smp);
updateDigest(smp);
return Reason::OK;
}
virtual void stop() {
Hook::stop();
first_sequence.reset();
first_timestamp.reset();
last_sequence.reset();
last_timestamp.reset();
if (!EVP_MD_CTX_reset(md_ctx.get()))
throw RuntimeError{"Could not reset digest context"};
}
};
// Register hook
static char n[] = "digest";
static char d[] = "Calculate the digest for a range of samples";
static HookPlugin<DigestHook, n, d, (int)Hook::Flags::PATH, 999> p;
} // namespace digest
} // namespace node
} // namespace villas

View file

@ -168,6 +168,8 @@ public:
: Hook(p, n, fl, prio, en), trigger(Trigger::SEQUENCE), interval(1),
offset(0), unit{std::nullopt}, last_smp{nullptr, &sample_decref} {}
virtual ~FrameHook() { (void)last_smp.release(); }
virtual void parse(json_t *json) override {
Hook::parse(json);