diff --git a/include/villas/nodes/iec60870.hpp b/include/villas/nodes/iec60870.hpp index dedfcf9a8..0e3832076 100644 --- a/include/villas/nodes/iec60870.hpp +++ b/include/villas/nodes/iec60870.hpp @@ -95,8 +95,8 @@ public: SignalType signalType() const; // check if ASDU contains this data std::optional checkASDU(CS101_ASDU const &asdu) const; - // add SignalData to an ASDU - void addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const; + // add SignalData to an ASDU, returns false when sample couldn't be added (insufficient space in ASDU) + bool addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const; // every value in an ASDU has an associated "information object address" (ioa) int ioa; @@ -133,9 +133,10 @@ class SlaveNode : public Node { protected: bool debug = true; + struct Server { // slave state - bool created = false; + enum { NONE, STOPPED, READY } state = NONE; // config (use explicit defaults) std::string local_address = "0.0.0.0"; @@ -180,7 +181,7 @@ protected: bool onInterrogation(IMasterConnection connection, CS101_ASDU asdu, uint8_t _of_inter) const noexcept; bool onASDU(IMasterConnection connection, CS101_ASDU asdu) const noexcept; - unsigned fillASDU(CS101_ASDU &asdu, Sample const *sample, ASDUData::Type type) const noexcept(false); + void sendPeriodicASDUsForSample(Sample const *sample) const noexcept(false); virtual int _write(struct Sample *smps[], unsigned cnt) override; diff --git a/lib/nodes/iec60870.cpp b/lib/nodes/iec60870.cpp index 0aefdeef5..12985c58d 100644 --- a/lib/nodes/iec60870.cpp +++ b/lib/nodes/iec60870.cpp @@ -33,6 +33,7 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; using namespace villas::node::iec60870; +using namespace std::literals::chrono_literals; CP56Time2a timespec_to_cp56time2a(timespec time) { time_t time_ms = @@ -247,7 +248,7 @@ std::optional ASDUData::checkASDU(CS101_ASDU const &asdu) cons return std::nullopt; } -void ASDUData::addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const +bool ASDUData::addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const { std::optional timestamp = sample.timestamp.has_value() ? std::optional { timespec_to_cp56time2a(sample.timestamp.value()) } @@ -307,8 +308,9 @@ void ASDUData::addSampleToASDU(CS101_ASDU &asdu, ASDUData::Sample sample) const } break; default: assert(!"unreachable"); } - assert(CS101_ASDU_addInformationObject(asdu, io)); + bool successfully_added = CS101_ASDU_addInformationObject(asdu, io); InformationObject_destroy(io); + return successfully_added; } ASDUData::ASDUData(ASDUData::Descriptor const &descriptor, int ioa) : ioa(ioa), descriptor(descriptor) @@ -369,33 +371,35 @@ void SlaveNode::createSlave() noexcept self->debugPrintMessage(connection,message,message_size,sent); }, this); - server.created = true; + server.state = SlaveNode::Server::READY; } void SlaveNode::destroySlave() noexcept { auto &server = this->server; - if (!server.created) { + if (server.state == SlaveNode::Server::NONE) { return; } this->stopSlave(); CS104_Slave_destroy(server.slave); - server.created = false; + server.state = SlaveNode::Server::NONE; } void SlaveNode::startSlave() noexcept(false) { auto &server = this->server; - if (!server.created) { + if (server.state == SlaveNode::Server::NONE) { this->createSlave(); } else { this->stopSlave(); } + server.state = SlaveNode::Server::READY; + CS104_Slave_start(server.slave); if (!CS104_Slave_isRunning(server.slave)) { @@ -407,14 +411,18 @@ void SlaveNode::stopSlave() noexcept { auto &server = this->server; - if (!server.created || !CS104_Slave_isRunning(server.slave)) { + if (server.state != SlaveNode::Server::READY || !CS104_Slave_isRunning(server.slave)) { return; } - // wait for all messages to be send + server.state = SlaveNode::Server::STOPPED; + + if (CS104_Slave_getNumberOfQueueEntries(server.slave, NULL) != 0) + this->logger->info("waiting for last messages in queue"); + // wait for all messages to be send before really stopping while ( (CS104_Slave_getNumberOfQueueEntries(server.slave, NULL) != 0) && (CS104_Slave_getOpenConnections(server.slave) != 0)) { - sleep(1); + std::this_thread::sleep_for(100ms); } CS104_Slave_stop(server.slave); @@ -465,34 +473,36 @@ bool SlaveNode::onInterrogation(IMasterConnection connection, CS101_ASDU asdu, Q auto guard = std::lock_guard { this->output.last_values_mutex }; for(auto asdu_type : asdu_types) { - auto signal_asdu = CS101_ASDU_create( - IMasterConnection_getApplicationLayerParameters(connection), - false, - CS101_COT_INTERROGATED_BY_STATION, - 0, - this->server.common_address, - false, - false - ); + for (unsigned i = 0; i < mapping.size();) { + auto signal_asdu = CS101_ASDU_create( + IMasterConnection_getApplicationLayerParameters(connection), + false, + CS101_COT_INTERROGATED_BY_STATION, + 0, + this->server.common_address, + false, + false + ); - for (unsigned i = 0; i < mapping.size(); i++) { - auto asdu_data = mapping[i]; - auto last_value = last_values[i]; + do { + auto asdu_data = mapping[i].withoutTimestamp(); + auto last_value = last_values[i]; - if (asdu_data.type() == asdu_type) - asdu_data - .withoutTimestamp() - .addSampleToASDU(signal_asdu, ASDUData::Sample { - last_value, - IEC60870_QUALITY_GOOD, - std::nullopt - }); + if (asdu_data.type() != asdu_type) + continue; + + if(asdu_data.addSampleToASDU(signal_asdu, ASDUData::Sample { + last_value, + IEC60870_QUALITY_GOOD, + std::nullopt + }) == false) + break; + } while (++i < mapping.size()); + + IMasterConnection_sendASDU(connection, signal_asdu); + + CS101_ASDU_destroy(signal_asdu); } - - assert(CS101_ASDU_getNumberOfElements(signal_asdu) > 0); - IMasterConnection_sendASDU(connection, signal_asdu); - - CS101_ASDU_destroy(signal_asdu); } IMasterConnection_sendACT_TERM(connection, asdu); @@ -512,53 +522,15 @@ bool SlaveNode::onASDU(IMasterConnection connection, CS101_ASDU asdu) const noex return true; } -unsigned SlaveNode::fillASDU(CS101_ASDU &asdu, Sample const *sample, ASDUData::Type type) const noexcept(false) { - int asdu_elements = 0; - auto &mapping = this->output.mapping; - for (unsigned signal = 0; signal < MIN(sample->length, mapping.size()); signal++) { - if (mapping[signal].type() != type) continue; - - auto timestamp = (sample->flags & (int) SampleFlags::HAS_TS_ORIGIN) - ? std::optional{ sample->ts.origin } - : std::nullopt; - - if (mapping[signal].hasTimestamp() && !timestamp.has_value()) - throw RuntimeError("Received sample without timestamp for ASDU type with mandatory timestamp"); - - if (mapping[signal].signalType() != sample_format(sample,signal)) - throw RuntimeError("Expected signal type {}, but received {}", - signalTypeToString(mapping[signal].signalType()), - signalTypeToString(sample_format(sample,signal)) - ); - - mapping[signal].addSampleToASDU(asdu, ASDUData::Sample { - sample->data[signal], - IEC60870_QUALITY_GOOD, - timestamp - }); - - asdu_elements++; - } - - assert(CS101_ASDU_getNumberOfElements(asdu) == asdu_elements); - - return asdu_elements; -}; - -int SlaveNode::_write(Sample *samples[], unsigned sample_count) +void SlaveNode::sendPeriodicASDUsForSample(Sample const *sample) const noexcept(false) { - for (unsigned sample_index = 0; sample_index < sample_count; sample_index++) { - Sample const *sample = samples[sample_index]; + auto &mapping = this->output.mapping; - // update last_values - this->output.last_values_mutex.lock(); - for (unsigned i = 0; i < MIN(sample->length, this->output.last_values.size()); i++) { - this->output.last_values[i] = sample->data[i]; - } - this->output.last_values_mutex.unlock(); - - // create one asdu per asdu_type - for (auto& asdu_type : this->output.asdu_types) { + // ASDUs may only carry one type of asdu + for (auto& type : this->output.asdu_types) { + // search all occurences of this ASDU type + for (unsigned signal = 0; signal < MIN(sample->length, mapping.size());) { + // create an ASDU for periodic transimission CS101_ASDU asdu = CS101_ASDU_create( this->server.asdu_app_layer_parameters, 0, @@ -569,13 +541,60 @@ int SlaveNode::_write(Sample *samples[], unsigned sample_count) false ); - // if data was added to asdu, enqueue it - if (this->fillASDU(asdu, sample, asdu_type) != 0) + do { + auto &asdu_data = mapping[signal]; + + // this signal_data does not belong in this ASDU + if (asdu_data.type() != type) + continue; + + auto timestamp = (sample->flags & (int) SampleFlags::HAS_TS_ORIGIN) + ? std::optional{ sample->ts.origin } + : std::nullopt; + + if (asdu_data.hasTimestamp() && !timestamp.has_value()) + throw RuntimeError("Received sample without timestamp for ASDU type with mandatory timestamp"); + + if (asdu_data.signalType() != sample_format(sample,signal)) + throw RuntimeError("Expected signal type {}, but received {}", + signalTypeToString(asdu_data.signalType()), + signalTypeToString(sample_format(sample,signal)) + ); + + if (asdu_data.addSampleToASDU(asdu, ASDUData::Sample { + sample->data[signal], + IEC60870_QUALITY_GOOD, + timestamp + }) == false) + // ASDU is full -> dispatch -> create a new one + break; + } while (++signal < MIN(sample->length, mapping.size())); + + if (CS101_ASDU_getNumberOfElements(asdu) != 0) CS104_Slave_enqueueASDU(this->server.slave, asdu); CS101_ASDU_destroy(asdu); } } +} + +int SlaveNode::_write(Sample *samples[], unsigned sample_count) +{ + if (this->server.state != SlaveNode::Server::READY) + return -1; + + for (unsigned sample_index = 0; sample_index < sample_count; sample_index++) { + Sample const *sample = samples[sample_index]; + + // update last_values + this->output.last_values_mutex.lock(); + for (unsigned i = 0; i < MIN(sample->length, this->output.last_values.size()); i++) { + this->output.last_values[i] = sample->data[i]; + } + this->output.last_values_mutex.unlock(); + + this->sendPeriodicASDUsForSample(sample); + } return sample_count; }