diff --git a/doc/openapi/components/schemas/config/hooks/_frame.yaml b/doc/openapi/components/schemas/config/hooks/_frame.yaml new file mode 100644 index 000000000..75bfcaa32 --- /dev/null +++ b/doc/openapi/components/schemas/config/hooks/_frame.yaml @@ -0,0 +1,7 @@ +# yaml-language-server: $schema=http://json-schema.org/draft-07/schema +# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +# SPDX-License-Identifier: Apache-2.0 +--- +allOf: +- $ref: ../hook_obj.yaml +- $ref: frame.yaml diff --git a/doc/openapi/components/schemas/config/hooks/frame.yaml b/doc/openapi/components/schemas/config/hooks/frame.yaml new file mode 100644 index 000000000..34f4500e0 --- /dev/null +++ b/doc/openapi/components/schemas/config/hooks/frame.yaml @@ -0,0 +1,36 @@ +# yaml-language-server: $schema=http://json-schema.org/draft-07/schema +# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +# SPDX-License-Identifier: Apache-2.0 +--- +allOf: +- type: object + properties: + trigger: + description: The trigger for new frames. + type: string + default: sequence + enum: + - sequence + - timestamp + + unit: + description: The type of a timestamp trigger. + type: string + enum: + - milliseconds + - seconds + - minutes + - hours + + interval: + description: The interval in which frames are annotated. + type: number + default: 1 + + offset: + description: An offset in the interval for the annotation of new frames. + type: number + default: 0 + +- $ref: ../hook.yaml + diff --git a/etc/examples/hooks/frame.conf b/etc/examples/hooks/frame.conf new file mode 100644 index 000000000..4cd73e016 --- /dev/null +++ b/etc/examples/hooks/frame.conf @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH +# SPDX-License-Identifier: Apache-2.0 + +@include "hook-nodes.conf" + +paths = ( + { + in = "signal_node" + out = "file_node" + + hooks = ( + { + type = "frame" + + trigger = "timestamp" + unit = "seconds" + interval = 10 + } + ) + } +) diff --git a/lib/hooks/CMakeLists.txt b/lib/hooks/CMakeLists.txt index 16c27a575..883472f90 100644 --- a/lib/hooks/CMakeLists.txt +++ b/lib/hooks/CMakeLists.txt @@ -13,6 +13,7 @@ set(HOOK_SRC dump.cpp ebm.cpp fix.cpp + frame.cpp gate.cpp jitter_calc.cpp limit_rate.cpp diff --git a/lib/hooks/frame.cpp b/lib/hooks/frame.cpp new file mode 100644 index 000000000..99b962ab1 --- /dev/null +++ b/lib/hooks/frame.cpp @@ -0,0 +1,250 @@ +/* Frame hook. + * + * Author: Philipp Jungkamp + * SPDX-FileCopyrightText: 2023 OPAL-RT Germany GmbH + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace villas { +namespace node { + +enum class Trigger { + TIMESTAMP, + SEQUENCE, +}; + +enum class Unit { + MILLISECONDS, + SECONDS, + MINUTES, + HOURS, +}; + +uint64_t unit_wrap(std::optional unit) { + if (unit) { + switch (*unit) { + case Unit::HOURS: + return 24; + case Unit::MINUTES: + return 60; + case Unit::SECONDS: + return 60; + case Unit::MILLISECONDS: + return 1000; + } + } + + return std::numeric_limits::max(); +} + +class FrameHook : public Hook { + using sample_ptr = std::unique_ptr; + +private: + Trigger trigger; + uint64_t interval; + uint64_t offset; + std::optional unit; + sample_ptr last_smp; + + bool updateInterval(Sample const *next_smp) { + bool changed = false; + + if (!last_smp.get() || + (next_smp->flags & (int)SampleFlags::NEW_SIMULATION) == + (int)SampleFlags::NEW_SIMULATION) { + changed = true; + } else if (trigger == Trigger::SEQUENCE) { + auto last_interval = (last_smp->sequence + interval - offset) / interval; + auto next_interval = (next_smp->sequence + interval - offset) / interval; + changed = last_interval != next_interval; + } else + switch (unit.value()) { + case Unit::HOURS: { + auto last_hour = last_smp->ts.origin.tv_sec / 3'600; + auto next_hour = next_smp->ts.origin.tv_sec / 3'600; + + auto last_day = last_hour / 24; + auto next_day = next_hour / 24; + if (last_day != next_day) { + changed = true; + break; + } + + auto last_hour_of_day = last_hour - 24 * last_day; + auto next_hour_of_day = next_hour - 24 * next_day; + auto last_interval_of_day = + (last_hour_of_day + interval - offset) / interval; + auto next_interval_of_day = + (next_hour_of_day + interval - offset) / interval; + changed = last_interval_of_day != next_interval_of_day; + break; + } + + case Unit::MINUTES: { + auto last_minute = last_smp->ts.origin.tv_sec / 60; + auto next_minute = next_smp->ts.origin.tv_sec / 60; + + auto last_hour = last_minute / 60; + auto next_hour = next_minute / 60; + if (last_hour != next_hour) { + changed = true; + break; + } + + auto last_minute_of_hour = last_minute - 60 * last_hour; + auto next_minute_of_hour = next_minute - 60 * next_hour; + auto last_interval_of_hour = + (last_minute_of_hour + interval - offset) / interval; + auto next_interval_of_hour = + (next_minute_of_hour + interval - offset) / interval; + changed = last_interval_of_hour != next_interval_of_hour; + break; + } + + case Unit::SECONDS: { + auto last_second = last_smp->ts.origin.tv_sec; + auto next_second = next_smp->ts.origin.tv_sec; + + auto last_minute = last_second / 60; + auto next_minute = next_second / 60; + if (last_minute != next_minute) { + changed = true; + break; + } + + auto last_second_of_minute = last_second - 60 * last_minute; + auto next_second_of_minute = next_second - 60 * next_minute; + auto last_interval_of_minute = + (last_second_of_minute + interval - offset) / interval; + auto next_interval_of_minute = + (next_second_of_minute + interval - offset) / interval; + changed = last_interval_of_minute != next_interval_of_minute; + break; + } + + case Unit::MILLISECONDS: { + auto last_second = last_smp->ts.origin.tv_sec; + auto next_second = next_smp->ts.origin.tv_sec; + if (last_second != next_second) { + changed = true; + break; + } + + auto last_millisecond_of_second = + last_smp->ts.origin.tv_nsec / 1'000'000; + auto next_millisecond_of_second = + next_smp->ts.origin.tv_nsec / 1'000'000; + auto last_interval_of_second = + (last_millisecond_of_second + interval - offset) / interval; + auto next_interval_of_second = + (next_millisecond_of_second + interval - offset) / interval; + changed = last_interval_of_second != next_interval_of_second; + break; + } + } + + if (changed) + logger->debug("new frame"); + + return changed; + } + +public: + FrameHook(Path *p, Node *n, int fl, int prio, bool en = true) + : Hook(p, n, fl, prio, en), trigger(Trigger::SEQUENCE), interval(1), + offset(0), unit{std::nullopt}, last_smp{nullptr, &sample_decref} {} + + virtual void parse(json_t *json) override { + Hook::parse(json); + + char *trigger_str = nullptr; + char *unit_str = nullptr; + int interval_int = -1; + int offset_int = -1; + + json_error_t err; + auto ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: i, s?: i }", + "trigger", &trigger_str, "unit", &unit_str, + "interval", &interval_int, "offset", &offset_int); + if (ret) + throw ConfigError(json, err, "node-config-hook-frame"); + + if (trigger_str) { + if (!strcmp(trigger_str, "sequence")) + trigger = Trigger::SEQUENCE; + else if (!strcmp(trigger_str, "timestamp")) + trigger = Trigger::TIMESTAMP; + else + throw ConfigError(json, "node-config-hook-frame-unit"); + } + + if (trigger == Trigger::TIMESTAMP) { + if (!strcmp(unit_str, "milliseconds")) + unit = Unit::MILLISECONDS; + else if (!strcmp(unit_str, "seconds")) + unit = Unit::SECONDS; + else if (!strcmp(unit_str, "minutes")) + unit = Unit::MINUTES; + else if (!strcmp(unit_str, "hours")) + unit = Unit::HOURS; + else + throw ConfigError(json, "node-config-hook-frame-unit"); + } + + if (interval_int != -1) { + if (interval_int <= 0 || (uint64_t)interval_int > unit_wrap(unit)) + throw ConfigError(json, "node-config-hook-frame-interval"); + + interval = interval_int; + } + + if (offset_int != -1) { + if (offset_int < 0 || (uint64_t)offset_int >= unit_wrap(unit)) + throw ConfigError(json, "node-config-hook-frame-offset"); + + offset = offset_int; + } + } + + virtual Hook::Reason process(Sample *smp) override { + Hook::process(smp); + + if (updateInterval(smp)) + smp->flags |= (int)SampleFlags::NEW_FRAME; + else + smp->flags &= ~(int)SampleFlags::NEW_FRAME; + + sample_incref(smp); + last_smp = sample_ptr{smp, &sample_decref}; + + return Reason::OK; + } + + virtual void stop() override { + Hook::stop(); + + last_smp = nullptr; + } +}; + +// Register hook +static char n[] = "frame"; +static char d[] = "Add frame annotations too the stream of samples"; +static HookPlugin p; + +} // namespace node +} // namespace villas