diff --git a/etc/examples/nodes/webrtc.conf b/etc/examples/nodes/webrtc.conf index 7537cff18..f587093ed 100644 --- a/etc/examples/nodes/webrtc.conf +++ b/etc/examples/nodes/webrtc.conf @@ -1,25 +1,15 @@ + nodes = { - webrtc_node = { - type = "webrtc", - + webrtc = { + type = "webrtc" + # required session key. + session = "" + # optional signaling server. + server = "
" + # optional format. format = "json" - - # A unique session identifier which must be shared between two nodes - session = "my-session-name" - - # Address to the websocket signaling server - # server = "wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling" - server = "ws://docker.for.mac.host.internal:1234" - - # Setting for Interactive Connectivity Establishment - ice = { - # List of STUN/TURN servers - servers = [ - "stun://stun.l.google.com:19302", - "stun:stun.0l.de:3478", - "turn://villas:villas@turn.0l.de:3478?transport=udp", - "turn://villas:villas@turn.0l.de:3478?transport=tcp" - ] - } + # optional initial connect timeout. + wait_seconds = 120 } } + diff --git a/include/villas/nodes/webrtc.hpp b/include/villas/nodes/webrtc.hpp index 6ad999828..7baf24693 100644 --- a/include/villas/nodes/webrtc.hpp +++ b/include/villas/nodes/webrtc.hpp @@ -14,6 +14,8 @@ #include #include #include +#include +#include namespace villas { namespace node { @@ -27,12 +29,14 @@ protected: std::string server; std::string session; - bool wait; - bool ordered; - int max_retransmits; + int wait_seconds; + Format *format; + struct CQueueSignalled queue; + struct Pool pool; std::shared_ptr conn; rtc::Configuration config; + rtc::DataChannelInit dci; virtual int _read(struct Sample *smps[], unsigned cnt); @@ -63,8 +67,8 @@ public: virtual int start(); - // virtual - // int stop(); + virtual + int stop(); // virtual // int pause(); @@ -78,8 +82,8 @@ public: // virtual // int reverse(); - // virtual - // std::vector getPollFDs(); + virtual + std::vector getPollFDs(); // virtual // std::vector getNetemFDs(); diff --git a/include/villas/nodes/webrtc/peer_connection.hpp b/include/villas/nodes/webrtc/peer_connection.hpp index 22fddb0f0..757f5716f 100644 --- a/include/villas/nodes/webrtc/peer_connection.hpp +++ b/include/villas/nodes/webrtc/peer_connection.hpp @@ -21,16 +21,21 @@ namespace webrtc { class PeerConnection { public: - PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &config, Web *w); + PeerConnection(const std::string &server, const std::string &session, rtc::Configuration config, Web *w, rtc::DataChannelInit d); ~PeerConnection(); - void waitForDataChannel(); + bool waitForDataChannel(std::chrono::seconds timeout); + void onMessage(std::function callback); + void sendMessage(rtc::binary msg); void connect(); + void disconnect(); protected: Web *web; - rtc::Configuration config; + std::vector extraServers; + rtc::DataChannelInit dataChannelInit; + rtc::Configuration defaultConfig; std::shared_ptr conn; std::shared_ptr chan; @@ -38,37 +43,44 @@ protected: Logger logger; - bool makingOffer; - bool ignoreOffer; + std::mutex mutex; + + std::condition_variable_any startupCondition; + bool stopStartup; + + bool warnNotConnected; + bool standby; bool first; - bool polite; - bool rollback; + int firstID; + int secondID; - void createPeerConnection(); - void rollbackPeerConnection(); - void createDatachannel(); + std::function onMessageCallback; - void onConnectionCreated(); + void resetConnection(std::unique_lock &lock); + void resetConnectionAndStandby(std::unique_lock &lock); + void notifyStartup(); + + void setupPeerConnection(std::shared_ptr = nullptr); + void setupDataChannel(std::shared_ptr = nullptr); void onLocalDescription(rtc::Description sdp); void onLocalCandidate(rtc::Candidate cand); - void onNegotiationNeeded(); - void onConnectionStateChange(rtc::PeerConnection::State state); void onSignalingStateChange(rtc::PeerConnection::SignalingState state); void onGatheringStateChange(rtc::PeerConnection::GatheringState state); void onSignalingConnected(); void onSignalingDisconnected(); - void onSignalingError(const std::string &err); - void onSignalingMessage(const SignalingMessage &msg); + void onSignalingError(std::string err); + void onSignalingMessage(SignalingMessage msg); void onDataChannel(std::shared_ptr dc); void onDataChannelOpen(); void onDataChannelClosed(); void onDataChannelError(std::string err); - void onDataChannelMessage(rtc::message_variant msg); + void onDataChannelMessage(rtc::string msg); + void onDataChannelMessage(rtc::binary msg); }; } /* namespace webrtc */ diff --git a/include/villas/nodes/webrtc/signaling_client.hpp b/include/villas/nodes/webrtc/signaling_client.hpp index 1ba62e4bc..7dc21cae6 100644 --- a/include/villas/nodes/webrtc/signaling_client.hpp +++ b/include/villas/nodes/webrtc/signaling_client.hpp @@ -34,17 +34,17 @@ protected: SignalingClient *self; } sul_helper; - uint16_t retry_count; /**> Count of consequetive retries */ + uint16_t retry_count; /**> Count of consecutive retries */ struct lws *wsi; - struct lws_client_connect_info info; + struct lws_client_connect_info info; /* The retry and backoff policy we want to use for our client connections */ - static constexpr uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; + static constexpr uint32_t backoff_ms[] = { 1<<4, 1<<6, 1<<8, 1<<10, 1<<12, 1<<14, 1<<16 }; static constexpr lws_retry_bo_t retry = { .retry_ms_table = backoff_ms, .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), - .conceal_count = LWS_ARRAY_SIZE(backoff_ms), + .conceal_count = LWS_ARRAY_SIZE(backoff_ms) + 1, .secs_since_valid_ping = 3, /* force PINGs after secs idle */ .secs_since_valid_hangup = 10, /* hangup after secs idle */ @@ -52,10 +52,10 @@ protected: .jitter_percent = 20, }; - std::function cbMessage; + std::function cbMessage; std::function cbConnected; std::function cbDisconnected; - std::function cbError; + std::function cbError; Queue outgoingMessages; @@ -64,6 +64,8 @@ protected: char *uri; char *path; + std::atomic running; + Logger logger; int protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len); @@ -86,7 +88,7 @@ public: void sendMessage(const SignalingMessage &msg); - void onMessage(std::function callback) + void onMessage(std::function callback) { cbMessage = callback; } @@ -101,7 +103,7 @@ public: cbDisconnected = callback; } - void onError(std::function callback) + void onError(std::function callback) { cbError = callback; } diff --git a/include/villas/nodes/webrtc/signaling_message.hpp b/include/villas/nodes/webrtc/signaling_message.hpp index 9ecc0a04e..db4279995 100644 --- a/include/villas/nodes/webrtc/signaling_message.hpp +++ b/include/villas/nodes/webrtc/signaling_message.hpp @@ -10,7 +10,8 @@ #include #include -#include +#include +#include #include #include @@ -29,40 +30,29 @@ struct Connection { json_t * toJSON() const; }; +struct RelayMessage { + std::vector servers; + + RelayMessage(json_t *j); +}; + struct ControlMessage { int connectionID; - std::list connections; + std::vector connections; ControlMessage(json_t *j); json_t * toJSON() const; }; -class SignalingMessage { +class SignalingMessage : public std::variant { public: - enum Type { - TYPE_CONTROL, - TYPE_OFFER, - TYPE_ANSWER, - TYPE_CANDIDATE - }; - - Type type; - - ControlMessage *control; - rtc::Description *description; - rtc::Candidate *candidate; - std::string mid; - - SignalingMessage(rtc::Description desc, bool answer = false); - SignalingMessage(rtc::Candidate cand); + using variant::variant; SignalingMessage(json_t *j); - ~SignalingMessage(); - json_t * toJSON() const; - std::string toString() const; }; + } /* namespace webrtc */ } /* namespace node */ } /* namespace villas */ diff --git a/lib/nodes/webrtc.cpp b/lib/nodes/webrtc.cpp index 459c4826c..9efd48472 100644 --- a/lib/nodes/webrtc.cpp +++ b/lib/nodes/webrtc.cpp @@ -24,20 +24,18 @@ static villas::node::Web *web; WebRTCNode::WebRTCNode(const std::string &name) : Node(name), server("wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"), - wait(true), - ordered(false), - max_retransmits(0) + wait_seconds(0), + dci({}) { - config.iceServers = { - rtc::IceServer("stun://stun.l.google.com:19302"), - rtc::IceServer("stun://villas:villas@stun.0l.de"), - rtc::IceServer("turn://villas:villas@turn.0l.de?transport=udp"), - rtc::IceServer("turn://villas:villas@turn.0l.de?transport=tcp"), - }; + dci.reliability.type = rtc::Reliability::Type::Rexmit; } WebRTCNode::~WebRTCNode() -{ } +{ + int ret = pool_destroy(&pool); + if (ret) // TODO log + ; +} int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid) { @@ -47,17 +45,20 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid) const char *sess; const char *svr = nullptr; - int w = - 1, ord = -1; - json_t *json_ice = nullptr; + int ord = -1; + int &rexmit = dci.reliability.rexmit.emplace(0); + json_t *ice_json = nullptr; + json_t *fmt_json = nullptr; json_error_t err; - ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: b, s?: i, s?: b, s?: o }", + ret = json_unpack_ex(json, &err, 0, "{ s:s, s?s, s?i, s?i, s?b, s?o }", "session", &sess, "server", &svr, - "wait", &w, - "max_retransmits", &max_retransmits, + "wait_seconds", &wait_seconds, + "max_retransmits", &rexmit, "ordered", &ord, - "ice", &json_ice + "ice", &ice_json, + "format", &fmt_json ); if (ret) throw ConfigError(json, err, "node-config-node-webrtc"); @@ -67,16 +68,13 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid) if (svr) server = svr; - if (w >= 0) - wait = w != 0; + if (ord) + dci.reliability.unordered = !ord; - if (ord >= 0) - ordered = ordered != 0; - - if (json_ice) { + if (ice_json) { json_t *json_servers = nullptr; - ret = json_unpack_ex(json_ice, &err, 0, "{ s?: o }", + ret = json_unpack_ex(ice_json, &err, 0, "{ s?: o }", "servers", &json_servers ); if (ret) @@ -101,43 +99,85 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid) } } + format = fmt_json + ? FormatFactory::make(fmt_json) + : FormatFactory::make("villas.binary"); + + assert(format); + return 0; } int WebRTCNode::check() { - return 0; + return Node::check(); } int WebRTCNode::prepare() { - conn = std::make_shared(server, session, config, web); + int ret = Node::prepare(); + if (ret) + return ret; - return Node::prepare(); -} + format->start(getInputSignals(false), ~(int) SampleFlags::HAS_OFFSET); -int WebRTCNode::start() -{ - conn->connect(); + conn = std::make_shared(server, session, config, web, dci); - if (wait) { - logger->info("Wait until datachannel is connected..."); + ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size())); + if (ret) // TODO log + return ret; - conn->waitForDataChannel(); - } + ret = queue_signalled_init(&queue, 1024); + if (ret) // TODO log + return ret; - int ret = Node::start(); - if (!ret) - state = State::STARTED; + conn->onMessage([this](rtc::binary msg){ + int ret; + std::vector smps; + smps.resize(this->in.vectorize); + + ret = sample_alloc_many(&this->pool, smps.data(), smps.size()); + if (ret < 0) // TODO log + return; + + ret = format->sscan((const char *)msg.data(), msg.size(), nullptr, smps.data(), ret); + if (ret < 0) // TODO log + return; + + ret = queue_signalled_push_many(&this->queue, (void **) smps.data(), ret); + if (ret < 0) // TODO log + return; + + this->logger->debug("onMessage(rtc::binary) callback finished pushing {} samples", ret); + }); return 0; } -// int WebRTCNode::stop() -// { -// // TODO add implementation here -// return 0; -// } +int WebRTCNode::start() +{ + int ret = Node::start(); + if (!ret) + state = State::STARTED; + + conn->connect(); + + if (wait_seconds > 0) { + logger->info("Waiting for datachannel..."); + + if (!conn->waitForDataChannel(std::chrono::seconds { wait_seconds })) { + throw RuntimeError { "Waiting for datachannel timed out after {} seconds", wait_seconds }; + } + } + + return 0; +} + +int WebRTCNode::stop() +{ + conn->disconnect(); + return Node::stop(); +} // int WebRTCNode::pause() // { @@ -163,11 +203,10 @@ int WebRTCNode::start() // return 0; // } -// std::vector WebRTCNode::getPollFDs() -// { -// // TODO add implementation here -// return {}; -// } +std::vector WebRTCNode::getPollFDs() +{ + return { queue_signalled_fd(&queue) }; +} // std::vector WebRTCNode::getNetemFDs() // { @@ -188,36 +227,31 @@ const std::string & WebRTCNode::getDetails() int WebRTCNode::_read(struct Sample *smps[], unsigned cnt) { - int read; - struct timespec now; + std::vector smpt; + smpt.resize(cnt); - /* TODO: Add implementation here. The following is just an example */ + int pulled = queue_signalled_pull_many(&queue, (void **) smpt.data(), smpt.size()); - assert(cnt >= 1 && smps[0]->capacity >= 1); + sample_copy_many(smps, smpt.data(), pulled); + sample_decref_many(smpt.data(), pulled); - now = time_now(); - - // smps[0]->data[0].f = time_delta(&now, &start_time); - - /* Dont forget to set other flags in struct Sample::flags - * E.g. for sequence no, timestamps... */ - smps[0]->flags = (int) SampleFlags::HAS_DATA; - smps[0]->signals = getInputSignals(false); - - read = 1; /* The number of samples read */ - - return read; + return pulled; } int WebRTCNode::_write(struct Sample *smps[], unsigned cnt) { - int written; + rtc::binary buf; + size_t wbytes; - /* TODO: Add implementation here. */ + buf.resize(4 * 1024); + int ret = format->sprint((char *) buf.data(), buf.size(), &wbytes, smps, cnt); + if (ret < 0) // TODO log + return ret; - written = 0; /* The number of samples written */ + buf.resize(wbytes); + conn->sendMessage(buf); - return written; + return ret; } int WebRTCNodeFactory::start(SuperNode *sn) diff --git a/lib/nodes/webrtc/peer_connection.cpp b/lib/nodes/webrtc/peer_connection.cpp index d04e1f1a2..77e5cd56f 100644 --- a/lib/nodes/webrtc/peer_connection.cpp +++ b/lib/nodes/webrtc/peer_connection.cpp @@ -8,7 +8,10 @@ #include #include - +#include +#include +#include +#include #include using namespace std::placeholders; @@ -17,23 +20,59 @@ using namespace villas; using namespace villas::node; using namespace villas::node::webrtc; -PeerConnection::PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &cfg, Web *w) : +PeerConnection::PeerConnection(const std::string &server, const std::string &session, rtc::Configuration cfg, Web *w, rtc::DataChannelInit d) : web(w), - config(cfg), + dataChannelInit(d), + defaultConfig(cfg), logger(logging.get("webrtc:pc")) - { client = std::make_shared(server, session, web); + client->onConnected([this](){ this->onSignalingConnected(); }); + client->onDisconnected([this](){ this->onSignalingDisconnected(); }); + client->onError([this](auto err){ this->onSignalingError(std::move(err)); }); + client->onMessage([this](auto msg){ this->onSignalingMessage(std::move(msg)); }); - client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this)); - client->onDisconnected(std::bind(&PeerConnection::onSignalingDisconnected, this)); - client->onError(std::bind(&PeerConnection::onSignalingError, this, _1)); - client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this, _1)); + auto lock = std::unique_lock { mutex }; + resetConnectionAndStandby(lock); } PeerConnection::~PeerConnection() { - // TODO +} + +bool PeerConnection::waitForDataChannel(std::chrono::seconds timeout) +{ + auto lock = std::unique_lock { mutex }; + + auto deadline = std::chrono::steady_clock::now() + timeout; + + return startupCondition.wait_until(lock, deadline, [this](){ return this->stopStartup; }); +} + +void PeerConnection::notifyStartup() +{ + stopStartup = true; + startupCondition.notify_all(); +} + +void PeerConnection::onMessage(std::function callback) +{ + auto lock = std::unique_lock { mutex }; + + onMessageCallback = callback; +} + +void PeerConnection::sendMessage(rtc::binary msg) +{ + auto lock = std::unique_lock { mutex }; + + if (chan && chan->isOpen()) { + chan->send(msg); + warnNotConnected = true; + } else if (warnNotConnected) { + logger->warn("Dropping messages. No peer connected."); + warnNotConnected = false; + } } void PeerConnection::connect() @@ -41,79 +80,154 @@ void PeerConnection::connect() client->connect(); } -void PeerConnection::waitForDataChannel() +void PeerConnection::disconnect() { - while (!chan) { - // TODO: signal available via condition variable - std::this_thread::sleep_for(std::chrono::seconds(1)); - } + client->disconnect(); } -void PeerConnection::createPeerConnection() +void PeerConnection::resetConnection(std::unique_lock &lock) { - // Setup peer connection - conn = std::make_shared(config); - - conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this, _1)); - conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this, _1)); - - conn->onDataChannel(std::bind(&PeerConnection::onDataChannel, this, _1)); - conn->onGatheringStateChange(std::bind(&PeerConnection::onGatheringStateChange, this, _1)); - conn->onSignalingStateChange(std::bind(&PeerConnection::onSignalingStateChange, this, _1)); - conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this, _1)); + lock.unlock(); + chan.reset(); + conn.reset(); + lock.lock(); } -void PeerConnection::rollbackPeerConnection() +void PeerConnection::resetConnectionAndStandby(std::unique_lock &lock) { + if (!standby) + logger->info("Going to standby"); + standby = true; + first = false; + firstID = INT_MAX; + secondID = INT_MAX; + warnNotConnected = false; + + resetConnection(lock); } -void PeerConnection::createDatachannel() +void PeerConnection::setupPeerConnection(std::shared_ptr pc) { + logger->debug("Setup {} peer connection", pc ? "existing" : "new"); + auto config = defaultConfig; + config.iceServers.insert(std::end(config.iceServers), std::begin(extraServers), std::end(extraServers)); + + conn = pc ? std::move(pc) : std::make_shared(config); + conn->onLocalDescription([this](auto desc){ this->onLocalDescription(std::move(desc)); }); + conn->onLocalCandidate([this](auto cand){ this->onLocalCandidate(std::move(cand)); }); + conn->onDataChannel([this](auto channel){ this->onDataChannel(std::move(channel)); }); + conn->onGatheringStateChange([this](auto state){ this->onGatheringStateChange(std::move(state)); }); + conn->onSignalingStateChange([this](auto state){ this->onSignalingStateChange(std::move(state)); }); + conn->onStateChange([this](auto state){ this->onConnectionStateChange(std::move(state)); }); } -void PeerConnection::onConnectionCreated() +void PeerConnection::setupDataChannel(std::shared_ptr dc) { - logger->debug("Connection created"); + logger->debug("Setup {} data channel", dc ? "existing" : "new"); + + assert(conn); + chan = dc ? std::move(dc) : conn->createDataChannel("villas", dataChannelInit); + chan->onMessage( + [this](rtc::binary msg){ this->onDataChannelMessage(std::move(msg)); }, + [this](rtc::string msg){ this->onDataChannelMessage(std::move(msg)); } + ); + chan->onOpen([this](){ this->onDataChannelOpen(); }); + chan->onClosed([this](){ this->onDataChannelClosed(); }); + chan->onError([this](auto err){ this->onDataChannelError(std::move(err)); }); + + // if this node has it's data channel set up, don't accept any new ones + conn->onDataChannel(nullptr); } -void PeerConnection::onLocalDescription(rtc::Description sdp) +void PeerConnection::onLocalDescription(rtc::Description desc) { - logger->debug("New local description: {}", std::string(sdp)); + logger->debug("New local description: type={} sdp=\n{}", desc.typeString(), desc.generateSdp()); - SignalingMessage msg(sdp); + auto lock = std::unique_lock { mutex }; - client->sendMessage(msg); + client->sendMessage(desc); } void PeerConnection::onLocalCandidate(rtc::Candidate cand) { - logger->debug("New local candidate: {}", std::string(cand)); + logger->debug("New local candidate: {}", std::string { cand }); - SignalingMessage msg(cand); + auto lock = std::unique_lock { mutex }; - client->sendMessage(msg); -} - -void PeerConnection::onNegotiationNeeded() -{ - logger->debug("Negotation needed"); + client->sendMessage(cand); } void PeerConnection::onConnectionStateChange(rtc::PeerConnection::State state) { - logger->debug("Connection State changed: {}", state); + //logger->debug("Connection State changed: {}", state); + + auto lock = std::unique_lock { mutex }; + + switch (state) { + case rtc::PeerConnection::State::New: { + logger->debug("New peer connection"); + break; + } + + case rtc::PeerConnection::State::Connecting: { + logger->debug("Peer connection connecting."); + break; + } + + case rtc::PeerConnection::State::Connected: { + rtc::Candidate local, remote; + std::optional rtt = conn->rtt(); + if (conn->getSelectedCandidatePair(&local, &remote)) { + std::stringstream l, r; + l << local, r << remote; + logger->debug( + "Peer connection connected:\n" + "local: {}\n" + "remote: {}\n" + "bytes sent: {} / bytes received: {} / rtt: {}\n", + l.str(), + r.str(), + conn->bytesSent(), + conn->bytesReceived(), + rtt.value_or(decltype(rtt)::value_type { 0 }) + ); + } else { + logger->debug( + "Peer connection connected.\n" + "Could not get candidate pair info.\n" + ); + } + break; + } + + case rtc::PeerConnection::State::Disconnected: + case rtc::PeerConnection::State::Failed: { + logger->debug("Closing peer connection"); + break; + } + + case rtc::PeerConnection::State::Closed: { + logger->debug("Closed peer connection"); + resetConnectionAndStandby(lock); + break; + } + } } void PeerConnection::onSignalingStateChange(rtc::PeerConnection::SignalingState state) { - logger->debug("Signaling state changed: {}", state); + std::stringstream s; + s << state; + logger->debug("Signaling state changed: {}", s.str()); } void PeerConnection::onGatheringStateChange(rtc::PeerConnection::GatheringState state) { - logger->debug("Gathering state changed: {}", state); + std::stringstream s; + s << state; + logger->debug("Gathering state changed: {}", s.str()); } void PeerConnection::onSignalingConnected() @@ -124,55 +238,153 @@ void PeerConnection::onSignalingConnected() void PeerConnection::onSignalingDisconnected() { logger->debug("Signaling connection closed"); + + auto lock = std::unique_lock { mutex }; + + resetConnectionAndStandby(lock); } -void PeerConnection::onSignalingError(const std::string &err) +void PeerConnection::onSignalingError(std::string err) { logger->debug("Signaling connection error: {}", err); + + auto lock = std::unique_lock { mutex }; + + resetConnectionAndStandby(lock); } -void PeerConnection::onSignalingMessage(const SignalingMessage &msg) +void PeerConnection::onSignalingMessage(SignalingMessage msg) { logger->debug("Signaling message received: {}", msg.toString()); - if (msg.type == SignalingMessage::TYPE_ANSWER || msg.type == SignalingMessage::TYPE_OFFER) - conn->setRemoteDescription(*msg.description); + auto lock = std::unique_lock { mutex }; - if (msg.type == SignalingMessage::TYPE_CANDIDATE) - conn->addRemoteCandidate(*msg.candidate); + std::visit(villas::utils::overloaded { + [&](RelayMessage &c){ + extraServers = std::move(c.servers); + }, + + [&](ControlMessage &c){ + auto const &id = c.connectionID; + + if (c.connections.size() < 2) { + resetConnectionAndStandby(lock); + return; + } + + auto fst = INT_MAX, snd = INT_MAX; + for (auto &c : c.connections) { + if (c.id < fst) { + snd = fst; + fst = c.id; + } else if (c.id < snd) { + snd = c.id; + } + } + + standby = (id != fst && id != snd); + + if (standby) { + logger->error("There are already two peers connected to this session. Waiting in standby."); + return; + } + + if (fst == firstID && snd == secondID) { + logger->debug("Ignoring control message. This connection is already being established."); + return; + } + + resetConnection(lock); + + first = (id == fst); + firstID = fst; + secondID = snd; + + setupPeerConnection(); + + if (!first) { + setupDataChannel(); + conn->setLocalDescription(rtc::Description::Type::Offer); + } + + logger->trace("New connection pair: first={}, second={}, I am {}", firstID, secondID, first ? "first" : "second"); + }, + + [&](rtc::Description d){ + if (standby || !conn || (!first && d.type() == rtc::Description::Type::Offer)) + return; + + conn->setRemoteDescription(d); + }, + + [&](rtc::Candidate c){ + if (standby || !conn) + return; + + conn->addRemoteCandidate(c); + }, + + [&](auto other){ + logger->warn("unknown signaling message"); + } + }, msg); } void PeerConnection::onDataChannel(std::shared_ptr dc) { - logger->debug("New data channel: id={}, stream={}, protocol={}, max_msg_size={}, label={}", + /*logger->debug("New data channel: id={}, stream={}, protocol={}, max_msg_size={}, label={}", dc->id(), dc->stream(), dc->protocol(), dc->maxMessageSize(), dc->label()); + */ - dc->onMessage(std::bind(&PeerConnection::onDataChannelMessage, this, _1)); - dc->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this)); - dc->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this)); - dc->onError(std::bind(&PeerConnection::onDataChannelError, this, _1)); + auto lock = std::unique_lock { mutex }; + + setupDataChannel(std::move(dc)); } void PeerConnection::onDataChannelOpen() { logger->debug("Datachannel opened"); + + auto lock = std::unique_lock { mutex }; + + chan->send("Hello from VILLASnode"); + + notifyStartup(); } void PeerConnection::onDataChannelClosed() { logger->debug("Datachannel closed"); + + auto lock = std::unique_lock { mutex }; + + resetConnectionAndStandby(lock); } void PeerConnection::onDataChannelError(std::string err) { logger->error("Datachannel error: {}", err); + + auto lock = std::unique_lock { mutex }; + + resetConnectionAndStandby(lock); } -void PeerConnection::onDataChannelMessage(rtc::message_variant msg) +void PeerConnection::onDataChannelMessage(rtc::string msg) { - logger->debug("Datachannel message received"); + logger->info("Received: {}", msg); +} + +void PeerConnection::onDataChannelMessage(rtc::binary msg) +{ + logger->trace("Received binary data"); + + auto lock = std::unique_lock { mutex }; + + if (onMessageCallback) + onMessageCallback(msg); } diff --git a/lib/nodes/webrtc/signaling_client.cpp b/lib/nodes/webrtc/signaling_client.cpp index e99569860..30b95bb71 100644 --- a/lib/nodes/webrtc/signaling_client.cpp +++ b/lib/nodes/webrtc/signaling_client.cpp @@ -16,20 +16,20 @@ using namespace villas::node::webrtc; SignalingClient::SignalingClient(const std::string &srv, const std::string &sess, Web *w) : web(w), + running(false), logger(logging.get("webrtc:signal")) { const char *prot, *a, *p; memset(&info, 0, sizeof(info)); - asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str()); + (void)!asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str()); int ret = lws_parse_uri(uri, &prot, &a, &info.port, &p); if (ret) throw RuntimeError("Failed to parse WebSocket URI: '{}'", uri); - - asprintf(&path, "/%s", p); + (void)!asprintf(&path, "/%s", p); info.ssl_connection = !strcmp(prot, "https"); info.address = a; @@ -44,25 +44,34 @@ SignalingClient::SignalingClient(const std::string &srv, const std::string &sess info.userdata = this; sul_helper.self = this; + sul_helper.sul = {}; } SignalingClient::~SignalingClient() { + disconnect(); + free(path); free(uri); } void SignalingClient::connect() { - info.context = web->getContext(); - info.vhost = web->getVHost(); + running = true; - lws_sul_schedule(info.context, 0, &sul_helper.sul, connectStatic, 1); + info.context = web->getContext(); + + lws_sul_schedule(info.context, 0, &sul_helper.sul, connectStatic, 1 * LWS_US_PER_SEC); } void SignalingClient::disconnect() { + running = false; // TODO + // wait for connectStatic to exit + // close LWS connection + if (wsi) + lws_callback_on_writable(wsi); } void SignalingClient::connectStatic(struct lws_sorted_usec_list *sul) @@ -104,6 +113,7 @@ int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons break; case LWS_CALLBACK_CLIENT_ESTABLISHED: + retry_count = 0; cbConnected(); break; @@ -145,20 +155,33 @@ do_retry: int SignalingClient::writable() { - // Skip if we have nothing to send - if (outgoingMessages.empty()) + if (!running) { + auto reason = "Signaling Client Closing"; + lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) reason, strlen(reason)); return 0; + } + + // Skip if we have nothing to send + if (outgoingMessages.empty()) { + return 0; + } auto msg = outgoingMessages.pop(); auto *jsonMsg = msg.toJSON(); - char buf[LWS_PRE + 1024]; - auto len = json_dumpb(jsonMsg, buf+LWS_PRE, 1024, JSON_INDENT(2)); + if (!jsonMsg) { + return 0; + } - auto ret = lws_write(wsi, (unsigned char *) buf, len, LWS_WRITE_TEXT); + char buf[LWS_PRE + 1024]; + auto len = json_dumpb(jsonMsg, buf + LWS_PRE, 1024, JSON_INDENT(2)); + + auto ret = lws_write(wsi, (unsigned char *) buf + LWS_PRE, len, LWS_WRITE_TEXT); if (ret < 0) return ret; + logger->debug("Signaling message sent: {:.{}}", buf + LWS_PRE, len); + // Reschedule callback if there are more messages to be send if (!outgoingMessages.empty()) lws_callback_on_writable(wsi); @@ -175,9 +198,9 @@ int SignalingClient::receive(void *in, size_t len) return -1; } - auto msg = SignalingMessage(json); + logger->debug("Signaling message received: {:.{}}", (char *)in, len); - cbMessage(msg); + cbMessage(SignalingMessage { json }); return 0; } diff --git a/lib/nodes/webrtc/signaling_message.cpp b/lib/nodes/webrtc/signaling_message.cpp index f7b69754f..f751c0cdf 100644 --- a/lib/nodes/webrtc/signaling_message.cpp +++ b/lib/nodes/webrtc/signaling_message.cpp @@ -5,8 +5,11 @@ * @license Apache 2.0 *********************************************************************************/ +#include +#include #include #include +#include using namespace villas; using namespace villas::node; @@ -15,7 +18,7 @@ using namespace villas::node::webrtc; json_t * Connection::toJSON() const { - return json_pack("{ s: i, s: s, s: s, s: s }", + return json_pack("{ s:i, s:s, s:s, s:s }", "id", id, "remote", remote.c_str(), "user_agent", userAgent.c_str(), @@ -23,11 +26,11 @@ json_t * Connection::toJSON() const ); } -Connection::Connection(json_t *j) +Connection::Connection(json_t *json) { const char *rem, *ua, *ts; - int ret = json_unpack(j, "{ s: i, s: s, s: s, s: s }", + int ret = json_unpack(json, "{ s:i, s:s, s:s, s:s }", "id", &id, "remote", &rem, "user_agent", &ua, @@ -42,6 +45,40 @@ Connection::Connection(json_t *j) // TODO: created } +RelayMessage::RelayMessage(json_t *json) +{ + + if (!json_is_array(json)) + throw RuntimeError("Failed to decode signaling message"); + + int ret; + char *url; + char *user; + char *pass; + char *realm; + char *expires; + json_t *server_json; + size_t i; + json_array_foreach(json, i, server_json) { + ret = json_unpack(server_json, "{ s:s, s:s, s:s, s:s, s:s }", + "url", &url, + "user", &user, + "pass", &pass, + "realm", &realm, + "expires", &expires + ); + if (ret) + throw RuntimeError("Failed to decode signaling message"); + + auto &server = servers.emplace_back(url); + server.username = user; + server.password = pass; + + // TODO warn about unsupported realm + // TODO log info expires time + } +} + json_t * ControlMessage::toJSON() const { json_t *json_connections = json_array(); @@ -49,10 +86,10 @@ json_t * ControlMessage::toJSON() const for (auto &c : connections) { json_t *json_connection = c.toJSON(); - json_array_append(json_connections, json_connection); + json_array_append_new(json_connections, json_connection); } - return json_pack("{ s: i, s: o }", + return json_pack("{ s:i, s:o }", "connection_id", connectionID, "connections", json_connections ); @@ -64,7 +101,7 @@ ControlMessage::ControlMessage(json_t *j) json_t *json_connections; - ret = json_unpack(j, "{ s: i, s: o }", + ret = json_unpack(j, "{ s:i, s:o }", "connection_id", &connectionID, "connections", &json_connections ); @@ -82,104 +119,88 @@ ControlMessage::ControlMessage(json_t *j) json_t * SignalingMessage::toJSON() const { - switch (type) { - case TYPE_CONTROL: - return json_pack("{ s: s, s: o }", - "type", "control", - "control", control->toJSON() - ); - - case TYPE_OFFER: - case TYPE_ANSWER: - return json_pack("{ s: s, s: o }", - "type", type == TYPE_OFFER ? "offer" : "answer", - "description", std::string(*description).c_str() - ); - - case TYPE_CANDIDATE: - return json_pack("{ s: s, s: o }", - "type", "candidate", - "candidate", std::string(*candidate).c_str() - ); - } - - return nullptr; + return std::visit(villas::utils::overloaded { + [](ControlMessage const &c){ + return json_pack("{ s:o }", "control", c.toJSON()); + }, + [](rtc::Description const &d){ + return json_pack("{ s:{ s:s, s:s } }", "description", + "spd", d.generateSdp().c_str(), + "type", d.typeString().c_str() + ); + }, + [](rtc::Candidate const &c){ + return json_pack("{ s:{ s:s, s:s } }", "candidate", + "spd", c.candidate().c_str(), + "mid", c.mid().c_str() + ); + }, + [](auto &other){ + return (json_t *) { nullptr }; + } + }, *this); } std::string SignalingMessage::toString() const { - switch (type) { - case TYPE_ANSWER: - return fmt::format("type=answer, description={}", std::string(*description)); - - case TYPE_OFFER: - return fmt::format("type=offer, description={}", std::string(*description)); - - case TYPE_CANDIDATE: - return fmt::format("type=candidate, candidate={}", std::string(*candidate)); - - case TYPE_CONTROL: - return fmt::format("type=control, control={}", json_dumps(control->toJSON(), 0)); - } - - return ""; + return std::visit(villas::utils::overloaded { + [](RelayMessage const &r){ + return fmt::format("type=relay"); + }, + [](ControlMessage const &c){ + return fmt::format("type=control, control={}", json_dumps(c.toJSON(), 0)); + }, + [](rtc::Description const &d){ + return fmt::format("type=description, type={}, spd=\n{}", d.typeString(), d.generateSdp()); + }, + [](rtc::Candidate const &c){ + return fmt::format("type=candidate, mid={}, spd=\n{}", c.candidate(), c.mid()); + }, + [](auto other){ + return fmt::format("invalid signaling message"); + } + }, *this); } -SignalingMessage::SignalingMessage(rtc::Description desc, bool answer) : - type(answer ? TYPE_ANSWER : TYPE_OFFER), - description(new rtc::Description(desc)) -{ } - -SignalingMessage::SignalingMessage(rtc::Candidate cand) : - type(TYPE_CANDIDATE), - candidate(new rtc::Candidate(cand)) -{ } - -SignalingMessage::~SignalingMessage() +SignalingMessage::SignalingMessage(json_t *json) { - if (description) - delete description; - - if (candidate) - delete candidate; - - if (control) - delete control; -} - -SignalingMessage::SignalingMessage(json_t *j) : - control(nullptr), - description(nullptr), - candidate(nullptr) -{ - json_t *json_control = nullptr; - const char *desc = nullptr; + // relay message + json_t *rlys = nullptr; + // control message + json_t *ctrl = nullptr; + // candidate message const char *cand = nullptr; + const char *mid = nullptr; + // description message + const char *desc = nullptr; const char *typ = nullptr; - int ret = json_unpack(j, "{ s: s, s?: s, s?: s, s?: o }", - "type", &typ, - "description", &desc, - "candidate", &cand, - "control", &json_control + int ret = json_unpack(json, "{ s?o, s?o, s?{ s:s, s:s }, s?{ s:s, s:s } }", + "servers", &rlys, + "control", &ctrl, + "candidate", + "spd", &cand, + "mid", &mid, + "description", + "spd", &desc, + "type", &typ ); - if (ret) + + // exactly 1 field may be specified + const void *fields[] = { ctrl, cand, desc }; + if (ret || std::count(std::begin(fields), std::end(fields), nullptr) < std::make_signed_t(std::size(fields)) - 1) throw RuntimeError("Failed to decode signaling message"); - if (!strcmp(typ, "offer")) { - type = TYPE_OFFER; - description = new rtc::Description(desc); + if (rlys) { + emplace(rlys); } - else if (!strcmp(typ, "answer")) { - type = TYPE_ANSWER; - description = new rtc::Description(desc); + else if (ctrl) { + emplace(ctrl); } - else if (!strcmp(typ, "candidate")) { - type = TYPE_CANDIDATE; - candidate = new rtc::Candidate(cand); + else if (cand) { + emplace(cand, mid); } - else if (!strcmp(typ, "control")) { - type = TYPE_CONTROL; - control = new ControlMessage(json_control); + else if (desc) { + emplace(desc, typ); } }