mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
webrtc: Fix several TODOs and other smaller tweaks
Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
parent
8a3be3d8ba
commit
93cbc5d518
4 changed files with 95 additions and 65 deletions
|
@ -32,7 +32,7 @@ protected:
|
|||
std::string peer;
|
||||
|
||||
int wait_seconds;
|
||||
Format *format;
|
||||
Format::Ptr formatter;
|
||||
struct CQueueSignalled queue;
|
||||
struct Pool pool;
|
||||
|
||||
|
@ -40,6 +40,8 @@ protected:
|
|||
rtc::Configuration rtcConf;
|
||||
rtc::DataChannelInit dci;
|
||||
|
||||
void onMessage(rtc::binary msg);
|
||||
|
||||
virtual int _read(struct Sample *smps[], unsigned cnt);
|
||||
|
||||
virtual int _write(struct Sample *smps[], unsigned cnt);
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
#include <rtc/peerconnection.hpp>
|
||||
#include <rtc/rtc.hpp>
|
||||
#include <villas/config.hpp>
|
||||
#include <villas/node/config.hpp>
|
||||
#include <villas/log.hpp>
|
||||
#include <villas/node/config.hpp>
|
||||
#include <villas/nodes/webrtc/signaling_client.hpp>
|
||||
#include <villas/signal_list.hpp>
|
||||
#include <villas/web.hpp>
|
||||
|
@ -54,7 +54,8 @@ class PeerConnection {
|
|||
|
||||
public:
|
||||
PeerConnection(const std::string &server, const std::string &session,
|
||||
const std::string &peer, std::shared_ptr<SignalList> signals,
|
||||
const std::string &peer,
|
||||
std::shared_ptr<SignalList> out_signals,
|
||||
rtc::Configuration config, Web *w, rtc::DataChannelInit d);
|
||||
~PeerConnection();
|
||||
|
||||
|
@ -76,7 +77,7 @@ protected:
|
|||
std::shared_ptr<rtc::PeerConnection> conn;
|
||||
std::shared_ptr<rtc::DataChannel> chan;
|
||||
std::shared_ptr<SignalingClient> client;
|
||||
std::shared_ptr<SignalList> signals;
|
||||
std::shared_ptr<SignalList> out_signals;
|
||||
|
||||
Logger logger;
|
||||
|
||||
|
|
|
@ -7,16 +7,17 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
|
||||
#include <villas/exceptions.hpp>
|
||||
#include <villas/node/config.hpp>
|
||||
#include <villas/node_compat.hpp>
|
||||
#include <villas/nodes/webrtc.hpp>
|
||||
#include <villas/sample.hpp>
|
||||
#include <villas/super_node.hpp>
|
||||
#include <villas/utils.hpp>
|
||||
#include <villas/uuid.hpp>
|
||||
#include <villas/node/config.hpp>
|
||||
|
||||
using namespace villas;
|
||||
using namespace villas::node;
|
||||
|
@ -27,7 +28,7 @@ static villas::node::Web *web;
|
|||
WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name)
|
||||
: Node(id, name),
|
||||
server("https://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"),
|
||||
peer(uuid::toString(id)), wait_seconds(0), format(nullptr), queue({}),
|
||||
peer(uuid::toString(id)), wait_seconds(0), formatter(), queue({}),
|
||||
pool({}), dci({}) {
|
||||
|
||||
#if RTC_VERSION < 0x001400
|
||||
|
@ -40,8 +41,8 @@ WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name)
|
|||
|
||||
WebRTCNode::~WebRTCNode() {
|
||||
int ret = pool_destroy(&pool);
|
||||
if (ret) // TODO log
|
||||
;
|
||||
if (ret)
|
||||
logger->error("Failed to destroy pool");
|
||||
}
|
||||
|
||||
int WebRTCNode::parse(json_t *json) {
|
||||
|
@ -107,10 +108,12 @@ int WebRTCNode::parse(json_t *json) {
|
|||
}
|
||||
}
|
||||
|
||||
format = json_format ? FormatFactory::make(json_format)
|
||||
: FormatFactory::make("villas.binary");
|
||||
|
||||
assert(format);
|
||||
auto *fmt = json_format ? FormatFactory::make(json_format)
|
||||
: FormatFactory::make("villas.binary");
|
||||
formatter = Format::Ptr(fmt);
|
||||
if (!formatter)
|
||||
throw ConfigError(json_format, "node-config-node-webrtc-format",
|
||||
"Invalid format configuration");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -120,44 +123,28 @@ int WebRTCNode::prepare() {
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
format->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET);
|
||||
formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET);
|
||||
|
||||
// TODO: Determine output signals reliably
|
||||
auto signals = std::make_shared<SignalList>();
|
||||
auto output_signals = std::make_shared<SignalList>();
|
||||
|
||||
conn = std::make_shared<webrtc::PeerConnection>(server, session, peer,
|
||||
signals, rtcConf, web, dci);
|
||||
conn = std::make_shared<webrtc::PeerConnection>(
|
||||
server, session, peer, output_signals, rtcConf, web, dci);
|
||||
|
||||
ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size()));
|
||||
if (ret) // TODO log
|
||||
if (ret) {
|
||||
logger->error("Failed to create pool");
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = queue_signalled_init(&queue, 1024);
|
||||
if (ret) // TODO log
|
||||
if (ret) {
|
||||
logger->error("Failed to create queue");
|
||||
return ret;
|
||||
}
|
||||
|
||||
// TODO: Move this to a member function
|
||||
conn->onMessage([this](rtc::binary msg) {
|
||||
int ret;
|
||||
std::vector<Sample *> smps;
|
||||
smps.resize(this->in.vectorize);
|
||||
|
||||
ret = sample_alloc_many(&this->pool, smps.data(), smps.size());
|
||||
if (ret < 0) // TODO log
|
||||
return;
|
||||
|
||||
ret = format->sscan((const char *)msg.data(), msg.size(), nullptr,
|
||||
smps.data(), ret);
|
||||
if (ret < 0) // TODO log
|
||||
return;
|
||||
|
||||
ret = queue_signalled_push_many(&this->queue, (void **)smps.data(), ret);
|
||||
if (ret < 0) // TODO log
|
||||
return;
|
||||
|
||||
this->logger->trace(
|
||||
"onMessage(rtc::binary) callback finished pushing {} samples", ret);
|
||||
});
|
||||
conn->onMessage(
|
||||
std::bind(&WebRTCNode::onMessage, this, std::placeholders::_1));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -214,9 +201,12 @@ int WebRTCNode::_write(struct Sample *smps[], unsigned cnt) {
|
|||
size_t wbytes;
|
||||
|
||||
buf.resize(4 * 1024);
|
||||
int ret = format->sprint((char *)buf.data(), buf.size(), &wbytes, smps, cnt);
|
||||
if (ret < 0) // TODO log
|
||||
int ret =
|
||||
formatter->sprint((char *)buf.data(), buf.size(), &wbytes, smps, cnt);
|
||||
if (ret < 0) {
|
||||
logger->error("Failed to format payload");
|
||||
return ret;
|
||||
}
|
||||
|
||||
buf.resize(wbytes);
|
||||
conn->sendMessage(buf);
|
||||
|
@ -231,6 +221,34 @@ json_t *WebRTCNode::_readStatus() const {
|
|||
return conn->readStatus();
|
||||
}
|
||||
|
||||
void WebRTCNode::onMessage(rtc::binary msg) {
|
||||
int ret;
|
||||
std::vector<Sample *> smps;
|
||||
smps.resize(in.vectorize);
|
||||
|
||||
ret = sample_alloc_many(&pool, smps.data(), smps.size());
|
||||
if (ret < 0) {
|
||||
logger->error("Failed to allocate samples");
|
||||
return;
|
||||
}
|
||||
|
||||
ret = formatter->sscan((const char *)msg.data(), msg.size(), nullptr,
|
||||
smps.data(), ret);
|
||||
if (ret < 0) {
|
||||
logger->error("Failed to parse payload");
|
||||
return;
|
||||
}
|
||||
|
||||
ret = queue_signalled_push_many(&queue, (void **)smps.data(), ret);
|
||||
if (ret < 0) {
|
||||
logger->error("Failed to enqueue samples");
|
||||
return;
|
||||
}
|
||||
|
||||
logger->trace("onMessage(rtc::binary) callback finished pushing {} samples",
|
||||
ret);
|
||||
}
|
||||
|
||||
int WebRTCNodeFactory::start(SuperNode *sn) {
|
||||
web = sn->getWeb();
|
||||
if (!web->isEnabled())
|
||||
|
|
|
@ -8,10 +8,12 @@
|
|||
*/
|
||||
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
|
||||
#include <fmt/chrono.h>
|
||||
#include <fmt/core.h>
|
||||
#include <fmt/ostream.h>
|
||||
#include <rtc/common.hpp>
|
||||
#include <villas/exceptions.hpp>
|
||||
#include <villas/nodes/webrtc/peer_connection.hpp>
|
||||
#include <villas/utils.hpp>
|
||||
|
@ -25,20 +27,23 @@ using namespace villas::node::webrtc;
|
|||
PeerConnection::PeerConnection(const std::string &server,
|
||||
const std::string &session,
|
||||
const std::string &peer,
|
||||
std::shared_ptr<SignalList> signals,
|
||||
std::shared_ptr<SignalList> out_signals,
|
||||
rtc::Configuration cfg, Web *w,
|
||||
rtc::DataChannelInit d)
|
||||
: web(w), extraServers({}), dataChannelInit(d), defaultConfig(cfg),
|
||||
conn(nullptr), chan(nullptr), signals(signals),
|
||||
conn(nullptr), chan(nullptr), out_signals(out_signals),
|
||||
logger(logging.get("webrtc:pc")), stopStartup(false),
|
||||
warnNotConnected(false), standby(true), first(false), firstID(INT_MAX),
|
||||
secondID(INT_MAX), onMessageCallback(nullptr) {
|
||||
client = std::make_shared<SignalingClient>(server, session, peer, web);
|
||||
client->onConnected([this]() { this->onSignalingConnected(); });
|
||||
client->onDisconnected([this]() { this->onSignalingDisconnected(); });
|
||||
client->onError([this](auto err) { this->onSignalingError(std::move(err)); });
|
||||
client->onMessage(
|
||||
[this](auto msg) { this->onSignalingMessage(std::move(msg)); });
|
||||
|
||||
client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this));
|
||||
client->onDisconnected(
|
||||
std::bind(&PeerConnection::onSignalingDisconnected, this));
|
||||
client->onError(std::bind(&PeerConnection::onSignalingError, this,
|
||||
std::placeholders::_1));
|
||||
client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this,
|
||||
std::placeholders::_1));
|
||||
|
||||
auto lock = std::unique_lock{mutex};
|
||||
resetConnectionAndStandby(lock);
|
||||
|
@ -135,18 +140,18 @@ void PeerConnection::setupPeerConnection(
|
|||
std::begin(extraServers), std::end(extraServers));
|
||||
|
||||
conn = pc ? std::move(pc) : std::make_shared<rtc::PeerConnection>(config);
|
||||
conn->onLocalDescription(
|
||||
[this](auto desc) { this->onLocalDescription(std::move(desc)); });
|
||||
conn->onLocalCandidate(
|
||||
[this](auto cand) { this->onLocalCandidate(std::move(cand)); });
|
||||
conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this,
|
||||
std::placeholders::_1));
|
||||
conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this,
|
||||
std::placeholders::_1));
|
||||
conn->onDataChannel(
|
||||
[this](auto channel) { this->onDataChannel(std::move(channel)); });
|
||||
conn->onGatheringStateChange(
|
||||
[this](auto state) { this->onGatheringStateChange(std::move(state)); });
|
||||
conn->onSignalingStateChange(
|
||||
[this](auto state) { this->onSignalingStateChange(std::move(state)); });
|
||||
conn->onStateChange(
|
||||
[this](auto state) { this->onConnectionStateChange(std::move(state)); });
|
||||
std::bind(&PeerConnection::onDataChannel, this, std::placeholders::_1));
|
||||
conn->onGatheringStateChange(std::bind(
|
||||
&PeerConnection::onGatheringStateChange, this, std::placeholders::_1));
|
||||
conn->onSignalingStateChange(std::bind(
|
||||
&PeerConnection::onSignalingStateChange, this, std::placeholders::_1));
|
||||
conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this,
|
||||
std::placeholders::_1));
|
||||
}
|
||||
|
||||
void PeerConnection::setupDataChannel(std::shared_ptr<rtc::DataChannel> dc) {
|
||||
|
@ -155,12 +160,14 @@ void PeerConnection::setupDataChannel(std::shared_ptr<rtc::DataChannel> dc) {
|
|||
assert(conn);
|
||||
chan =
|
||||
dc ? std::move(dc) : conn->createDataChannel("villas", dataChannelInit);
|
||||
|
||||
chan->onMessage(
|
||||
[this](rtc::binary msg) { this->onDataChannelMessage(std::move(msg)); },
|
||||
[this](rtc::string msg) { this->onDataChannelMessage(std::move(msg)); });
|
||||
chan->onOpen([this]() { this->onDataChannelOpen(); });
|
||||
chan->onClosed([this]() { this->onDataChannelClosed(); });
|
||||
chan->onError([this](auto err) { this->onDataChannelError(std::move(err)); });
|
||||
chan->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this));
|
||||
chan->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this));
|
||||
chan->onError(std::bind(&PeerConnection::onDataChannelError, this,
|
||||
std::placeholders::_1));
|
||||
|
||||
// If this node has it's data channel set up, don't accept any new ones
|
||||
conn->onDataChannel(nullptr);
|
||||
|
@ -251,7 +258,7 @@ void PeerConnection::onSignalingConnected() {
|
|||
|
||||
auto lock = std::unique_lock{mutex};
|
||||
|
||||
client->sendMessage({*signals});
|
||||
client->sendMessage({*out_signals});
|
||||
}
|
||||
|
||||
void PeerConnection::onSignalingDisconnected() {
|
||||
|
@ -343,7 +350,9 @@ void PeerConnection::onSignalingMessage(SignalingMessage msg) {
|
|||
conn->addRemoteCandidate(c);
|
||||
},
|
||||
|
||||
[&](auto other) { logger->warn("unknown signaling message"); }},
|
||||
[&](auto other) {
|
||||
logger->warn("Signaling message has been skipped");
|
||||
}},
|
||||
msg.message);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue