diff --git a/etc/examples/hooks/reorder_ts.conf b/etc/examples/hooks/reorder_ts.conf index fa16d1361..7d639f744 100644 --- a/etc/examples/hooks/reorder_ts.conf +++ b/etc/examples/hooks/reorder_ts.conf @@ -9,7 +9,7 @@ paths = ( { type = "reorder_ts" - window = 10 + window_size = 10 } ) } diff --git a/lib/hooks/reorder_ts.cpp b/lib/hooks/reorder_ts.cpp index ca593038f..ebd4cd8b4 100644 --- a/lib/hooks/reorder_ts.cpp +++ b/lib/hooks/reorder_ts.cpp @@ -9,9 +9,11 @@ #include #include #include +#include #include #include +#include namespace villas { namespace node { @@ -23,19 +25,6 @@ protected: std::size_t window_size; Sample *buffer; - bool compareTimestamp(Sample *lhs, Sample *rhs) { - auto lhs_ts = lhs->ts.origin; - auto rhs_ts = rhs->ts.origin; - - if (lhs_ts.tv_sec < rhs_ts.tv_sec) - return true; - - if (lhs_ts.tv_sec == rhs_ts.tv_sec && lhs_ts.tv_nsec < rhs_ts.tv_nsec) - return true; - - return false; - } - void swapSample(Sample *lhs, Sample *rhs) { if (buffer) { sample_copy(buffer, lhs); @@ -56,13 +45,14 @@ public: buffer(nullptr) { } - virtual void parse(json_t *json) + virtual + void parse(json_t *json) { assert(state != State::STARTED); json_error_t err; int ret = json_unpack_ex(json, &err, 0, "{ s?: i }", - "window", &window_size + "window_size", &window_size ); if (ret) throw ConfigError(json, err, "node-config-hook-reorder-ts"); @@ -70,7 +60,8 @@ public: state = State::PARSED; } - virtual void start() + virtual + void start() { assert(state == State::PREPARED || state == State::STOPPED); @@ -79,7 +70,8 @@ public: state = State::STARTED; } - virtual void stop() + virtual + void stop() { assert(state == State::STARTED); @@ -94,61 +86,60 @@ public: state = State::STOPPED; } - virtual Hook::Reason process(Sample *sample) + virtual + Hook::Reason process(Sample *smp) { assert(state == State::STARTED); - assert(sample); + assert(smp); if (window.empty()) { - window.push_back(sample_clone(sample)); + window.push_back(sample_clone(smp)); - logger->info("window.size={}/{}", window.size(), window_size); + logger->debug("window.size={}/{}", window.size(), window_size); return Hook::Reason::SKIP_SAMPLE; } for (std::size_t i = window.size() - 1;; i--) { - if (!compareTimestamp(sample, window[i])) { - if (i != window.size() - 1) { + if (time_cmp(&smp->ts.origin, &window[i]->ts.origin) >= 0) { + if (i != window.size() - 1) logger->warn("Fixing reordered Sample"); - sample_dump(logger, sample); - } if (window.size() == window_size) { - // The front sample will be returned. Sample *window_sample = window.front(); - // Move all elements before the index of insertion towards the front. - std::memmove(&window[0], &window[1], i * sizeof(decltype(window)::value_type)); - // Store the new sample at the correct index. + std::copy( + ++std::begin(window), + std::next(std::begin(window), i + 1), + std::begin(window)); window[i] = window_sample; - // Swap the contents of the front sample with the processed sample. - swapSample(window_sample, sample); + swapSample(window_sample, smp); return Hook::Reason::OK; } else { - // Increase the vector size by 1. window.push_back(nullptr); - // Move all elements from the index of insertion onwards towards the back. - std::memmove(&window[i + 2], &window[i + 1], (window.size() - i - 2) * sizeof(decltype(window)::value_type)); - // Store the new sample at the correct index. - window[i + 1] = sample_clone(sample); + std::copy_backward( + std::next(std::begin(window), i + 1), + --std::end(window), + std::end(window)); + window[i + 1] = sample_clone(smp); - logger->info("window.size={}/{}", window.size(), window_size); + logger->debug("window.size={}/{}", window.size(), window_size); return Hook::Reason::SKIP_SAMPLE; } } - if (!i) break; + if (!i) + break; } logger->error("Could not reorder Sample"); - sample_dump(logger, sample); return Hook::Reason::SKIP_SAMPLE; } - virtual void restart() + virtual + void restart() { assert(state == State::STARTED);