1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Allow sending multiple ASDUs for a sample

ASDUs have a limited capacity, this creates multiple ASDUs if a single
one is not able to hold all sample values.
This commit is contained in:
Philipp Jungkamp 2022-06-29 23:46:14 +00:00
parent d97beaf2d1
commit 7e10188c2d
2 changed files with 105 additions and 85 deletions

View file

@ -95,8 +95,8 @@ public:
SignalType signalType() const;
// check if ASDU contains this data
std::optional<ASDUData::Sample> checkASDU(CS101_ASDU const &asdu) const;
// add SignalData to an ASDU
void addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const;
// add SignalData to an ASDU, returns false when sample couldn't be added (insufficient space in ASDU)
bool addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const;
// every value in an ASDU has an associated "information object address" (ioa)
int ioa;
@ -133,9 +133,10 @@ class SlaveNode : public Node {
protected:
bool debug = true;
struct Server {
// slave state
bool created = false;
enum { NONE, STOPPED, READY } state = NONE;
// config (use explicit defaults)
std::string local_address = "0.0.0.0";
@ -180,7 +181,7 @@ protected:
bool onInterrogation(IMasterConnection connection, CS101_ASDU asdu, uint8_t _of_inter) const noexcept;
bool onASDU(IMasterConnection connection, CS101_ASDU asdu) const noexcept;
unsigned fillASDU(CS101_ASDU &asdu, Sample const *sample, ASDUData::Type type) const noexcept(false);
void sendPeriodicASDUsForSample(Sample const *sample) const noexcept(false);
virtual
int _write(struct Sample *smps[], unsigned cnt) override;

View file

@ -33,6 +33,7 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
using namespace villas::node::iec60870;
using namespace std::literals::chrono_literals;
CP56Time2a timespec_to_cp56time2a(timespec time) {
time_t time_ms =
@ -247,7 +248,7 @@ std::optional<ASDUData::Sample> ASDUData::checkASDU(CS101_ASDU const &asdu) cons
return std::nullopt;
}
void ASDUData::addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const
bool ASDUData::addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const
{
std::optional<CP56Time2a> timestamp = sample.timestamp.has_value()
? std::optional { timespec_to_cp56time2a(sample.timestamp.value()) }
@ -307,8 +308,9 @@ void ASDUData::addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const
} break;
default: assert(!"unreachable");
}
assert(CS101_ASDU_addInformationObject(asdu, io));
bool successfully_added = CS101_ASDU_addInformationObject(asdu, io);
InformationObject_destroy(io);
return successfully_added;
}
ASDUData::ASDUData(ASDUData::Descriptor const &descriptor, int ioa) : ioa(ioa), descriptor(descriptor)
@ -369,33 +371,35 @@ void SlaveNode::createSlave() noexcept
self->debugPrintMessage(connection,message,message_size,sent);
}, this);
server.created = true;
server.state = SlaveNode::Server::READY;
}
void SlaveNode::destroySlave() noexcept
{
auto &server = this->server;
if (!server.created) {
if (server.state == SlaveNode::Server::NONE) {
return;
}
this->stopSlave();
CS104_Slave_destroy(server.slave);
server.created = false;
server.state = SlaveNode::Server::NONE;
}
void SlaveNode::startSlave() noexcept(false)
{
auto &server = this->server;
if (!server.created) {
if (server.state == SlaveNode::Server::NONE) {
this->createSlave();
} else {
this->stopSlave();
}
server.state = SlaveNode::Server::READY;
CS104_Slave_start(server.slave);
if (!CS104_Slave_isRunning(server.slave)) {
@ -407,14 +411,18 @@ void SlaveNode::stopSlave() noexcept
{
auto &server = this->server;
if (!server.created || !CS104_Slave_isRunning(server.slave)) {
if (server.state != SlaveNode::Server::READY || !CS104_Slave_isRunning(server.slave)) {
return;
}
// wait for all messages to be send
server.state = SlaveNode::Server::STOPPED;
if (CS104_Slave_getNumberOfQueueEntries(server.slave, NULL) != 0)
this->logger->info("waiting for last messages in queue");
// wait for all messages to be send before really stopping
while ( (CS104_Slave_getNumberOfQueueEntries(server.slave, NULL) != 0) &&
(CS104_Slave_getOpenConnections(server.slave) != 0)) {
sleep(1);
std::this_thread::sleep_for(100ms);
}
CS104_Slave_stop(server.slave);
@ -465,34 +473,36 @@ bool SlaveNode::onInterrogation(IMasterConnection connection, CS101_ASDU asdu, Q
auto guard = std::lock_guard { this->output.last_values_mutex };
for(auto asdu_type : asdu_types) {
auto signal_asdu = CS101_ASDU_create(
IMasterConnection_getApplicationLayerParameters(connection),
false,
CS101_COT_INTERROGATED_BY_STATION,
0,
this->server.common_address,
false,
false
);
for (unsigned i = 0; i < mapping.size();) {
auto signal_asdu = CS101_ASDU_create(
IMasterConnection_getApplicationLayerParameters(connection),
false,
CS101_COT_INTERROGATED_BY_STATION,
0,
this->server.common_address,
false,
false
);
for (unsigned i = 0; i < mapping.size(); i++) {
auto asdu_data = mapping[i];
auto last_value = last_values[i];
do {
auto asdu_data = mapping[i].withoutTimestamp();
auto last_value = last_values[i];
if (asdu_data.type() == asdu_type)
asdu_data
.withoutTimestamp()
.addSampleToASDU(signal_asdu, ASDUData::Sample {
last_value,
IEC60870_QUALITY_GOOD,
std::nullopt
});
if (asdu_data.type() != asdu_type)
continue;
if(asdu_data.addSampleToASDU(signal_asdu, ASDUData::Sample {
last_value,
IEC60870_QUALITY_GOOD,
std::nullopt
}) == false)
break;
} while (++i < mapping.size());
IMasterConnection_sendASDU(connection, signal_asdu);
CS101_ASDU_destroy(signal_asdu);
}
assert(CS101_ASDU_getNumberOfElements(signal_asdu) > 0);
IMasterConnection_sendASDU(connection, signal_asdu);
CS101_ASDU_destroy(signal_asdu);
}
IMasterConnection_sendACT_TERM(connection, asdu);
@ -512,53 +522,15 @@ bool SlaveNode::onASDU(IMasterConnection connection, CS101_ASDU asdu) const noex
return true;
}
unsigned SlaveNode::fillASDU(CS101_ASDU &asdu, Sample const *sample, ASDUData::Type type) const noexcept(false) {
int asdu_elements = 0;
auto &mapping = this->output.mapping;
for (unsigned signal = 0; signal < MIN(sample->length, mapping.size()); signal++) {
if (mapping[signal].type() != type) continue;
auto timestamp = (sample->flags & (int) SampleFlags::HAS_TS_ORIGIN)
? std::optional{ sample->ts.origin }
: std::nullopt;
if (mapping[signal].hasTimestamp() && !timestamp.has_value())
throw RuntimeError("Received sample without timestamp for ASDU type with mandatory timestamp");
if (mapping[signal].signalType() != sample_format(sample,signal))
throw RuntimeError("Expected signal type {}, but received {}",
signalTypeToString(mapping[signal].signalType()),
signalTypeToString(sample_format(sample,signal))
);
mapping[signal].addSampleToASDU(asdu, ASDUData::Sample {
sample->data[signal],
IEC60870_QUALITY_GOOD,
timestamp
});
asdu_elements++;
}
assert(CS101_ASDU_getNumberOfElements(asdu) == asdu_elements);
return asdu_elements;
};
int SlaveNode::_write(Sample *samples[], unsigned sample_count)
void SlaveNode::sendPeriodicASDUsForSample(Sample const *sample) const noexcept(false)
{
for (unsigned sample_index = 0; sample_index < sample_count; sample_index++) {
Sample const *sample = samples[sample_index];
auto &mapping = this->output.mapping;
// update last_values
this->output.last_values_mutex.lock();
for (unsigned i = 0; i < MIN(sample->length, this->output.last_values.size()); i++) {
this->output.last_values[i] = sample->data[i];
}
this->output.last_values_mutex.unlock();
// create one asdu per asdu_type
for (auto& asdu_type : this->output.asdu_types) {
// ASDUs may only carry one type of asdu
for (auto& type : this->output.asdu_types) {
// search all occurences of this ASDU type
for (unsigned signal = 0; signal < MIN(sample->length, mapping.size());) {
// create an ASDU for periodic transimission
CS101_ASDU asdu = CS101_ASDU_create(
this->server.asdu_app_layer_parameters,
0,
@ -569,13 +541,60 @@ int SlaveNode::_write(Sample *samples[], unsigned sample_count)
false
);
// if data was added to asdu, enqueue it
if (this->fillASDU(asdu, sample, asdu_type) != 0)
do {
auto &asdu_data = mapping[signal];
// this signal_data does not belong in this ASDU
if (asdu_data.type() != type)
continue;
auto timestamp = (sample->flags & (int) SampleFlags::HAS_TS_ORIGIN)
? std::optional{ sample->ts.origin }
: std::nullopt;
if (asdu_data.hasTimestamp() && !timestamp.has_value())
throw RuntimeError("Received sample without timestamp for ASDU type with mandatory timestamp");
if (asdu_data.signalType() != sample_format(sample,signal))
throw RuntimeError("Expected signal type {}, but received {}",
signalTypeToString(asdu_data.signalType()),
signalTypeToString(sample_format(sample,signal))
);
if (asdu_data.addSampleToASDU(asdu, ASDUData::Sample {
sample->data[signal],
IEC60870_QUALITY_GOOD,
timestamp
}) == false)
// ASDU is full -> dispatch -> create a new one
break;
} while (++signal < MIN(sample->length, mapping.size()));
if (CS101_ASDU_getNumberOfElements(asdu) != 0)
CS104_Slave_enqueueASDU(this->server.slave, asdu);
CS101_ASDU_destroy(asdu);
}
}
}
int SlaveNode::_write(Sample *samples[], unsigned sample_count)
{
if (this->server.state != SlaveNode::Server::READY)
return -1;
for (unsigned sample_index = 0; sample_index < sample_count; sample_index++) {
Sample const *sample = samples[sample_index];
// update last_values
this->output.last_values_mutex.lock();
for (unsigned i = 0; i < MIN(sample->length, this->output.last_values.size()); i++) {
this->output.last_values[i] = sample->data[i];
}
this->output.last_values_mutex.unlock();
this->sendPeriodicASDUsForSample(sample);
}
return sample_count;
}