diff --git a/doc/openapi/components/schemas/config/hook_obj.yaml b/doc/openapi/components/schemas/config/hook_obj.yaml
index 37d4021b5..38dc07929 100644
--- a/doc/openapi/components/schemas/config/hook_obj.yaml
+++ b/doc/openapi/components/schemas/config/hook_obj.yaml
@@ -17,6 +17,7 @@ discriminator:
     average: hooks/_average.yaml
     cast: hooks/_cast.yaml
     decimate: hooks/_decimate.yaml
+    digest: hooks/_digest.yaml
     dp: hooks/_dp.yaml
     drop: hooks/_drop.yaml
     dump: hooks/_dump.yaml
diff --git a/doc/openapi/components/schemas/config/hooks/_digest.yaml b/doc/openapi/components/schemas/config/hooks/_digest.yaml
new file mode 100644
index 000000000..d97530b19
--- /dev/null
+++ b/doc/openapi/components/schemas/config/hooks/_digest.yaml
@@ -0,0 +1,7 @@
+# yaml-language-server: $schema=http://json-schema.org/draft-07/schema
+# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
+# SPDX-License-Identifier: Apache-2.0
+---
+allOf:
+- $ref: ../hook_obj.yaml
+- $ref: digest.yaml
diff --git a/doc/openapi/components/schemas/config/hooks/digest.yaml b/doc/openapi/components/schemas/config/hooks/digest.yaml
new file mode 100644
index 000000000..884314046
--- /dev/null
+++ b/doc/openapi/components/schemas/config/hooks/digest.yaml
@@ -0,0 +1,24 @@
+# yaml-language-server: $schema=http://json-schema.org/draft-07/schema
+# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
+# SPDX-License-Identifier: Apache-2.0
+---
+allOf:
+- type: object
+  required:
+  - uri
+  - mode
+  - interval
+  - algorithm
+  properties:
+    uri:
+      description: The output file for digests.
+      example: digest.txt
+      type: string
+      format: uri
+
+    algorithm:
+      description: The algorithm used for calculating digests.
+      example: sha256
+      type: string
+
+- $ref: ../hook.yaml
diff --git a/etc/examples/hooks/digest.conf b/etc/examples/hooks/digest.conf
new file mode 100644
index 000000000..7d8694771
--- /dev/null
+++ b/etc/examples/hooks/digest.conf
@@ -0,0 +1,23 @@
+# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
+# SPDX-License-Identifier: Apache-2.0
+
+@include "hook-nodes.conf"
+
+paths = (
+    {
+        in = "signal_node"
+        out = "file_node"
+
+        hooks = (
+            # Use a frame hook to generate NEW_FRAME annotations.
+            "frame",
+            {
+                type = "digest"
+                # The algorithm used for digest calculation
+                algorithm = "sha256"
+                # The output file for digests
+                uri = "sequence.digest"
+            }
+        )
+    }
+)
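For reference: with the example configuration above, every frame delimited by the preceding "frame" hook appends one line to the file named by uri (here "sequence.digest"). Going by the formatting in emitDigest() in lib/hooks/digest.cpp further down in this patch, a line has the shape

    <first_sec>.<first_nsec>-<first_sequence> <last_sec>.<last_nsec>-<last_sequence> <algorithm> <uppercase hex digest>

for example (all values purely illustrative, digest truncated):

    1693526400.000000000-0 1693526400.990000000-99 sha256 0123456789ABCDEF…

The interval boundaries come from the NEW_FRAME flag which the "frame" hook sets on the first sample of each frame.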
diff --git a/lib/hooks/CMakeLists.txt b/lib/hooks/CMakeLists.txt
index 883472f90..83d2f13f3 100644
--- a/lib/hooks/CMakeLists.txt
+++ b/lib/hooks/CMakeLists.txt
@@ -8,6 +8,7 @@ set(HOOK_SRC
     average.cpp
     cast.cpp
     decimate.cpp
+    digest.cpp
     dp.cpp
     drop.cpp
     dump.cpp
diff --git a/lib/hooks/digest.cpp b/lib/hooks/digest.cpp
new file mode 100644
index 000000000..49fa4bc93
--- /dev/null
+++ b/lib/hooks/digest.cpp
@@ -0,0 +1,261 @@
+/* Digest hook.
+ *
+ * Author: Philipp Jungkamp
+ * SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include <algorithm>
+#include <array>
+#include <cassert>
+#include <cerrno>
+#include <cmath>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <ctime>
+#include <iterator>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <fmt/format.h>
+#include <openssl/evp.h>
+
+#include <villas/exceptions.hpp>
+#include <villas/hook.hpp>
+#include <villas/node.hpp>
+#include <villas/path.hpp>
+#include <villas/sample.hpp>
+#include <villas/signal_type.hpp>
+#include <villas/timing.hpp>
+
+namespace villas {
+namespace node {
+namespace digest {
+
+// Copy a value's bytes in little-endian order, independent of host endianness.
+template <typename T>
+static std::array<unsigned char, sizeof(T)>
+little_endian_bytes(T const &t) noexcept {
+  auto bytes = std::array<unsigned char, sizeof(T)>();
+  auto const ptr = reinterpret_cast<unsigned char const *>(std::addressof(t));
+  std::copy(ptr, ptr + sizeof(T), bytes.begin());
+
+#if BYTE_ORDER == BIG_ENDIAN
+  std::reverse(bytes.begin(), bytes.end());
+#endif
+
+  return bytes;
+}
+
+static void FILE_free(FILE *f) { fclose(f); }
+
+class DigestHook : public Hook {
+  using md_ctx_ptr = std::unique_ptr<EVP_MD_CTX, decltype(&EVP_MD_CTX_free)>;
+  using file_ptr = std::unique_ptr<FILE, decltype(&FILE_free)>;
+
+  // Parameters
+  std::string algorithm;
+  std::string uri;
+
+  // Context
+  md_ctx_ptr md_ctx;
+  EVP_MD const *md;
+  file_ptr file;
+  std::optional<uint64_t> first_sequence;
+  std::optional<timespec> first_timestamp;
+  std::optional<uint64_t> last_sequence;
+  std::optional<timespec> last_timestamp;
+  std::vector<unsigned char> md_buffer;
+  std::string md_string_buffer;
+
+  // Finalize the digest of the current interval and append one line to the
+  // output file.
+  void emitDigest() {
+    if (!first_sequence || !first_timestamp || !last_sequence ||
+        !last_timestamp)
+      return;
+
+    unsigned int md_size;
+    unsigned char md_value[EVP_MAX_MD_SIZE];
+    if (!EVP_DigestFinal_ex(md_ctx.get(), md_value, &md_size))
+      throw RuntimeError{"Could not finalize digest"};
+
+    if (!EVP_MD_CTX_reset(md_ctx.get()))
+      throw RuntimeError{"Could not reset digest context"};
+
+    if (!EVP_DigestInit_ex(md_ctx.get(), md, NULL))
+      throw RuntimeError{"Could not initialize digest"};
+
+    md_string_buffer.clear();
+    auto inserter = std::back_inserter(md_string_buffer);
+    fmt::format_to(inserter, "{}.{:09}-{} ", first_timestamp->tv_sec,
+                   first_timestamp->tv_nsec, *first_sequence);
+    fmt::format_to(inserter, "{}.{:09}-{} ", last_timestamp->tv_sec,
+                   last_timestamp->tv_nsec, *last_sequence);
+    fmt::format_to(inserter, "{} ", algorithm);
+    for (unsigned int i = 0; i < md_size; ++i)
+      fmt::format_to(inserter, "{:02X}", md_value[i]);
+
+    logger->debug("emit {}", md_string_buffer);
+    md_string_buffer.push_back('\n');
+    fputs(md_string_buffer.c_str(), file.get());
+    fflush(file.get());
+  }
+
+  // Track the first/last sample of the current interval; a NEW_FRAME flag
+  // closes the previous interval.
+  void updateInterval(Sample const *smp) {
+    auto const next_sequence = smp->sequence;
+    auto const next_timestamp = smp->ts.origin;
+
+    if (smp->flags & (int)SampleFlags::NEW_FRAME) {
+      emitDigest();
+      first_sequence = next_sequence;
+      first_timestamp = next_timestamp;
+    }
+
+    last_sequence = next_sequence;
+    last_timestamp = next_timestamp;
+  }
+
+  // Feed the sample's timestamp, sequence number and data values into the
+  // digest as little-endian bytes.
+  void updateDigest(Sample const *smp) {
+    md_buffer.clear();
+    auto inserter = std::back_inserter(md_buffer);
+
+    if (smp->flags & (int)SampleFlags::HAS_TS_ORIGIN) {
+      auto const bytes_sec = little_endian_bytes(smp->ts.origin.tv_sec);
+      std::copy(std::cbegin(bytes_sec), std::cend(bytes_sec), inserter);
+      auto const bytes_nsec = little_endian_bytes(smp->ts.origin.tv_nsec);
+      std::copy(std::cbegin(bytes_nsec), std::cend(bytes_nsec), inserter);
+    }
+
+    if (smp->flags & (int)SampleFlags::HAS_SEQUENCE) {
+      auto const bytes = little_endian_bytes(smp->sequence);
+      std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
+    }
+
+    if (smp->flags & (int)SampleFlags::HAS_DATA) {
+      if (signals->size() != smp->length)
+        throw RuntimeError{"Sample length does not match signal list."};
+
+      for (unsigned int i = 0; i < smp->length; ++i) {
+        auto const signal = signals->getByIndex(i);
+        auto const &data = smp->data[i];
+
+        switch (signal->type) {
+        case SignalType::BOOLEAN: {
+          auto const bytes = little_endian_bytes(data.b ? 1 : 0);
+          std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
+          break;
+        }
+
+        case SignalType::INTEGER: {
+          auto const bytes = little_endian_bytes(data.i);
+          std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
+          break;
+        }
+
+        case SignalType::FLOAT: {
+          // Normalize NaN values so that digests stay deterministic.
+          auto const f = (data.f == data.f) ? data.f : std::nan("");
+          auto const bytes = little_endian_bytes(f);
+          std::copy(std::cbegin(bytes), std::cend(bytes), inserter);
+          break;
+        }
+
+        case SignalType::COMPLEX: {
+          auto real = data.z.real();
+          real = (real == real) ? real : std::nanf("");
+          auto const rbytes = little_endian_bytes(real);
+          std::copy(std::cbegin(rbytes), std::cend(rbytes), inserter);
+
+          auto imag = data.z.imag();
+          imag = (imag == imag) ? imag : std::nanf("");
+          auto const ibytes = little_endian_bytes(imag);
+          std::copy(std::cbegin(ibytes), std::cend(ibytes), inserter);
+          break;
+        }
+
+        case SignalType::INVALID:
+        default:
+          throw RuntimeError{"Cannot calculate digest for invalid sample."};
+        }
+      }
+    }
+
+    if (!EVP_DigestUpdate(md_ctx.get(), md_buffer.data(), md_buffer.size()))
+      throw RuntimeError{"Could not update digest"};
+  }
+
+public:
+  DigestHook(Path *p, Node *n, int fl, int prio, bool en = true)
+      : Hook(p, n, fl, prio, en), algorithm(), uri(),
+        md_ctx(EVP_MD_CTX_new(), &EVP_MD_CTX_free), md(nullptr),
+        file(nullptr, &FILE_free), first_sequence(std::nullopt),
+        first_timestamp(std::nullopt), last_sequence(std::nullopt),
+        last_timestamp(std::nullopt), md_buffer(), md_string_buffer() {}
+
+  virtual void parse(json_t *json) {
+    Hook::parse(json);
+
+    char const *uri_str = nullptr;
+    char const *mode_str = nullptr;
+    char const *algorithm_str = nullptr;
+
+    json_error_t err;
+    int ret =
+        json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: s }", "uri", &uri_str,
+                       "mode", &mode_str, "algorithm", &algorithm_str);
+    if (ret)
+      throw ConfigError(json, err, "node-config-hook-digest");
+
+    uri = std::string(uri_str);
+
+    if (algorithm_str)
+      algorithm = std::string(algorithm_str);
+  }
+
+  virtual void prepare() {
+    Hook::prepare();
+
+    file = file_ptr(fopen(uri.c_str(), "w"), &FILE_free);
+    if (!file)
+      throw RuntimeError{"Could not open file {}: {}", uri, strerror(errno)};
+
+    md = EVP_get_digestbyname(algorithm.c_str());
+    if (!md)
+      throw RuntimeError{"Could not fetch algorithm {}", algorithm};
+  }
+
+  virtual void start() {
+    Hook::start();
+
+    if (!EVP_DigestInit_ex(md_ctx.get(), md, NULL))
+      throw RuntimeError{"Could not initialize digest"};
+  }
+
+  virtual Hook::Reason process(struct Sample *smp) {
+    assert(smp);
+    assert(state == State::STARTED);
+
+    updateInterval(smp);
+    updateDigest(smp);
+
+    return Reason::OK;
+  }
+
+  virtual void stop() {
+    Hook::stop();
+
+    first_sequence.reset();
+    first_timestamp.reset();
+    last_sequence.reset();
+    last_timestamp.reset();
+
+    if (!EVP_MD_CTX_reset(md_ctx.get()))
+      throw RuntimeError{"Could not reset digest context"};
+  }
+};
+
+// Register hook
+static char n[] = "digest";
+static char d[] = "Calculate the digest for a range of samples";
+static HookPlugin<DigestHook, n, d,
+                  (int)Hook::Flags::NODE_READ | (int)Hook::Flags::NODE_WRITE |
+                      (int)Hook::Flags::PATH>
+    p;
+
+} // namespace digest
+} // namespace node
+} // namespace villas
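To make the hashed byte stream concrete, here is a standalone sketch (not part of the patch) that rebuilds the per-sample input which updateDigest() above passes to EVP_DigestUpdate(), for one sample carrying a single FLOAT signal. It assumes a little-endian host, host-native widths for time_t/long and a 64-bit sequence counter, and sha256 as the algorithm; the helper names raw_bytes/put, the file name and the sample values are made up for illustration. A real interval digest feeds every sample between two NEW_FRAME markers into one context before EVP_DigestFinal_ex(), so this prints the digest of a one-sample interval only. Build roughly with: g++ -std=c++17 digest_input_sketch.cpp -lcrypto

#include <array>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <vector>

#include <openssl/evp.h>

// Copy an object's bytes verbatim; on a little-endian host this matches the
// hook's little_endian_bytes() helper (which only swaps on big-endian hosts).
template <typename T>
static std::array<unsigned char, sizeof(T)> raw_bytes(T const &t) {
  std::array<unsigned char, sizeof(T)> bytes;
  std::memcpy(bytes.data(), &t, sizeof(T));
  return bytes;
}

int main() {
  // Illustrative sample: origin timestamp, sequence number, one FLOAT value.
  struct timespec ts {};
  ts.tv_sec = 1693526400;
  ts.tv_nsec = 0;
  std::uint64_t sequence = 0;
  double value = 1.0;

  std::vector<unsigned char> buf;
  auto put = [&buf](auto const &v) {
    auto const b = raw_bytes(v);
    buf.insert(buf.end(), b.begin(), b.end());
  };

  put(ts.tv_sec);  // HAS_TS_ORIGIN: seconds, then nanoseconds
  put(ts.tv_nsec);
  put(sequence);   // HAS_SEQUENCE
  put(value);      // a FLOAT signal contributes its raw double bytes

  // Hash the per-sample buffer with the same EVP calls the hook uses.
  EVP_MD_CTX *ctx = EVP_MD_CTX_new();
  EVP_MD const *md = EVP_get_digestbyname("sha256");
  unsigned char out[EVP_MAX_MD_SIZE];
  unsigned int len = 0;

  EVP_DigestInit_ex(ctx, md, nullptr);
  EVP_DigestUpdate(ctx, buf.data(), buf.size());
  EVP_DigestFinal_ex(ctx, out, &len);
  EVP_MD_CTX_free(ctx);

  for (unsigned int i = 0; i < len; i++)
    std::printf("%02X", out[i]);
  std::printf("\n");

  return 0;
}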
diff --git a/lib/hooks/frame.cpp b/lib/hooks/frame.cpp
index 99b962ab1..f13b05bbf 100644
--- a/lib/hooks/frame.cpp
+++ b/lib/hooks/frame.cpp
@@ -168,6 +168,8 @@ public:
       : Hook(p, n, fl, prio, en), trigger(Trigger::SEQUENCE), interval(1),
         offset(0), unit{std::nullopt}, last_smp{nullptr, &sample_decref} {}
 
+  virtual ~FrameHook() { (void)last_smp.release(); }
+
   virtual void parse(json_t *json) override {
     Hook::parse(json);