/* Node type: C37-118. * * Author: Philipp Jungkamp * SPDX-FileCopyrightText: 2014-2024 Institute for Automation of Complex Power Systems, RWTH Aachen University * SPDX-License-Identifier: Apache-2.0 */ #include #include #include #include #include #include #include #include #include #include using namespace std::literals::string_view_literals; using namespace villas::node; using namespace villas::node::c37_118; using namespace villas::node::c37_118::types; void C37_118::Client::send(Frame::Variant message, timespec ts) { std::vector send_buf = parser.serialize({ .idcode = idcode, .soc = static_cast(ts.tv_sec & 0xFFFFFF), .fracsec = static_cast(ts.tv_nsec * static_cast(config->time_base)) / 1'000'000'000, .message = message, }, config ? &*config : nullptr); size_t sum = 0; int ret = 0; if (socktype == SOCK_DGRAM) { do { ret = ::send(connection_fd, send_buf.data() + sum, send_buf.size() - sum, 0); sum += ret; } while (sum < send_buf.size() && ret > 0); } else { ret = ::write(connection_fd, send_buf.data(), send_buf.size()); } if (ret == 0) throw RuntimeError{"end of stream"}; if (ret < 0) throw SystemError{"send error {}", ::strerror(errno)}; sum += ret; } std::optional C37_118::Client::recv() { int ret; if (socktype == SOCK_DGRAM) ret = ::recv(connection_fd, recv_buf.data(), recv_buf.size(), 0); else ret = ::read(connection_fd, recv_buf.data(), recv_buf.size()); if (ret == 0) throw RuntimeError{"end of stream"}; if (ret == -EWOULDBLOCK) return std::nullopt; if (ret < 0) throw SystemError{"socket error {}", ::strerror(errno)}; auto frame = parser.deserialize(recv_buf.data(), ret, config ? &*config : nullptr); if (!frame.has_value()) return std::nullopt; std::copy(recv_buf.begin() + frame->framesize, recv_buf.end(), recv_buf.begin()); return *frame; } int C37_118::_read(struct Sample *smps[], unsigned cnt) { auto smp = smps[0]; if (auto *client = std::get_if(&role)) { auto frame = client->recv(); if (!frame.has_value()) return 0; auto data = std::get_if(&frame->message); if (data == nullptr) return 0; std::size_t s = 0; for (auto &pmu : data->pmus) { for (auto p = pmu.phasor.begin(); p != pmu.phasor.end() && s < smp->capacity; ++p) smp->data[s++].z = *p; if (s < smp->capacity) smp->data[s++].f = pmu.freq; if (s < smp->capacity) smp->data[s++].f = pmu.dfreq; for (auto a = pmu.analog.begin(); a != pmu.analog.end() && s < smp->capacity; ++a) smp->data[s++].f = *a; for (auto d = pmu.digital.begin(); d != pmu.digital.end() && s < smp->capacity; ++d) for (auto i = 0; i < 16; i++) smp->data[s++].b = (*d >> i) & 1; } smp->length = s; smp->ts.origin = { .tv_sec = frame->soc, .tv_nsec = static_cast(frame->fracsec & 0xFFFFFF) * 1'000'000'000 / client->config->time_base, }; smp->flags = (int) SampleFlags::HAS_DATA | (int) SampleFlags::HAS_TS_ORIGIN; return 1; } return 0; } C37_118::~C37_118() { } void C37_118::parseServer(json_t *json) { json_error_t err; char *addr = nullptr; char *transport = nullptr; int port = 0; int idcode = 0; json_t *config_json = nullptr; if (json_unpack_ex(json, &err, 0, "{ s:s, s:s, s:i, s:i, s:o }", "transport", &transport, "address", &addr, "port", &port, "idcode", &idcode, "config", &config_json)) throw ConfigError(json, err, "node-config-node-c37.118-server"); int socktype; if (transport == "tcp"sv) socktype = SOCK_STREAM; else if (transport == "udp"sv) socktype = SOCK_DGRAM; else throw RuntimeError{"invalid transport {}", transport}; throw RuntimeError{"unimplemented"}; this->role = Server { .addr = std::string(addr), .port = static_cast(port), .socktype = socktype, }; } void C37_118::parseClient(json_t *json) { json_error_t err; char *addr = nullptr; char *transport = nullptr; int port = 0; int idcode = 0; if (json_unpack_ex(json, &err, 0, "{ s:s, s:s, s:i, s:i }", "transport", &transport, "address", &addr, "port", &port, "idcode", &idcode)) throw ConfigError(json, err, "node-config-node-c37.118-client"); int socktype; if (transport == "tcp"sv) socktype = SOCK_STREAM; else if (transport == "udp"sv) socktype = SOCK_DGRAM; else throw RuntimeError{"invalid transport {}", transport}; this->role = Client { .addr = std::string(addr), .port = static_cast(port), .idcode = static_cast(idcode), .socktype = socktype, }; } int C37_118::parse(json_t *json) { json_error_t err; json_t *server = nullptr; json_t *client = nullptr; if (json_unpack_ex(json, &err, 0, "{ s?:o, s?:o }", "server", &server, "client", &client)) throw ConfigError(json, err, "node-config-node-c37.118"); if (server && client) throw RuntimeError{"c37_118: can't be both server and client"}; else if (server) parseServer(server); else if (client) parseClient(client); return Node::parse(json); } void C37_118::prepareServer(Server &server) { addrinfo hints; std::memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = server.socktype; hints.ai_flags = AI_PASSIVE; addrinfo *addrs_ret; std::string service = std::to_string(server.port); if (auto err = ::getaddrinfo(server.addr.c_str(), service.c_str(), &hints, &addrs_ret)) // TODO: make configurable throw SystemError{"c37_118: getaddrinfo {}", ::gai_strerror(err)}; auto addrs = std::unique_ptr{ addrs_ret, &freeaddrinfo}; int sock; addrinfo *addr; for (addr = addrs.get(); addr != nullptr; addr = addr->ai_next) { sock = ::socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (sock < 0) continue; if (::bind(sock, addr->ai_addr, addr->ai_addrlen) == 0) { server.listener_fd = sock; break; } ::close(sock); } if (addr == nullptr) throw RuntimeError{"could not bind port {} on {}", server.port, server.addr}; throw RuntimeError{"unimplemented"}; } void C37_118::prepareClient(Client &client) { addrinfo hints; std::memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = client.socktype; addrinfo *addrs_ret; std::string service = std::to_string(client.port); if (auto err = ::getaddrinfo(client.addr.c_str(), service.c_str(), &hints, &addrs_ret)) throw SystemError{"c37_118: getaddrinfo {}", ::gai_strerror(err)}; auto addrs = std::unique_ptr{ addrs_ret, &freeaddrinfo}; addrinfo *addr; for (addr = addrs.get(); addr != nullptr; addr = addr->ai_next) { int sock = ::socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (sock < 0) continue; if (::connect(sock, addr->ai_addr, addr->ai_addrlen) == 0) { client.connection_fd = sock; break; } ::close(sock); } if (addr == nullptr) throw RuntimeError{"could not connect to port {} on {}", client.port, client.addr}; client.send(Command{Command::GET_CONFIG2}); for (;;) { auto frame = client.recv(); if (!frame.has_value()) continue; auto config2 = std::get_if(&frame->message); if (config2 == nullptr) continue; client.config = *config2; auto &sigs = in.signals; sigs->clear(); for (auto &pmu : client.config->pmus) { for (auto &p : pmu.phinfo) sigs->emplace_back(std::make_shared(p.chnam, p.unit_str(), SignalType::COMPLEX)); sigs->emplace_back(std::make_shared("FREQ", "Hz", SignalType::FLOAT)); sigs->emplace_back(std::make_shared("DFREQ", "Hz/s", SignalType::FLOAT)); for (auto &a : pmu.aninfo) sigs->emplace_back(std::make_shared(a.chnam, a.unit_str(), SignalType::FLOAT)); for (auto &d : pmu.dginfo) for (size_t i = 0; i < 16; ++i) sigs->emplace_back(std::make_shared(d.chnam[i], "", SignalType::BOOLEAN)); } break; } } int C37_118::prepare() { std::visit(villas::utils::overloaded { [&](Server &server){ prepareServer(server); }, [&](Client &client){ prepareClient(client); }, }, role); return Node::prepare(); } int C37_118::start() { int ret = Node::start(); if (ret) return ret; std::visit(villas::utils::overloaded { [&](Server &server){ }, [&](Client &client){ client.send(Command{Command::DATA_START}); }, }, role); return 0; } int C37_118::stop() { int ret = Node::stop(); if (ret) return ret; std::visit(villas::utils::overloaded { [&](Server &server){ }, [&](Client &client){ client.send(Command{Command::DATA_STOP}); }, }, role); return 0; } std::vector C37_118::getPollFDs() { return std::visit(villas::utils::overloaded { [&](Server &server){ return std::vector { }; }, [&](Client &client){ return std::vector { client.connection_fd }; }, }, role); } // Register node static char n[] = "c37.118"; static char d[] = "A node for a C37.118 TCP/UDP server/client"; static NodePlugin p;