1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

node-webrtc: initial working transmission

Signed-off-by: Philipp Jungkamp <Philipp.Jungkamp@opal-rt.com>
This commit is contained in:
Philipp Jungkamp 2023-06-13 17:48:04 +02:00
parent 6779e6d1e8
commit 6760272a4b
9 changed files with 588 additions and 300 deletions

View file

@ -1,25 +1,15 @@
nodes = {
webrtc_node = {
type = "webrtc",
webrtc = {
type = "webrtc"
# required session key.
session = "<YOUR SESSION>"
# optional signaling server.
server = "<ADDRESS OF YOUR SIGNALING SERVER>"
# optional format.
format = "json"
# A unique session identifier which must be shared between two nodes
session = "my-session-name"
# Address to the websocket signaling server
# server = "wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"
server = "ws://docker.for.mac.host.internal:1234"
# Setting for Interactive Connectivity Establishment
ice = {
# List of STUN/TURN servers
servers = [
"stun://stun.l.google.com:19302",
"stun:stun.0l.de:3478",
"turn://villas:villas@turn.0l.de:3478?transport=udp",
"turn://villas:villas@turn.0l.de:3478?transport=tcp"
]
}
# optional initial connect timeout.
wait_seconds = 120
}
}

View file

@ -14,6 +14,8 @@
#include <villas/node/config.hpp>
#include <villas/node.hpp>
#include <villas/timing.hpp>
#include <villas/format.hpp>
#include <villas/queue_signalled.h>
namespace villas {
namespace node {
@ -27,12 +29,14 @@ protected:
std::string server;
std::string session;
bool wait;
bool ordered;
int max_retransmits;
int wait_seconds;
Format *format;
struct CQueueSignalled queue;
struct Pool pool;
std::shared_ptr<webrtc::PeerConnection> conn;
rtc::Configuration config;
rtc::DataChannelInit dci;
virtual
int _read(struct Sample *smps[], unsigned cnt);
@ -63,8 +67,8 @@ public:
virtual
int start();
// virtual
// int stop();
virtual
int stop();
// virtual
// int pause();
@ -78,8 +82,8 @@ public:
// virtual
// int reverse();
// virtual
// std::vector<int> getPollFDs();
virtual
std::vector<int> getPollFDs();
// virtual
// std::vector<int> getNetemFDs();

View file

@ -21,16 +21,21 @@ namespace webrtc {
class PeerConnection {
public:
PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &config, Web *w);
PeerConnection(const std::string &server, const std::string &session, rtc::Configuration config, Web *w, rtc::DataChannelInit d);
~PeerConnection();
void waitForDataChannel();
bool waitForDataChannel(std::chrono::seconds timeout);
void onMessage(std::function<void(rtc::binary)> callback);
void sendMessage(rtc::binary msg);
void connect();
void disconnect();
protected:
Web *web;
rtc::Configuration config;
std::vector<rtc::IceServer> extraServers;
rtc::DataChannelInit dataChannelInit;
rtc::Configuration defaultConfig;
std::shared_ptr<rtc::PeerConnection> conn;
std::shared_ptr<rtc::DataChannel> chan;
@ -38,37 +43,44 @@ protected:
Logger logger;
bool makingOffer;
bool ignoreOffer;
std::mutex mutex;
std::condition_variable_any startupCondition;
bool stopStartup;
bool warnNotConnected;
bool standby;
bool first;
bool polite;
bool rollback;
int firstID;
int secondID;
void createPeerConnection();
void rollbackPeerConnection();
void createDatachannel();
std::function<void(rtc::binary)> onMessageCallback;
void onConnectionCreated();
void resetConnection(std::unique_lock<decltype(PeerConnection::mutex)> &lock);
void resetConnectionAndStandby(std::unique_lock<decltype(PeerConnection::mutex)> &lock);
void notifyStartup();
void setupPeerConnection(std::shared_ptr<rtc::PeerConnection> = nullptr);
void setupDataChannel(std::shared_ptr<rtc::DataChannel> = nullptr);
void onLocalDescription(rtc::Description sdp);
void onLocalCandidate(rtc::Candidate cand);
void onNegotiationNeeded();
void onConnectionStateChange(rtc::PeerConnection::State state);
void onSignalingStateChange(rtc::PeerConnection::SignalingState state);
void onGatheringStateChange(rtc::PeerConnection::GatheringState state);
void onSignalingConnected();
void onSignalingDisconnected();
void onSignalingError(const std::string &err);
void onSignalingMessage(const SignalingMessage &msg);
void onSignalingError(std::string err);
void onSignalingMessage(SignalingMessage msg);
void onDataChannel(std::shared_ptr<rtc::DataChannel> dc);
void onDataChannelOpen();
void onDataChannelClosed();
void onDataChannelError(std::string err);
void onDataChannelMessage(rtc::message_variant msg);
void onDataChannelMessage(rtc::string msg);
void onDataChannelMessage(rtc::binary msg);
};
} /* namespace webrtc */

View file

@ -34,17 +34,17 @@ protected:
SignalingClient *self;
} sul_helper;
uint16_t retry_count; /**> Count of consequetive retries */
uint16_t retry_count; /**> Count of consecutive retries */
struct lws *wsi;
struct lws_client_connect_info info;
struct lws_client_connect_info info;
/* The retry and backoff policy we want to use for our client connections */
static constexpr uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
static constexpr uint32_t backoff_ms[] = { 1<<4, 1<<6, 1<<8, 1<<10, 1<<12, 1<<14, 1<<16 };
static constexpr lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms) + 1,
.secs_since_valid_ping = 3, /* force PINGs after secs idle */
.secs_since_valid_hangup = 10, /* hangup after secs idle */
@ -52,10 +52,10 @@ protected:
.jitter_percent = 20,
};
std::function<void(const SignalingMessage &)> cbMessage;
std::function<void(SignalingMessage)> cbMessage;
std::function<void()> cbConnected;
std::function<void()> cbDisconnected;
std::function<void(const std::string &)> cbError;
std::function<void(std::string)> cbError;
Queue<SignalingMessage> outgoingMessages;
@ -64,6 +64,8 @@ protected:
char *uri;
char *path;
std::atomic<bool> running;
Logger logger;
int protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len);
@ -86,7 +88,7 @@ public:
void sendMessage(const SignalingMessage &msg);
void onMessage(std::function<void(const SignalingMessage&)> callback)
void onMessage(std::function<void(SignalingMessage)> callback)
{
cbMessage = callback;
}
@ -101,7 +103,7 @@ public:
cbDisconnected = callback;
}
void onError(std::function<void(const std::string &)> callback)
void onError(std::function<void(std::string)> callback)
{
cbError = callback;
}

View file

@ -10,7 +10,8 @@
#include <string>
#include <chrono>
#include <list>
#include <vector>
#include <variant>
#include <rtc/rtc.hpp>
#include <jansson.h>
@ -29,40 +30,29 @@ struct Connection {
json_t * toJSON() const;
};
struct RelayMessage {
std::vector<rtc::IceServer> servers;
RelayMessage(json_t *j);
};
struct ControlMessage {
int connectionID;
std::list<Connection> connections;
std::vector<Connection> connections;
ControlMessage(json_t *j);
json_t * toJSON() const;
};
class SignalingMessage {
class SignalingMessage : public std::variant<std::monostate, RelayMessage, ControlMessage, rtc::Description, rtc::Candidate> {
public:
enum Type {
TYPE_CONTROL,
TYPE_OFFER,
TYPE_ANSWER,
TYPE_CANDIDATE
};
Type type;
ControlMessage *control;
rtc::Description *description;
rtc::Candidate *candidate;
std::string mid;
SignalingMessage(rtc::Description desc, bool answer = false);
SignalingMessage(rtc::Candidate cand);
using variant::variant;
SignalingMessage(json_t *j);
~SignalingMessage();
json_t * toJSON() const;
std::string toString() const;
};
} /* namespace webrtc */
} /* namespace node */
} /* namespace villas */

View file

@ -24,20 +24,18 @@ static villas::node::Web *web;
WebRTCNode::WebRTCNode(const std::string &name) :
Node(name),
server("wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"),
wait(true),
ordered(false),
max_retransmits(0)
wait_seconds(0),
dci({})
{
config.iceServers = {
rtc::IceServer("stun://stun.l.google.com:19302"),
rtc::IceServer("stun://villas:villas@stun.0l.de"),
rtc::IceServer("turn://villas:villas@turn.0l.de?transport=udp"),
rtc::IceServer("turn://villas:villas@turn.0l.de?transport=tcp"),
};
dci.reliability.type = rtc::Reliability::Type::Rexmit;
}
WebRTCNode::~WebRTCNode()
{ }
{
int ret = pool_destroy(&pool);
if (ret) // TODO log
;
}
int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
{
@ -47,17 +45,20 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
const char *sess;
const char *svr = nullptr;
int w = - 1, ord = -1;
json_t *json_ice = nullptr;
int ord = -1;
int &rexmit = dci.reliability.rexmit.emplace<int>(0);
json_t *ice_json = nullptr;
json_t *fmt_json = nullptr;
json_error_t err;
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: b, s?: i, s?: b, s?: o }",
ret = json_unpack_ex(json, &err, 0, "{ s:s, s?s, s?i, s?i, s?b, s?o }",
"session", &sess,
"server", &svr,
"wait", &w,
"max_retransmits", &max_retransmits,
"wait_seconds", &wait_seconds,
"max_retransmits", &rexmit,
"ordered", &ord,
"ice", &json_ice
"ice", &ice_json,
"format", &fmt_json
);
if (ret)
throw ConfigError(json, err, "node-config-node-webrtc");
@ -67,16 +68,13 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
if (svr)
server = svr;
if (w >= 0)
wait = w != 0;
if (ord)
dci.reliability.unordered = !ord;
if (ord >= 0)
ordered = ordered != 0;
if (json_ice) {
if (ice_json) {
json_t *json_servers = nullptr;
ret = json_unpack_ex(json_ice, &err, 0, "{ s?: o }",
ret = json_unpack_ex(ice_json, &err, 0, "{ s?: o }",
"servers", &json_servers
);
if (ret)
@ -101,43 +99,85 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
}
}
format = fmt_json
? FormatFactory::make(fmt_json)
: FormatFactory::make("villas.binary");
assert(format);
return 0;
}
int WebRTCNode::check()
{
return 0;
return Node::check();
}
int WebRTCNode::prepare()
{
conn = std::make_shared<webrtc::PeerConnection>(server, session, config, web);
int ret = Node::prepare();
if (ret)
return ret;
return Node::prepare();
}
format->start(getInputSignals(false), ~(int) SampleFlags::HAS_OFFSET);
int WebRTCNode::start()
{
conn->connect();
conn = std::make_shared<webrtc::PeerConnection>(server, session, config, web, dci);
if (wait) {
logger->info("Wait until datachannel is connected...");
ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size()));
if (ret) // TODO log
return ret;
conn->waitForDataChannel();
}
ret = queue_signalled_init(&queue, 1024);
if (ret) // TODO log
return ret;
int ret = Node::start();
if (!ret)
state = State::STARTED;
conn->onMessage([this](rtc::binary msg){
int ret;
std::vector<Sample *> smps;
smps.resize(this->in.vectorize);
ret = sample_alloc_many(&this->pool, smps.data(), smps.size());
if (ret < 0) // TODO log
return;
ret = format->sscan((const char *)msg.data(), msg.size(), nullptr, smps.data(), ret);
if (ret < 0) // TODO log
return;
ret = queue_signalled_push_many(&this->queue, (void **) smps.data(), ret);
if (ret < 0) // TODO log
return;
this->logger->debug("onMessage(rtc::binary) callback finished pushing {} samples", ret);
});
return 0;
}
// int WebRTCNode::stop()
// {
// // TODO add implementation here
// return 0;
// }
int WebRTCNode::start()
{
int ret = Node::start();
if (!ret)
state = State::STARTED;
conn->connect();
if (wait_seconds > 0) {
logger->info("Waiting for datachannel...");
if (!conn->waitForDataChannel(std::chrono::seconds { wait_seconds })) {
throw RuntimeError { "Waiting for datachannel timed out after {} seconds", wait_seconds };
}
}
return 0;
}
int WebRTCNode::stop()
{
conn->disconnect();
return Node::stop();
}
// int WebRTCNode::pause()
// {
@ -163,11 +203,10 @@ int WebRTCNode::start()
// return 0;
// }
// std::vector<int> WebRTCNode::getPollFDs()
// {
// // TODO add implementation here
// return {};
// }
std::vector<int> WebRTCNode::getPollFDs()
{
return { queue_signalled_fd(&queue) };
}
// std::vector<int> WebRTCNode::getNetemFDs()
// {
@ -188,36 +227,31 @@ const std::string & WebRTCNode::getDetails()
int WebRTCNode::_read(struct Sample *smps[], unsigned cnt)
{
int read;
struct timespec now;
std::vector<Sample *> smpt;
smpt.resize(cnt);
/* TODO: Add implementation here. The following is just an example */
int pulled = queue_signalled_pull_many(&queue, (void **) smpt.data(), smpt.size());
assert(cnt >= 1 && smps[0]->capacity >= 1);
sample_copy_many(smps, smpt.data(), pulled);
sample_decref_many(smpt.data(), pulled);
now = time_now();
// smps[0]->data[0].f = time_delta(&now, &start_time);
/* Dont forget to set other flags in struct Sample::flags
* E.g. for sequence no, timestamps... */
smps[0]->flags = (int) SampleFlags::HAS_DATA;
smps[0]->signals = getInputSignals(false);
read = 1; /* The number of samples read */
return read;
return pulled;
}
int WebRTCNode::_write(struct Sample *smps[], unsigned cnt)
{
int written;
rtc::binary buf;
size_t wbytes;
/* TODO: Add implementation here. */
buf.resize(4 * 1024);
int ret = format->sprint((char *) buf.data(), buf.size(), &wbytes, smps, cnt);
if (ret < 0) // TODO log
return ret;
written = 0; /* The number of samples written */
buf.resize(wbytes);
conn->sendMessage(buf);
return written;
return ret;
}
int WebRTCNodeFactory::start(SuperNode *sn)

View file

@ -8,7 +8,10 @@
#include <chrono>
#include <thread>
#include <fmt/ostream.h>
#include <fmt/chrono.h>
#include <villas/utils.hpp>
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/peer_connection.hpp>
using namespace std::placeholders;
@ -17,23 +20,59 @@ using namespace villas;
using namespace villas::node;
using namespace villas::node::webrtc;
PeerConnection::PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &cfg, Web *w) :
PeerConnection::PeerConnection(const std::string &server, const std::string &session, rtc::Configuration cfg, Web *w, rtc::DataChannelInit d) :
web(w),
config(cfg),
dataChannelInit(d),
defaultConfig(cfg),
logger(logging.get("webrtc:pc"))
{
client = std::make_shared<SignalingClient>(server, session, web);
client->onConnected([this](){ this->onSignalingConnected(); });
client->onDisconnected([this](){ this->onSignalingDisconnected(); });
client->onError([this](auto err){ this->onSignalingError(std::move(err)); });
client->onMessage([this](auto msg){ this->onSignalingMessage(std::move(msg)); });
client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this));
client->onDisconnected(std::bind(&PeerConnection::onSignalingDisconnected, this));
client->onError(std::bind(&PeerConnection::onSignalingError, this, _1));
client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this, _1));
auto lock = std::unique_lock { mutex };
resetConnectionAndStandby(lock);
}
PeerConnection::~PeerConnection()
{
// TODO
}
bool PeerConnection::waitForDataChannel(std::chrono::seconds timeout)
{
auto lock = std::unique_lock { mutex };
auto deadline = std::chrono::steady_clock::now() + timeout;
return startupCondition.wait_until(lock, deadline, [this](){ return this->stopStartup; });
}
void PeerConnection::notifyStartup()
{
stopStartup = true;
startupCondition.notify_all();
}
void PeerConnection::onMessage(std::function<void(rtc::binary)> callback)
{
auto lock = std::unique_lock { mutex };
onMessageCallback = callback;
}
void PeerConnection::sendMessage(rtc::binary msg)
{
auto lock = std::unique_lock { mutex };
if (chan && chan->isOpen()) {
chan->send(msg);
warnNotConnected = true;
} else if (warnNotConnected) {
logger->warn("Dropping messages. No peer connected.");
warnNotConnected = false;
}
}
void PeerConnection::connect()
@ -41,79 +80,154 @@ void PeerConnection::connect()
client->connect();
}
void PeerConnection::waitForDataChannel()
void PeerConnection::disconnect()
{
while (!chan) {
// TODO: signal available via condition variable
std::this_thread::sleep_for(std::chrono::seconds(1));
}
client->disconnect();
}
void PeerConnection::createPeerConnection()
void PeerConnection::resetConnection(std::unique_lock<decltype(PeerConnection::mutex)> &lock)
{
// Setup peer connection
conn = std::make_shared<rtc::PeerConnection>(config);
conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this, _1));
conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this, _1));
conn->onDataChannel(std::bind(&PeerConnection::onDataChannel, this, _1));
conn->onGatheringStateChange(std::bind(&PeerConnection::onGatheringStateChange, this, _1));
conn->onSignalingStateChange(std::bind(&PeerConnection::onSignalingStateChange, this, _1));
conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this, _1));
lock.unlock();
chan.reset();
conn.reset();
lock.lock();
}
void PeerConnection::rollbackPeerConnection()
void PeerConnection::resetConnectionAndStandby(std::unique_lock<decltype(PeerConnection::mutex)> &lock)
{
if (!standby)
logger->info("Going to standby");
standby = true;
first = false;
firstID = INT_MAX;
secondID = INT_MAX;
warnNotConnected = false;
resetConnection(lock);
}
void PeerConnection::createDatachannel()
void PeerConnection::setupPeerConnection(std::shared_ptr<rtc::PeerConnection> pc)
{
logger->debug("Setup {} peer connection", pc ? "existing" : "new");
auto config = defaultConfig;
config.iceServers.insert(std::end(config.iceServers), std::begin(extraServers), std::end(extraServers));
conn = pc ? std::move(pc) : std::make_shared<rtc::PeerConnection>(config);
conn->onLocalDescription([this](auto desc){ this->onLocalDescription(std::move(desc)); });
conn->onLocalCandidate([this](auto cand){ this->onLocalCandidate(std::move(cand)); });
conn->onDataChannel([this](auto channel){ this->onDataChannel(std::move(channel)); });
conn->onGatheringStateChange([this](auto state){ this->onGatheringStateChange(std::move(state)); });
conn->onSignalingStateChange([this](auto state){ this->onSignalingStateChange(std::move(state)); });
conn->onStateChange([this](auto state){ this->onConnectionStateChange(std::move(state)); });
}
void PeerConnection::onConnectionCreated()
void PeerConnection::setupDataChannel(std::shared_ptr<rtc::DataChannel> dc)
{
logger->debug("Connection created");
logger->debug("Setup {} data channel", dc ? "existing" : "new");
assert(conn);
chan = dc ? std::move(dc) : conn->createDataChannel("villas", dataChannelInit);
chan->onMessage(
[this](rtc::binary msg){ this->onDataChannelMessage(std::move(msg)); },
[this](rtc::string msg){ this->onDataChannelMessage(std::move(msg)); }
);
chan->onOpen([this](){ this->onDataChannelOpen(); });
chan->onClosed([this](){ this->onDataChannelClosed(); });
chan->onError([this](auto err){ this->onDataChannelError(std::move(err)); });
// if this node has it's data channel set up, don't accept any new ones
conn->onDataChannel(nullptr);
}
void PeerConnection::onLocalDescription(rtc::Description sdp)
void PeerConnection::onLocalDescription(rtc::Description desc)
{
logger->debug("New local description: {}", std::string(sdp));
logger->debug("New local description: type={} sdp=\n{}", desc.typeString(), desc.generateSdp());
SignalingMessage msg(sdp);
auto lock = std::unique_lock { mutex };
client->sendMessage(msg);
client->sendMessage(desc);
}
void PeerConnection::onLocalCandidate(rtc::Candidate cand)
{
logger->debug("New local candidate: {}", std::string(cand));
logger->debug("New local candidate: {}", std::string { cand });
SignalingMessage msg(cand);
auto lock = std::unique_lock { mutex };
client->sendMessage(msg);
}
void PeerConnection::onNegotiationNeeded()
{
logger->debug("Negotation needed");
client->sendMessage(cand);
}
void PeerConnection::onConnectionStateChange(rtc::PeerConnection::State state)
{
logger->debug("Connection State changed: {}", state);
//logger->debug("Connection State changed: {}", state);
auto lock = std::unique_lock { mutex };
switch (state) {
case rtc::PeerConnection::State::New: {
logger->debug("New peer connection");
break;
}
case rtc::PeerConnection::State::Connecting: {
logger->debug("Peer connection connecting.");
break;
}
case rtc::PeerConnection::State::Connected: {
rtc::Candidate local, remote;
std::optional<std::chrono::milliseconds> rtt = conn->rtt();
if (conn->getSelectedCandidatePair(&local, &remote)) {
std::stringstream l, r;
l << local, r << remote;
logger->debug(
"Peer connection connected:\n"
"local: {}\n"
"remote: {}\n"
"bytes sent: {} / bytes received: {} / rtt: {}\n",
l.str(),
r.str(),
conn->bytesSent(),
conn->bytesReceived(),
rtt.value_or(decltype(rtt)::value_type { 0 })
);
} else {
logger->debug(
"Peer connection connected.\n"
"Could not get candidate pair info.\n"
);
}
break;
}
case rtc::PeerConnection::State::Disconnected:
case rtc::PeerConnection::State::Failed: {
logger->debug("Closing peer connection");
break;
}
case rtc::PeerConnection::State::Closed: {
logger->debug("Closed peer connection");
resetConnectionAndStandby(lock);
break;
}
}
}
void PeerConnection::onSignalingStateChange(rtc::PeerConnection::SignalingState state)
{
logger->debug("Signaling state changed: {}", state);
std::stringstream s;
s << state;
logger->debug("Signaling state changed: {}", s.str());
}
void PeerConnection::onGatheringStateChange(rtc::PeerConnection::GatheringState state)
{
logger->debug("Gathering state changed: {}", state);
std::stringstream s;
s << state;
logger->debug("Gathering state changed: {}", s.str());
}
void PeerConnection::onSignalingConnected()
@ -124,55 +238,153 @@ void PeerConnection::onSignalingConnected()
void PeerConnection::onSignalingDisconnected()
{
logger->debug("Signaling connection closed");
auto lock = std::unique_lock { mutex };
resetConnectionAndStandby(lock);
}
void PeerConnection::onSignalingError(const std::string &err)
void PeerConnection::onSignalingError(std::string err)
{
logger->debug("Signaling connection error: {}", err);
auto lock = std::unique_lock { mutex };
resetConnectionAndStandby(lock);
}
void PeerConnection::onSignalingMessage(const SignalingMessage &msg)
void PeerConnection::onSignalingMessage(SignalingMessage msg)
{
logger->debug("Signaling message received: {}", msg.toString());
if (msg.type == SignalingMessage::TYPE_ANSWER || msg.type == SignalingMessage::TYPE_OFFER)
conn->setRemoteDescription(*msg.description);
auto lock = std::unique_lock { mutex };
if (msg.type == SignalingMessage::TYPE_CANDIDATE)
conn->addRemoteCandidate(*msg.candidate);
std::visit(villas::utils::overloaded {
[&](RelayMessage &c){
extraServers = std::move(c.servers);
},
[&](ControlMessage &c){
auto const &id = c.connectionID;
if (c.connections.size() < 2) {
resetConnectionAndStandby(lock);
return;
}
auto fst = INT_MAX, snd = INT_MAX;
for (auto &c : c.connections) {
if (c.id < fst) {
snd = fst;
fst = c.id;
} else if (c.id < snd) {
snd = c.id;
}
}
standby = (id != fst && id != snd);
if (standby) {
logger->error("There are already two peers connected to this session. Waiting in standby.");
return;
}
if (fst == firstID && snd == secondID) {
logger->debug("Ignoring control message. This connection is already being established.");
return;
}
resetConnection(lock);
first = (id == fst);
firstID = fst;
secondID = snd;
setupPeerConnection();
if (!first) {
setupDataChannel();
conn->setLocalDescription(rtc::Description::Type::Offer);
}
logger->trace("New connection pair: first={}, second={}, I am {}", firstID, secondID, first ? "first" : "second");
},
[&](rtc::Description d){
if (standby || !conn || (!first && d.type() == rtc::Description::Type::Offer))
return;
conn->setRemoteDescription(d);
},
[&](rtc::Candidate c){
if (standby || !conn)
return;
conn->addRemoteCandidate(c);
},
[&](auto other){
logger->warn("unknown signaling message");
}
}, msg);
}
void PeerConnection::onDataChannel(std::shared_ptr<rtc::DataChannel> dc)
{
logger->debug("New data channel: id={}, stream={}, protocol={}, max_msg_size={}, label={}",
/*logger->debug("New data channel: id={}, stream={}, protocol={}, max_msg_size={}, label={}",
dc->id(),
dc->stream(),
dc->protocol(),
dc->maxMessageSize(),
dc->label());
*/
dc->onMessage(std::bind(&PeerConnection::onDataChannelMessage, this, _1));
dc->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this));
dc->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this));
dc->onError(std::bind(&PeerConnection::onDataChannelError, this, _1));
auto lock = std::unique_lock { mutex };
setupDataChannel(std::move(dc));
}
void PeerConnection::onDataChannelOpen()
{
logger->debug("Datachannel opened");
auto lock = std::unique_lock { mutex };
chan->send("Hello from VILLASnode");
notifyStartup();
}
void PeerConnection::onDataChannelClosed()
{
logger->debug("Datachannel closed");
auto lock = std::unique_lock { mutex };
resetConnectionAndStandby(lock);
}
void PeerConnection::onDataChannelError(std::string err)
{
logger->error("Datachannel error: {}", err);
auto lock = std::unique_lock { mutex };
resetConnectionAndStandby(lock);
}
void PeerConnection::onDataChannelMessage(rtc::message_variant msg)
void PeerConnection::onDataChannelMessage(rtc::string msg)
{
logger->debug("Datachannel message received");
logger->info("Received: {}", msg);
}
void PeerConnection::onDataChannelMessage(rtc::binary msg)
{
logger->trace("Received binary data");
auto lock = std::unique_lock { mutex };
if (onMessageCallback)
onMessageCallback(msg);
}

View file

@ -16,20 +16,20 @@ using namespace villas::node::webrtc;
SignalingClient::SignalingClient(const std::string &srv, const std::string &sess, Web *w) :
web(w),
running(false),
logger(logging.get("webrtc:signal"))
{
const char *prot, *a, *p;
memset(&info, 0, sizeof(info));
asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str());
(void)!asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str());
int ret = lws_parse_uri(uri, &prot, &a, &info.port, &p);
if (ret)
throw RuntimeError("Failed to parse WebSocket URI: '{}'", uri);
asprintf(&path, "/%s", p);
(void)!asprintf(&path, "/%s", p);
info.ssl_connection = !strcmp(prot, "https");
info.address = a;
@ -44,25 +44,34 @@ SignalingClient::SignalingClient(const std::string &srv, const std::string &sess
info.userdata = this;
sul_helper.self = this;
sul_helper.sul = {};
}
SignalingClient::~SignalingClient()
{
disconnect();
free(path);
free(uri);
}
void SignalingClient::connect()
{
info.context = web->getContext();
info.vhost = web->getVHost();
running = true;
lws_sul_schedule(info.context, 0, &sul_helper.sul, connectStatic, 1);
info.context = web->getContext();
lws_sul_schedule(info.context, 0, &sul_helper.sul, connectStatic, 1 * LWS_US_PER_SEC);
}
void SignalingClient::disconnect()
{
running = false;
// TODO
// wait for connectStatic to exit
// close LWS connection
if (wsi)
lws_callback_on_writable(wsi);
}
void SignalingClient::connectStatic(struct lws_sorted_usec_list *sul)
@ -104,6 +113,7 @@ int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
retry_count = 0;
cbConnected();
break;
@ -145,20 +155,33 @@ do_retry:
int SignalingClient::writable()
{
// Skip if we have nothing to send
if (outgoingMessages.empty())
if (!running) {
auto reason = "Signaling Client Closing";
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) reason, strlen(reason));
return 0;
}
// Skip if we have nothing to send
if (outgoingMessages.empty()) {
return 0;
}
auto msg = outgoingMessages.pop();
auto *jsonMsg = msg.toJSON();
char buf[LWS_PRE + 1024];
auto len = json_dumpb(jsonMsg, buf+LWS_PRE, 1024, JSON_INDENT(2));
if (!jsonMsg) {
return 0;
}
auto ret = lws_write(wsi, (unsigned char *) buf, len, LWS_WRITE_TEXT);
char buf[LWS_PRE + 1024];
auto len = json_dumpb(jsonMsg, buf + LWS_PRE, 1024, JSON_INDENT(2));
auto ret = lws_write(wsi, (unsigned char *) buf + LWS_PRE, len, LWS_WRITE_TEXT);
if (ret < 0)
return ret;
logger->debug("Signaling message sent: {:.{}}", buf + LWS_PRE, len);
// Reschedule callback if there are more messages to be send
if (!outgoingMessages.empty())
lws_callback_on_writable(wsi);
@ -175,9 +198,9 @@ int SignalingClient::receive(void *in, size_t len)
return -1;
}
auto msg = SignalingMessage(json);
logger->debug("Signaling message received: {:.{}}", (char *)in, len);
cbMessage(msg);
cbMessage(SignalingMessage { json });
return 0;
}

View file

@ -5,8 +5,11 @@
* @license Apache 2.0
*********************************************************************************/
#include <fmt/format.h>
#include <villas/utils.hpp>
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/signaling_message.hpp>
#include <algorithm>
using namespace villas;
using namespace villas::node;
@ -15,7 +18,7 @@ using namespace villas::node::webrtc;
json_t * Connection::toJSON() const
{
return json_pack("{ s: i, s: s, s: s, s: s }",
return json_pack("{ s:i, s:s, s:s, s:s }",
"id", id,
"remote", remote.c_str(),
"user_agent", userAgent.c_str(),
@ -23,11 +26,11 @@ json_t * Connection::toJSON() const
);
}
Connection::Connection(json_t *j)
Connection::Connection(json_t *json)
{
const char *rem, *ua, *ts;
int ret = json_unpack(j, "{ s: i, s: s, s: s, s: s }",
int ret = json_unpack(json, "{ s:i, s:s, s:s, s:s }",
"id", &id,
"remote", &rem,
"user_agent", &ua,
@ -42,6 +45,40 @@ Connection::Connection(json_t *j)
// TODO: created
}
RelayMessage::RelayMessage(json_t *json)
{
if (!json_is_array(json))
throw RuntimeError("Failed to decode signaling message");
int ret;
char *url;
char *user;
char *pass;
char *realm;
char *expires;
json_t *server_json;
size_t i;
json_array_foreach(json, i, server_json) {
ret = json_unpack(server_json, "{ s:s, s:s, s:s, s:s, s:s }",
"url", &url,
"user", &user,
"pass", &pass,
"realm", &realm,
"expires", &expires
);
if (ret)
throw RuntimeError("Failed to decode signaling message");
auto &server = servers.emplace_back(url);
server.username = user;
server.password = pass;
// TODO warn about unsupported realm
// TODO log info expires time
}
}
json_t * ControlMessage::toJSON() const
{
json_t *json_connections = json_array();
@ -49,10 +86,10 @@ json_t * ControlMessage::toJSON() const
for (auto &c : connections) {
json_t *json_connection = c.toJSON();
json_array_append(json_connections, json_connection);
json_array_append_new(json_connections, json_connection);
}
return json_pack("{ s: i, s: o }",
return json_pack("{ s:i, s:o }",
"connection_id", connectionID,
"connections", json_connections
);
@ -64,7 +101,7 @@ ControlMessage::ControlMessage(json_t *j)
json_t *json_connections;
ret = json_unpack(j, "{ s: i, s: o }",
ret = json_unpack(j, "{ s:i, s:o }",
"connection_id", &connectionID,
"connections", &json_connections
);
@ -82,104 +119,88 @@ ControlMessage::ControlMessage(json_t *j)
json_t * SignalingMessage::toJSON() const
{
switch (type) {
case TYPE_CONTROL:
return json_pack("{ s: s, s: o }",
"type", "control",
"control", control->toJSON()
);
case TYPE_OFFER:
case TYPE_ANSWER:
return json_pack("{ s: s, s: o }",
"type", type == TYPE_OFFER ? "offer" : "answer",
"description", std::string(*description).c_str()
);
case TYPE_CANDIDATE:
return json_pack("{ s: s, s: o }",
"type", "candidate",
"candidate", std::string(*candidate).c_str()
);
}
return nullptr;
return std::visit(villas::utils::overloaded {
[](ControlMessage const &c){
return json_pack("{ s:o }", "control", c.toJSON());
},
[](rtc::Description const &d){
return json_pack("{ s:{ s:s, s:s } }", "description",
"spd", d.generateSdp().c_str(),
"type", d.typeString().c_str()
);
},
[](rtc::Candidate const &c){
return json_pack("{ s:{ s:s, s:s } }", "candidate",
"spd", c.candidate().c_str(),
"mid", c.mid().c_str()
);
},
[](auto &other){
return (json_t *) { nullptr };
}
}, *this);
}
std::string SignalingMessage::toString() const
{
switch (type) {
case TYPE_ANSWER:
return fmt::format("type=answer, description={}", std::string(*description));
case TYPE_OFFER:
return fmt::format("type=offer, description={}", std::string(*description));
case TYPE_CANDIDATE:
return fmt::format("type=candidate, candidate={}", std::string(*candidate));
case TYPE_CONTROL:
return fmt::format("type=control, control={}", json_dumps(control->toJSON(), 0));
}
return "";
return std::visit(villas::utils::overloaded {
[](RelayMessage const &r){
return fmt::format("type=relay");
},
[](ControlMessage const &c){
return fmt::format("type=control, control={}", json_dumps(c.toJSON(), 0));
},
[](rtc::Description const &d){
return fmt::format("type=description, type={}, spd=\n{}", d.typeString(), d.generateSdp());
},
[](rtc::Candidate const &c){
return fmt::format("type=candidate, mid={}, spd=\n{}", c.candidate(), c.mid());
},
[](auto other){
return fmt::format("invalid signaling message");
}
}, *this);
}
SignalingMessage::SignalingMessage(rtc::Description desc, bool answer) :
type(answer ? TYPE_ANSWER : TYPE_OFFER),
description(new rtc::Description(desc))
{ }
SignalingMessage::SignalingMessage(rtc::Candidate cand) :
type(TYPE_CANDIDATE),
candidate(new rtc::Candidate(cand))
{ }
SignalingMessage::~SignalingMessage()
SignalingMessage::SignalingMessage(json_t *json)
{
if (description)
delete description;
if (candidate)
delete candidate;
if (control)
delete control;
}
SignalingMessage::SignalingMessage(json_t *j) :
control(nullptr),
description(nullptr),
candidate(nullptr)
{
json_t *json_control = nullptr;
const char *desc = nullptr;
// relay message
json_t *rlys = nullptr;
// control message
json_t *ctrl = nullptr;
// candidate message
const char *cand = nullptr;
const char *mid = nullptr;
// description message
const char *desc = nullptr;
const char *typ = nullptr;
int ret = json_unpack(j, "{ s: s, s?: s, s?: s, s?: o }",
"type", &typ,
"description", &desc,
"candidate", &cand,
"control", &json_control
int ret = json_unpack(json, "{ s?o, s?o, s?{ s:s, s:s }, s?{ s:s, s:s } }",
"servers", &rlys,
"control", &ctrl,
"candidate",
"spd", &cand,
"mid", &mid,
"description",
"spd", &desc,
"type", &typ
);
if (ret)
// exactly 1 field may be specified
const void *fields[] = { ctrl, cand, desc };
if (ret || std::count(std::begin(fields), std::end(fields), nullptr) < std::make_signed_t<size_t>(std::size(fields)) - 1)
throw RuntimeError("Failed to decode signaling message");
if (!strcmp(typ, "offer")) {
type = TYPE_OFFER;
description = new rtc::Description(desc);
if (rlys) {
emplace<RelayMessage>(rlys);
}
else if (!strcmp(typ, "answer")) {
type = TYPE_ANSWER;
description = new rtc::Description(desc);
else if (ctrl) {
emplace<ControlMessage>(ctrl);
}
else if (!strcmp(typ, "candidate")) {
type = TYPE_CANDIDATE;
candidate = new rtc::Candidate(cand);
else if (cand) {
emplace<rtc::Candidate>(cand, mid);
}
else if (!strcmp(typ, "control")) {
type = TYPE_CONTROL;
control = new ControlMessage(json_control);
else if (desc) {
emplace<rtc::Description>(desc, typ);
}
}