diff --git a/include/villas/nodes/webrtc.hpp b/include/villas/nodes/webrtc.hpp index 3238738ff..996d0f37f 100644 --- a/include/villas/nodes/webrtc.hpp +++ b/include/villas/nodes/webrtc.hpp @@ -32,7 +32,7 @@ protected: std::string peer; int wait_seconds; - Format *format; + Format::Ptr formatter; struct CQueueSignalled queue; struct Pool pool; @@ -40,6 +40,8 @@ protected: rtc::Configuration rtcConf; rtc::DataChannelInit dci; + void onMessage(rtc::binary msg); + virtual int _read(struct Sample *smps[], unsigned cnt); virtual int _write(struct Sample *smps[], unsigned cnt); diff --git a/include/villas/nodes/webrtc/peer_connection.hpp b/include/villas/nodes/webrtc/peer_connection.hpp index 06f7e0936..546149ad5 100644 --- a/include/villas/nodes/webrtc/peer_connection.hpp +++ b/include/villas/nodes/webrtc/peer_connection.hpp @@ -14,8 +14,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -54,7 +54,8 @@ class PeerConnection { public: PeerConnection(const std::string &server, const std::string &session, - const std::string &peer, std::shared_ptr signals, + const std::string &peer, + std::shared_ptr out_signals, rtc::Configuration config, Web *w, rtc::DataChannelInit d); ~PeerConnection(); @@ -76,7 +77,7 @@ protected: std::shared_ptr conn; std::shared_ptr chan; std::shared_ptr client; - std::shared_ptr signals; + std::shared_ptr out_signals; Logger logger; diff --git a/lib/nodes/webrtc.cpp b/lib/nodes/webrtc.cpp index 1d24158a1..6e7cd6b14 100644 --- a/lib/nodes/webrtc.cpp +++ b/lib/nodes/webrtc.cpp @@ -7,16 +7,17 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include +#include #include #include #include #include #include #include -#include using namespace villas; using namespace villas::node; @@ -27,7 +28,7 @@ static villas::node::Web *web; WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name) : Node(id, name), server("https://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"), - peer(uuid::toString(id)), wait_seconds(0), format(nullptr), queue({}), + peer(uuid::toString(id)), wait_seconds(0), formatter(), queue({}), pool({}), dci({}) { #if RTC_VERSION < 0x001400 @@ -40,8 +41,8 @@ WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name) WebRTCNode::~WebRTCNode() { int ret = pool_destroy(&pool); - if (ret) // TODO log - ; + if (ret) + logger->error("Failed to destroy pool"); } int WebRTCNode::parse(json_t *json) { @@ -107,10 +108,12 @@ int WebRTCNode::parse(json_t *json) { } } - format = json_format ? FormatFactory::make(json_format) - : FormatFactory::make("villas.binary"); - - assert(format); + auto *fmt = json_format ? FormatFactory::make(json_format) + : FormatFactory::make("villas.binary"); + formatter = Format::Ptr(fmt); + if (!formatter) + throw ConfigError(json_format, "node-config-node-webrtc-format", + "Invalid format configuration"); return 0; } @@ -120,44 +123,28 @@ int WebRTCNode::prepare() { if (ret) return ret; - format->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); + formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); // TODO: Determine output signals reliably - auto signals = std::make_shared(); + auto output_signals = std::make_shared(); - conn = std::make_shared(server, session, peer, - signals, rtcConf, web, dci); + conn = std::make_shared( + server, session, peer, output_signals, rtcConf, web, dci); ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size())); - if (ret) // TODO log + if (ret) { + logger->error("Failed to create pool"); return ret; + } ret = queue_signalled_init(&queue, 1024); - if (ret) // TODO log + if (ret) { + logger->error("Failed to create queue"); return ret; + } - // TODO: Move this to a member function - conn->onMessage([this](rtc::binary msg) { - int ret; - std::vector smps; - smps.resize(this->in.vectorize); - - ret = sample_alloc_many(&this->pool, smps.data(), smps.size()); - if (ret < 0) // TODO log - return; - - ret = format->sscan((const char *)msg.data(), msg.size(), nullptr, - smps.data(), ret); - if (ret < 0) // TODO log - return; - - ret = queue_signalled_push_many(&this->queue, (void **)smps.data(), ret); - if (ret < 0) // TODO log - return; - - this->logger->trace( - "onMessage(rtc::binary) callback finished pushing {} samples", ret); - }); + conn->onMessage( + std::bind(&WebRTCNode::onMessage, this, std::placeholders::_1)); return 0; } @@ -214,9 +201,12 @@ int WebRTCNode::_write(struct Sample *smps[], unsigned cnt) { size_t wbytes; buf.resize(4 * 1024); - int ret = format->sprint((char *)buf.data(), buf.size(), &wbytes, smps, cnt); - if (ret < 0) // TODO log + int ret = + formatter->sprint((char *)buf.data(), buf.size(), &wbytes, smps, cnt); + if (ret < 0) { + logger->error("Failed to format payload"); return ret; + } buf.resize(wbytes); conn->sendMessage(buf); @@ -231,6 +221,34 @@ json_t *WebRTCNode::_readStatus() const { return conn->readStatus(); } +void WebRTCNode::onMessage(rtc::binary msg) { + int ret; + std::vector smps; + smps.resize(in.vectorize); + + ret = sample_alloc_many(&pool, smps.data(), smps.size()); + if (ret < 0) { + logger->error("Failed to allocate samples"); + return; + } + + ret = formatter->sscan((const char *)msg.data(), msg.size(), nullptr, + smps.data(), ret); + if (ret < 0) { + logger->error("Failed to parse payload"); + return; + } + + ret = queue_signalled_push_many(&queue, (void **)smps.data(), ret); + if (ret < 0) { + logger->error("Failed to enqueue samples"); + return; + } + + logger->trace("onMessage(rtc::binary) callback finished pushing {} samples", + ret); +} + int WebRTCNodeFactory::start(SuperNode *sn) { web = sn->getWeb(); if (!web->isEnabled()) diff --git a/lib/nodes/webrtc/peer_connection.cpp b/lib/nodes/webrtc/peer_connection.cpp index 9e839cdf9..026e9bbd1 100644 --- a/lib/nodes/webrtc/peer_connection.cpp +++ b/lib/nodes/webrtc/peer_connection.cpp @@ -8,10 +8,12 @@ */ #include +#include #include #include #include +#include #include #include #include @@ -25,20 +27,23 @@ using namespace villas::node::webrtc; PeerConnection::PeerConnection(const std::string &server, const std::string &session, const std::string &peer, - std::shared_ptr signals, + std::shared_ptr out_signals, rtc::Configuration cfg, Web *w, rtc::DataChannelInit d) : web(w), extraServers({}), dataChannelInit(d), defaultConfig(cfg), - conn(nullptr), chan(nullptr), signals(signals), + conn(nullptr), chan(nullptr), out_signals(out_signals), logger(logging.get("webrtc:pc")), stopStartup(false), warnNotConnected(false), standby(true), first(false), firstID(INT_MAX), secondID(INT_MAX), onMessageCallback(nullptr) { client = std::make_shared(server, session, peer, web); - client->onConnected([this]() { this->onSignalingConnected(); }); - client->onDisconnected([this]() { this->onSignalingDisconnected(); }); - client->onError([this](auto err) { this->onSignalingError(std::move(err)); }); - client->onMessage( - [this](auto msg) { this->onSignalingMessage(std::move(msg)); }); + + client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this)); + client->onDisconnected( + std::bind(&PeerConnection::onSignalingDisconnected, this)); + client->onError(std::bind(&PeerConnection::onSignalingError, this, + std::placeholders::_1)); + client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this, + std::placeholders::_1)); auto lock = std::unique_lock{mutex}; resetConnectionAndStandby(lock); @@ -135,18 +140,18 @@ void PeerConnection::setupPeerConnection( std::begin(extraServers), std::end(extraServers)); conn = pc ? std::move(pc) : std::make_shared(config); - conn->onLocalDescription( - [this](auto desc) { this->onLocalDescription(std::move(desc)); }); - conn->onLocalCandidate( - [this](auto cand) { this->onLocalCandidate(std::move(cand)); }); + conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this, + std::placeholders::_1)); + conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this, + std::placeholders::_1)); conn->onDataChannel( - [this](auto channel) { this->onDataChannel(std::move(channel)); }); - conn->onGatheringStateChange( - [this](auto state) { this->onGatheringStateChange(std::move(state)); }); - conn->onSignalingStateChange( - [this](auto state) { this->onSignalingStateChange(std::move(state)); }); - conn->onStateChange( - [this](auto state) { this->onConnectionStateChange(std::move(state)); }); + std::bind(&PeerConnection::onDataChannel, this, std::placeholders::_1)); + conn->onGatheringStateChange(std::bind( + &PeerConnection::onGatheringStateChange, this, std::placeholders::_1)); + conn->onSignalingStateChange(std::bind( + &PeerConnection::onSignalingStateChange, this, std::placeholders::_1)); + conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this, + std::placeholders::_1)); } void PeerConnection::setupDataChannel(std::shared_ptr dc) { @@ -155,12 +160,14 @@ void PeerConnection::setupDataChannel(std::shared_ptr dc) { assert(conn); chan = dc ? std::move(dc) : conn->createDataChannel("villas", dataChannelInit); + chan->onMessage( [this](rtc::binary msg) { this->onDataChannelMessage(std::move(msg)); }, [this](rtc::string msg) { this->onDataChannelMessage(std::move(msg)); }); - chan->onOpen([this]() { this->onDataChannelOpen(); }); - chan->onClosed([this]() { this->onDataChannelClosed(); }); - chan->onError([this](auto err) { this->onDataChannelError(std::move(err)); }); + chan->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this)); + chan->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this)); + chan->onError(std::bind(&PeerConnection::onDataChannelError, this, + std::placeholders::_1)); // If this node has it's data channel set up, don't accept any new ones conn->onDataChannel(nullptr); @@ -251,7 +258,7 @@ void PeerConnection::onSignalingConnected() { auto lock = std::unique_lock{mutex}; - client->sendMessage({*signals}); + client->sendMessage({*out_signals}); } void PeerConnection::onSignalingDisconnected() { @@ -343,7 +350,9 @@ void PeerConnection::onSignalingMessage(SignalingMessage msg) { conn->addRemoteCandidate(c); }, - [&](auto other) { logger->warn("unknown signaling message"); }}, + [&](auto other) { + logger->warn("Signaling message has been skipped"); + }}, msg.message); }