mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
node-iec61850-goose: Periodic resend and cleanup
- introduce resend_thread for periodic resend - change mms_type to reflect the table in iec61850.hpp to be able to merge these tables in the future without breaking configs Signed-off-by: Philipp Jungkamp <philipp.jungkamp@rwth-aachen.de>
This commit is contained in:
parent
98ac02a85b
commit
5a1995d901
3 changed files with 116 additions and 74 deletions
|
@ -30,10 +30,10 @@ nodes = {
|
|||
},
|
||||
{
|
||||
# Mandatory MMS type
|
||||
mms_type = "bit-string"
|
||||
mms_type = "bitstring"
|
||||
|
||||
# Type meta data
|
||||
mms_bit_string_size = 13
|
||||
mms_bitstring_size = 13
|
||||
|
||||
# Constant value
|
||||
value = 2048
|
||||
|
@ -54,28 +54,28 @@ nodes = {
|
|||
signal = "ABB_cascade_state"
|
||||
},
|
||||
{
|
||||
mms_type = "bit-string"
|
||||
mms_bit_string_size = 13
|
||||
mms_type = "bitstring"
|
||||
mms_bitstring_size = 13
|
||||
value = 2048
|
||||
},
|
||||
{
|
||||
mms_type = "bit-string"
|
||||
mms_bit_string_size = 2
|
||||
mms_type = "bitstring"
|
||||
mms_bitstring_size = 2
|
||||
value = 0
|
||||
},
|
||||
{
|
||||
mms_type = "bit-string"
|
||||
mms_bit_string_size = 13
|
||||
mms_type = "bitstring"
|
||||
mms_bitstring_size = 13
|
||||
value = 2048
|
||||
},
|
||||
{
|
||||
mms_type = "bit-string"
|
||||
mms_bit_string_size = 13
|
||||
mms_type = "bitstring"
|
||||
mms_bitstring_size = 13
|
||||
value = 2048
|
||||
},
|
||||
{
|
||||
mms_type = "bit-string"
|
||||
mms_bit_string_size = 2
|
||||
mms_type = "bitstring"
|
||||
mms_bitstring_size = 2
|
||||
value = 0
|
||||
}
|
||||
)
|
||||
|
@ -134,7 +134,7 @@ nodes = {
|
|||
{
|
||||
name = "ABB_relay_state_meta_bitset"
|
||||
type = "integer"
|
||||
mms_type = "bit-string"
|
||||
mms_type = "bitstring"
|
||||
subscriber = "relay"
|
||||
index = 1
|
||||
}
|
||||
|
|
|
@ -7,12 +7,15 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <optional>
|
||||
#include <array>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <ctime>
|
||||
#include <array>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <villas/node/config.hpp>
|
||||
#include <villas/node.hpp>
|
||||
#include <villas/pool.hpp>
|
||||
|
@ -34,8 +37,8 @@ public:
|
|||
};
|
||||
|
||||
struct Descriptor {
|
||||
SignalType signal_type;
|
||||
std::string name;
|
||||
SignalType signal_type;
|
||||
MmsType mms_type;
|
||||
Meta default_meta;
|
||||
};
|
||||
|
@ -73,16 +76,17 @@ public:
|
|||
Meta meta;
|
||||
private:
|
||||
inline static std::array const descriptors {
|
||||
// Boolean signals
|
||||
Descriptor { SignalType::BOOLEAN, "boolean", MmsType::MMS_BOOLEAN },
|
||||
|
||||
// Integer signals
|
||||
Descriptor { SignalType::INTEGER, "integer", MmsType::MMS_INTEGER, {.size = 64 } },
|
||||
Descriptor { SignalType::INTEGER, "unsigned", MmsType::MMS_UNSIGNED, {.size = 32 } },
|
||||
Descriptor { SignalType::INTEGER, "bit-string", MmsType::MMS_BIT_STRING, {.size = 32 } },
|
||||
|
||||
// Float signals
|
||||
Descriptor { SignalType::FLOAT, "float", MmsType::MMS_FLOAT, {.size = 64 } },
|
||||
Descriptor { "boolean", SignalType::BOOLEAN, MmsType::MMS_BOOLEAN },
|
||||
Descriptor { "int8", SignalType::INTEGER, MmsType::MMS_INTEGER, {.size = 8 } },
|
||||
Descriptor { "int16", SignalType::INTEGER, MmsType::MMS_INTEGER, {.size = 16 } },
|
||||
Descriptor { "int32", SignalType::INTEGER, MmsType::MMS_INTEGER, {.size = 32 } },
|
||||
Descriptor { "int64", SignalType::INTEGER, MmsType::MMS_INTEGER, {.size = 64 } },
|
||||
Descriptor { "int8u", SignalType::INTEGER, MmsType::MMS_UNSIGNED, {.size = 8 } },
|
||||
Descriptor { "int16u", SignalType::INTEGER, MmsType::MMS_UNSIGNED, {.size = 16 } },
|
||||
Descriptor { "int32u", SignalType::INTEGER, MmsType::MMS_UNSIGNED, {.size = 32 } },
|
||||
Descriptor { "bitstring", SignalType::INTEGER, MmsType::MMS_BIT_STRING, {.size = 32 } },
|
||||
Descriptor { "float32", SignalType::FLOAT, MmsType::MMS_FLOAT, {.size = 32 } },
|
||||
Descriptor { "float64", SignalType::FLOAT, MmsType::MMS_FLOAT, {.size = 64 } },
|
||||
};
|
||||
|
||||
static MmsValue * newMmsInteger(int64_t i, int size);
|
||||
|
@ -169,6 +173,13 @@ protected:
|
|||
enum { NONE, STOPPED, READY } state;
|
||||
std::vector<OutputContext> contexts;
|
||||
std::string interface_id;
|
||||
int resend_interval;
|
||||
|
||||
std::mutex send_mutex;
|
||||
bool changed;
|
||||
bool resend_thread_stop;
|
||||
std::optional<std::thread> resend_thread;
|
||||
std::condition_variable resend_thread_cv;
|
||||
} output;
|
||||
|
||||
void createReceiver() noexcept;
|
||||
|
@ -188,6 +199,9 @@ protected:
|
|||
void addSubscriber(InputEventContext &ctx) noexcept;
|
||||
void pushSample(uint64_t timestamp) noexcept;
|
||||
|
||||
static void publish_values(GoosePublisher publisher, std::vector<GooseSignal> &values, bool changed, int burst = 1) noexcept;
|
||||
static void resend_thread(GooseNode::Output *output) noexcept;
|
||||
|
||||
int _parse(json_t *json, json_error_t *err);
|
||||
|
||||
int parseInput(json_t *json, json_error_t *err);
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*********************************************************************************/
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <villas/node_compat.hpp>
|
||||
#include <villas/nodes/iec61850_goose.hpp>
|
||||
#include <villas/utils.hpp>
|
||||
|
@ -112,11 +113,11 @@ MmsValue * GooseSignal::toMmsValue() const
|
|||
|
||||
MmsValue * GooseSignal::newMmsBitString(uint32_t i, int size)
|
||||
{
|
||||
auto mms_bit_string = MmsValue_newBitString(size);
|
||||
auto mms_bitstring = MmsValue_newBitString(size);
|
||||
|
||||
MmsValue_setBitStringFromInteger(mms_bit_string, i);
|
||||
MmsValue_setBitStringFromInteger(mms_bitstring, i);
|
||||
|
||||
return mms_bit_string;
|
||||
return mms_bitstring;
|
||||
}
|
||||
|
||||
MmsValue * GooseSignal::newMmsInteger(int64_t i, int size)
|
||||
|
@ -300,7 +301,7 @@ void GooseNode::pushSample(uint64_t timestamp) noexcept
|
|||
|
||||
if (mapping.type->mms_type != values[mapping.index]->mmsType()) {
|
||||
auto signal_str = sample->signals->getByIndex(signal)->toString();
|
||||
logger->error("unexpected mms_type for signal {}", signal_str);
|
||||
logger->error("received unexpected mms_type for signal {}", signal_str);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -441,6 +442,11 @@ void GooseNode::startPublishers() noexcept(false)
|
|||
else
|
||||
stopPublishers();
|
||||
|
||||
if (output.resend_interval != 0) {
|
||||
output.resend_thread_stop = false;
|
||||
output.resend_thread = std::thread(resend_thread, &output);
|
||||
}
|
||||
|
||||
output.state = Output::READY;
|
||||
}
|
||||
|
||||
|
@ -449,6 +455,16 @@ void GooseNode::stopPublishers() noexcept
|
|||
if (output.state == Output::NONE)
|
||||
return;
|
||||
|
||||
if (output.resend_thread) {
|
||||
auto lock = std::unique_lock { output.send_mutex };
|
||||
output.resend_thread_stop = true;
|
||||
lock.unlock();
|
||||
|
||||
output.resend_thread_cv.notify_all();
|
||||
output.resend_thread->join();
|
||||
output.resend_thread = std::nullopt;
|
||||
}
|
||||
|
||||
output.state = Output::STOPPED;
|
||||
}
|
||||
|
||||
|
@ -467,7 +483,8 @@ int GooseNode::_read(Sample *samples[], unsigned sample_count)
|
|||
return available_samples;
|
||||
}
|
||||
|
||||
static void publish_values(GoosePublisher publisher, std::vector<GooseSignal> &values, bool changed, int burst) {
|
||||
void GooseNode::publish_values(GoosePublisher publisher, std::vector<GooseSignal> &values, bool changed, int burst) noexcept
|
||||
{
|
||||
auto data_set = LinkedList_create();
|
||||
|
||||
for (auto &value : values) {
|
||||
|
@ -484,10 +501,36 @@ static void publish_values(GoosePublisher publisher, std::vector<GooseSignal> &v
|
|||
LinkedList_destroyDeep(data_set, (LinkedListValueDeleteFunction) MmsValue_delete);
|
||||
}
|
||||
|
||||
void GooseNode::resend_thread(GooseNode::Output *output) noexcept
|
||||
{
|
||||
auto interval = std::chrono::milliseconds(output->resend_interval);
|
||||
auto lock = std::unique_lock { output->send_mutex };
|
||||
std::chrono::time_point<std::chrono::steady_clock> next_sample_time;
|
||||
|
||||
// wait for the first GooseNode::_write call
|
||||
output->resend_thread_cv.wait(lock);
|
||||
|
||||
while (!output->resend_thread_stop) {
|
||||
if (output->changed) {
|
||||
output->changed = false;
|
||||
next_sample_time = std::chrono::steady_clock::now() + interval;
|
||||
}
|
||||
|
||||
auto status = output->resend_thread_cv.wait_until(lock, next_sample_time);
|
||||
|
||||
if (status == std::cv_status::no_timeout || output->changed)
|
||||
continue;
|
||||
|
||||
for (auto &ctx : output->contexts)
|
||||
publish_values(ctx.publisher, ctx.values, false);
|
||||
|
||||
next_sample_time = next_sample_time + interval;
|
||||
}
|
||||
}
|
||||
|
||||
int GooseNode::_write(Sample *samples[], unsigned sample_count)
|
||||
{
|
||||
if (output.state != Output::READY)
|
||||
return 0;
|
||||
auto lock = std::unique_lock { output.send_mutex };
|
||||
|
||||
for (unsigned int i = 0; i < sample_count; i++) {
|
||||
auto sample = samples[i];
|
||||
|
@ -511,9 +554,16 @@ int GooseNode::_write(Sample *samples[], unsigned sample_count)
|
|||
}
|
||||
}
|
||||
|
||||
publish_values(ctx.publisher, ctx.values, changed, ctx.config.burst);
|
||||
if (changed) {
|
||||
output.changed = true;
|
||||
publish_values(ctx.publisher, ctx.values, changed, ctx.config.burst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (output.changed) {
|
||||
lock.unlock();
|
||||
output.resend_thread_cv.notify_all();
|
||||
}
|
||||
|
||||
return sample_count;
|
||||
|
@ -531,6 +581,9 @@ GooseNode::GooseNode(const std::string &name) :
|
|||
|
||||
output.state = Output::NONE;
|
||||
output.interface_id = "lo";
|
||||
output.changed = false;
|
||||
output.resend_interval = 1000;
|
||||
output.resend_thread = std::nullopt;
|
||||
}
|
||||
|
||||
GooseNode::~GooseNode()
|
||||
|
@ -588,7 +641,7 @@ int GooseNode::parseInput(json_t *json, json_error_t *err)
|
|||
|
||||
json_t *subscribers_json = nullptr;
|
||||
json_t *signals_json = nullptr;
|
||||
char const *interface_id = nullptr;
|
||||
char const *interface_id = input.interface_id.c_str();
|
||||
int with_timestamp = true;
|
||||
ret = json_unpack_ex(json, err, 0, "{ s: o, s: o, s?: s, s: b }",
|
||||
"subscribers", &subscribers_json,
|
||||
|
@ -604,9 +657,7 @@ int GooseNode::parseInput(json_t *json, json_error_t *err)
|
|||
ret = parseInputSignals(signals_json, err, input.mappings);
|
||||
if (ret) return ret;
|
||||
|
||||
input.interface_id = interface_id
|
||||
? std::string { interface_id }
|
||||
: std::string { "eth0" };
|
||||
input.interface_id = interface_id;
|
||||
|
||||
input.with_timestamp = with_timestamp;
|
||||
|
||||
|
@ -706,20 +757,21 @@ int GooseNode::parseOutput(json_t *json, json_error_t *err)
|
|||
|
||||
json_t *publishers_json = nullptr;
|
||||
json_t *signals_json = nullptr;
|
||||
char const *interface_id = nullptr;
|
||||
ret = json_unpack_ex(json, err, 0, "{ s: o, s: o, s?: s }",
|
||||
char const *interface_id = output.interface_id.c_str();
|
||||
int resend_interval = output.resend_interval;
|
||||
ret = json_unpack_ex(json, err, 0, "{ s:o, s:o, s?:s, s?:i }",
|
||||
"publishers", &publishers_json,
|
||||
"signals", &signals_json,
|
||||
"interface", &interface_id
|
||||
"interface", &interface_id,
|
||||
"resend_interval", &resend_interval
|
||||
);
|
||||
if (ret) return ret;
|
||||
|
||||
ret = parsePublishers(publishers_json, err, output.contexts);
|
||||
if (ret) return ret;
|
||||
|
||||
output.interface_id = interface_id
|
||||
? std::string { interface_id }
|
||||
: std::string { "eth0" };
|
||||
output.interface_id = interface_id;
|
||||
output.resend_interval = resend_interval;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -735,44 +787,20 @@ int GooseNode::parsePublisherData(json_t *json, json_error_t *err, std::vector<O
|
|||
char const *mms_type = nullptr;
|
||||
char const *signal_str = nullptr;
|
||||
json_t *value_json = nullptr;
|
||||
int integer_size = -1;
|
||||
int unsigned_size = -1;
|
||||
int bit_string_size = -1;
|
||||
int float_size = -1;
|
||||
ret = json_unpack_ex(signal_or_value_json, err, 0, "{ s:s, s?:s, s?:o, s?:i, s?:i, s?:i, s?:i }",
|
||||
int bitstring_size = -1;
|
||||
ret = json_unpack_ex(signal_or_value_json, err, 0, "{ s:s, s?:s, s?:o, s?:i }",
|
||||
"mms_type", &mms_type,
|
||||
"signal", &signal_str,
|
||||
"value", &value_json,
|
||||
"mms_integer_size", &integer_size,
|
||||
"mms_unsigned_size", &unsigned_size,
|
||||
"mms_bit_string_size", &bit_string_size,
|
||||
"mms_float_size", &float_size
|
||||
"mms_bitstring_size", &bitstring_size
|
||||
);
|
||||
if (ret) return ret;
|
||||
|
||||
auto goose_type = GooseSignal::lookupMmsTypeName(mms_type).value();
|
||||
std::optional<GooseSignal::Meta> meta = std::nullopt;
|
||||
|
||||
switch (goose_type->mms_type) {
|
||||
case MmsType::MMS_INTEGER:
|
||||
if (integer_size != -1)
|
||||
meta = {.size = integer_size };
|
||||
break;
|
||||
case MmsType::MMS_UNSIGNED:
|
||||
if (unsigned_size != -1)
|
||||
meta = {.size = unsigned_size };
|
||||
break;
|
||||
case MmsType::MMS_BIT_STRING:
|
||||
if (bit_string_size != -1)
|
||||
meta = {.size = bit_string_size };
|
||||
break;
|
||||
case MmsType::MMS_FLOAT:
|
||||
if (float_size != -1)
|
||||
meta = {.size = float_size };
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (goose_type->mms_type == MmsType::MMS_BIT_STRING && bitstring_size != -1)
|
||||
meta->size = bitstring_size;
|
||||
|
||||
auto signal_data = SignalData {};
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue