1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00
VILLASnode/lib/hooks/reorder_ts.cpp
Steffen Vogel 2529c7b2d7 Remove superfluous includes
Signed-off-by: Steffen Vogel <post@steffenvogel.de>
2024-04-10 18:56:28 +02:00

156 lines
3.7 KiB
C++

/* Reorder samples hook.
*
* Author: Philipp Jungkamp <philipp.jungkamp@opal-rt.com>
* SPDX-FileCopyrightText: 2023, OPAL-RT Germany GmbH
* SPDX-License-Identifier: Apache-2.0
*/
#include "villas/exceptions.hpp"
#include <algorithm>
#include <cstring>
#include <ctime>
#include <vector>
#include <villas/hook.hpp>
#include <villas/sample.hpp>
#include <villas/timing.hpp>
namespace villas {
namespace node {
class ReorderTsHook : public Hook {
protected:
std::vector<Sample *> window;
std::size_t window_size;
Sample *buffer;
void swapSample(Sample *lhs, Sample *rhs) {
if (buffer) {
sample_copy(buffer, lhs);
sample_copy(lhs, rhs);
sample_copy(rhs, buffer);
} else {
buffer = sample_clone(lhs);
if (!buffer)
throw RuntimeError{"Out of memory."};
sample_copy(lhs, rhs);
sample_copy(rhs, buffer);
}
}
public:
ReorderTsHook(Path *p, Node *n, int fl, int prio, bool en = true)
: Hook(p, n, fl, prio, en), window{}, window_size(16), buffer(nullptr) {}
virtual void parse(json_t *json) {
assert(state != State::STARTED);
json_error_t err;
int ret =
json_unpack_ex(json, &err, 0, "{ s?: i }", "window_size", &window_size);
if (ret)
throw ConfigError(json, err, "node-config-hook-reorder-ts");
state = State::PARSED;
}
virtual void start() {
assert(state == State::PREPARED || state == State::STOPPED);
window.reserve(window_size);
state = State::STARTED;
}
virtual void stop() {
assert(state == State::STARTED);
for (auto sample : window)
sample_free(sample);
if (buffer)
sample_free(buffer);
window.clear();
state = State::STOPPED;
}
virtual Hook::Reason process(Sample *smp) {
assert(state == State::STARTED);
assert(smp);
if (window.empty()) {
auto new_smp = sample_clone(smp);
if (!new_smp)
throw RuntimeError{"Out of memory."};
window.push_back(new_smp);
logger->debug("window.size={}/{}", window.size(), window_size);
return Hook::Reason::SKIP_SAMPLE;
}
for (std::size_t i = window.size() - 1;; i--) {
if (time_cmp(&smp->ts.origin, &window[i]->ts.origin) >= 0) {
if (i != window.size() - 1)
logger->warn("Fixing reordered Sample");
if (window.size() == window_size) {
Sample *window_sample = window.front();
std::copy(++std::begin(window), std::next(std::begin(window), i + 1),
std::begin(window));
window[i] = window_sample;
swapSample(window_sample, smp);
return Hook::Reason::OK;
} else {
window.push_back(nullptr);
std::copy_backward(std::next(std::begin(window), i + 1),
--std::end(window), std::end(window));
auto new_smp = sample_clone(smp);
if (!new_smp)
throw RuntimeError{"Out of memory."};
window[i + 1] = new_smp;
logger->debug("window.size={}/{}", window.size(), window_size);
return Hook::Reason::SKIP_SAMPLE;
}
}
if (!i)
break;
}
logger->error("Could not reorder Sample");
return Hook::Reason::SKIP_SAMPLE;
}
virtual void restart() {
assert(state == State::STARTED);
for (auto sample : window)
sample_free(sample);
window.clear();
}
};
// Register hook
//
// Name and description are stored in static arrays because they are passed
// to HookPlugin as template arguments. The flag bits are combined into a
// single int; the trailing 2 is presumably the default priority -- confirm
// against the HookPlugin template declaration.
static char n[] = "reorder_ts";
static char d[] = "Reorder messages by their timestamp";
static HookPlugin<ReorderTsHook, n, d,
                  static_cast<int>(Hook::Flags::NODE_WRITE) |
                      static_cast<int>(Hook::Flags::PATH) |
                      static_cast<int>(Hook::Flags::NODE_READ),
                  2>
    p;
} // namespace node
} // namespace villas