1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

webrtc: Fix several TODOs and other smaller tweaks

Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
Steffen Vogel 2024-04-09 23:44:26 +02:00 committed by pipeacosta
parent 6acf240b20
commit 6c5e29de68
4 changed files with 95 additions and 65 deletions

View file

@ -32,7 +32,7 @@ protected:
std::string peer;
int wait_seconds;
Format *format;
Format::Ptr formatter;
struct CQueueSignalled queue;
struct Pool pool;
@ -40,6 +40,8 @@ protected:
rtc::Configuration rtcConf;
rtc::DataChannelInit dci;
void onMessage(rtc::binary msg);
virtual int _read(struct Sample *smps[], unsigned cnt);
virtual int _write(struct Sample *smps[], unsigned cnt);

View file

@ -14,8 +14,8 @@
#include <rtc/peerconnection.hpp>
#include <rtc/rtc.hpp>
#include <villas/config.hpp>
#include <villas/node/config.hpp>
#include <villas/log.hpp>
#include <villas/node/config.hpp>
#include <villas/nodes/webrtc/signaling_client.hpp>
#include <villas/signal_list.hpp>
#include <villas/web.hpp>
@ -54,7 +54,8 @@ class PeerConnection {
public:
PeerConnection(const std::string &server, const std::string &session,
const std::string &peer, std::shared_ptr<SignalList> signals,
const std::string &peer,
std::shared_ptr<SignalList> out_signals,
rtc::Configuration config, Web *w, rtc::DataChannelInit d);
~PeerConnection();
@ -76,7 +77,7 @@ protected:
std::shared_ptr<rtc::PeerConnection> conn;
std::shared_ptr<rtc::DataChannel> chan;
std::shared_ptr<SignalingClient> client;
std::shared_ptr<SignalList> signals;
std::shared_ptr<SignalList> out_signals;
Logger logger;

View file

@ -7,16 +7,17 @@
* SPDX-License-Identifier: Apache-2.0
*/
#include <functional>
#include <vector>
#include <villas/exceptions.hpp>
#include <villas/node/config.hpp>
#include <villas/node_compat.hpp>
#include <villas/nodes/webrtc.hpp>
#include <villas/sample.hpp>
#include <villas/super_node.hpp>
#include <villas/utils.hpp>
#include <villas/uuid.hpp>
#include <villas/node/config.hpp>
using namespace villas;
using namespace villas::node;
@ -27,7 +28,7 @@ static villas::node::Web *web;
WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name)
: Node(id, name),
server("https://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"),
peer(uuid::toString(id)), wait_seconds(0), format(nullptr), queue({}),
peer(uuid::toString(id)), wait_seconds(0), formatter(), queue({}),
pool({}), dci({}) {
#if RTC_VERSION < 0x001400
@ -40,8 +41,8 @@ WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name)
WebRTCNode::~WebRTCNode() {
int ret = pool_destroy(&pool);
if (ret) // TODO log
;
if (ret)
logger->error("Failed to destroy pool");
}
int WebRTCNode::parse(json_t *json) {
@ -107,10 +108,12 @@ int WebRTCNode::parse(json_t *json) {
}
}
format = json_format ? FormatFactory::make(json_format)
: FormatFactory::make("villas.binary");
assert(format);
auto *fmt = json_format ? FormatFactory::make(json_format)
: FormatFactory::make("villas.binary");
formatter = Format::Ptr(fmt);
if (!formatter)
throw ConfigError(json_format, "node-config-node-webrtc-format",
"Invalid format configuration");
return 0;
}
@ -120,44 +123,28 @@ int WebRTCNode::prepare() {
if (ret)
return ret;
format->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET);
formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET);
// TODO: Determine output signals reliably
auto signals = std::make_shared<SignalList>();
auto output_signals = std::make_shared<SignalList>();
conn = std::make_shared<webrtc::PeerConnection>(server, session, peer,
signals, rtcConf, web, dci);
conn = std::make_shared<webrtc::PeerConnection>(
server, session, peer, output_signals, rtcConf, web, dci);
ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size()));
if (ret) // TODO log
if (ret) {
logger->error("Failed to create pool");
return ret;
}
ret = queue_signalled_init(&queue, 1024);
if (ret) // TODO log
if (ret) {
logger->error("Failed to create queue");
return ret;
}
// TODO: Move this to a member function
conn->onMessage([this](rtc::binary msg) {
int ret;
std::vector<Sample *> smps;
smps.resize(this->in.vectorize);
ret = sample_alloc_many(&this->pool, smps.data(), smps.size());
if (ret < 0) // TODO log
return;
ret = format->sscan((const char *)msg.data(), msg.size(), nullptr,
smps.data(), ret);
if (ret < 0) // TODO log
return;
ret = queue_signalled_push_many(&this->queue, (void **)smps.data(), ret);
if (ret < 0) // TODO log
return;
this->logger->trace(
"onMessage(rtc::binary) callback finished pushing {} samples", ret);
});
conn->onMessage(
std::bind(&WebRTCNode::onMessage, this, std::placeholders::_1));
return 0;
}
@ -214,9 +201,12 @@ int WebRTCNode::_write(struct Sample *smps[], unsigned cnt) {
size_t wbytes;
buf.resize(4 * 1024);
int ret = format->sprint((char *)buf.data(), buf.size(), &wbytes, smps, cnt);
if (ret < 0) // TODO log
int ret =
formatter->sprint((char *)buf.data(), buf.size(), &wbytes, smps, cnt);
if (ret < 0) {
logger->error("Failed to format payload");
return ret;
}
buf.resize(wbytes);
conn->sendMessage(buf);
@ -231,6 +221,34 @@ json_t *WebRTCNode::_readStatus() const {
return conn->readStatus();
}
void WebRTCNode::onMessage(rtc::binary msg) {
int ret;
std::vector<Sample *> smps;
smps.resize(in.vectorize);
ret = sample_alloc_many(&pool, smps.data(), smps.size());
if (ret < 0) {
logger->error("Failed to allocate samples");
return;
}
ret = formatter->sscan((const char *)msg.data(), msg.size(), nullptr,
smps.data(), ret);
if (ret < 0) {
logger->error("Failed to parse payload");
return;
}
ret = queue_signalled_push_many(&queue, (void **)smps.data(), ret);
if (ret < 0) {
logger->error("Failed to enqueue samples");
return;
}
logger->trace("onMessage(rtc::binary) callback finished pushing {} samples",
ret);
}
int WebRTCNodeFactory::start(SuperNode *sn) {
web = sn->getWeb();
if (!web->isEnabled())

View file

@ -8,10 +8,12 @@
*/
#include <chrono>
#include <functional>
#include <fmt/chrono.h>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <rtc/common.hpp>
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/peer_connection.hpp>
#include <villas/utils.hpp>
@ -25,20 +27,23 @@ using namespace villas::node::webrtc;
PeerConnection::PeerConnection(const std::string &server,
const std::string &session,
const std::string &peer,
std::shared_ptr<SignalList> signals,
std::shared_ptr<SignalList> out_signals,
rtc::Configuration cfg, Web *w,
rtc::DataChannelInit d)
: web(w), extraServers({}), dataChannelInit(d), defaultConfig(cfg),
conn(nullptr), chan(nullptr), signals(signals),
conn(nullptr), chan(nullptr), out_signals(out_signals),
logger(logging.get("webrtc:pc")), stopStartup(false),
warnNotConnected(false), standby(true), first(false), firstID(INT_MAX),
secondID(INT_MAX), onMessageCallback(nullptr) {
client = std::make_shared<SignalingClient>(server, session, peer, web);
client->onConnected([this]() { this->onSignalingConnected(); });
client->onDisconnected([this]() { this->onSignalingDisconnected(); });
client->onError([this](auto err) { this->onSignalingError(std::move(err)); });
client->onMessage(
[this](auto msg) { this->onSignalingMessage(std::move(msg)); });
client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this));
client->onDisconnected(
std::bind(&PeerConnection::onSignalingDisconnected, this));
client->onError(std::bind(&PeerConnection::onSignalingError, this,
std::placeholders::_1));
client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this,
std::placeholders::_1));
auto lock = std::unique_lock{mutex};
resetConnectionAndStandby(lock);
@ -135,18 +140,18 @@ void PeerConnection::setupPeerConnection(
std::begin(extraServers), std::end(extraServers));
conn = pc ? std::move(pc) : std::make_shared<rtc::PeerConnection>(config);
conn->onLocalDescription(
[this](auto desc) { this->onLocalDescription(std::move(desc)); });
conn->onLocalCandidate(
[this](auto cand) { this->onLocalCandidate(std::move(cand)); });
conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this,
std::placeholders::_1));
conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this,
std::placeholders::_1));
conn->onDataChannel(
[this](auto channel) { this->onDataChannel(std::move(channel)); });
conn->onGatheringStateChange(
[this](auto state) { this->onGatheringStateChange(std::move(state)); });
conn->onSignalingStateChange(
[this](auto state) { this->onSignalingStateChange(std::move(state)); });
conn->onStateChange(
[this](auto state) { this->onConnectionStateChange(std::move(state)); });
std::bind(&PeerConnection::onDataChannel, this, std::placeholders::_1));
conn->onGatheringStateChange(std::bind(
&PeerConnection::onGatheringStateChange, this, std::placeholders::_1));
conn->onSignalingStateChange(std::bind(
&PeerConnection::onSignalingStateChange, this, std::placeholders::_1));
conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this,
std::placeholders::_1));
}
void PeerConnection::setupDataChannel(std::shared_ptr<rtc::DataChannel> dc) {
@ -155,12 +160,14 @@ void PeerConnection::setupDataChannel(std::shared_ptr<rtc::DataChannel> dc) {
assert(conn);
chan =
dc ? std::move(dc) : conn->createDataChannel("villas", dataChannelInit);
chan->onMessage(
[this](rtc::binary msg) { this->onDataChannelMessage(std::move(msg)); },
[this](rtc::string msg) { this->onDataChannelMessage(std::move(msg)); });
chan->onOpen([this]() { this->onDataChannelOpen(); });
chan->onClosed([this]() { this->onDataChannelClosed(); });
chan->onError([this](auto err) { this->onDataChannelError(std::move(err)); });
chan->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this));
chan->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this));
chan->onError(std::bind(&PeerConnection::onDataChannelError, this,
std::placeholders::_1));
// If this node has it's data channel set up, don't accept any new ones
conn->onDataChannel(nullptr);
@ -251,7 +258,7 @@ void PeerConnection::onSignalingConnected() {
auto lock = std::unique_lock{mutex};
client->sendMessage({*signals});
client->sendMessage({*out_signals});
}
void PeerConnection::onSignalingDisconnected() {
@ -343,7 +350,9 @@ void PeerConnection::onSignalingMessage(SignalingMessage msg) {
conn->addRemoteCandidate(c);
},
[&](auto other) { logger->warn("unknown signaling message"); }},
[&](auto other) {
logger->warn("Signaling message has been skipped");
}},
msg.message);
}