diff --git a/etc/examples/nodes/webrtc.conf b/etc/examples/nodes/webrtc.conf index 69b130eae..7537cff18 100644 --- a/etc/examples/nodes/webrtc.conf +++ b/etc/examples/nodes/webrtc.conf @@ -8,24 +8,18 @@ nodes = { session = "my-session-name" # Address to the websocket signaling server - server = "wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling" + # server = "wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling" + server = "ws://docker.for.mac.host.internal:1234" # Setting for Interactive Connectivity Establishment ice = { # List of STUN/TURN servers - servers = ( - { - urls = [ - "stun:stun.0l.de:3478", - "turn:turn.0l.de:3478?transport=udp", - "turn:turn.0l.de:3478?transport=tcp" - ], - - # Credentials for TURN servers - username = "villas" - password = "villas" - } - ) + servers = [ + "stun://stun.l.google.com:19302", + "stun:stun.0l.de:3478", + "turn://villas:villas@turn.0l.de:3478?transport=udp", + "turn://villas:villas@turn.0l.de:3478?transport=tcp" + ] } } } diff --git a/include/villas/node/config.hpp.in b/include/villas/node/config.hpp.in index f1d3d30d3..68e9a245a 100644 --- a/include/villas/node/config.hpp.in +++ b/include/villas/node/config.hpp.in @@ -37,6 +37,7 @@ /* Available Features */ #cmakedefine WITH_WEB #cmakedefine WITH_NODE_WEBSOCKET +#cmakedefine WITH_NODE_WEBRTC #cmakedefine WITH_NODE_OPAL #cmakedefine WITH_API #cmakedefine WITH_HOOKS diff --git a/include/villas/nodes/webrtc.hpp b/include/villas/nodes/webrtc.hpp index da46639a1..6ad999828 100644 --- a/include/villas/nodes/webrtc.hpp +++ b/include/villas/nodes/webrtc.hpp @@ -1,4 +1,4 @@ -/** The WebRTC node-type. +/** Node-type webrtc. * * @file * @author Steffen Vogel @@ -8,6 +8,9 @@ #pragma once +#include + +#include #include #include #include @@ -21,6 +24,15 @@ struct Sample; class WebRTCNode : public Node { protected: + std::string server; + std::string session; + + bool wait; + bool ordered; + int max_retransmits; + + std::shared_ptr conn; + rtc::Configuration config; virtual int _read(struct Sample *smps[], unsigned cnt); @@ -80,5 +92,46 @@ public: }; +class WebRTCNodeFactory : public NodeFactory { + +public: + using NodeFactory::NodeFactory; + + virtual + Node * make() + { + auto *n = new WebRTCNode(); + + init(n); + + return n; + } + + virtual + int getFlags() const + { + return (int) NodeFactory::Flags::SUPPORTS_READ | + (int) NodeFactory::Flags::SUPPORTS_WRITE | + (int) NodeFactory::Flags::SUPPORTS_POLL | + (int) NodeFactory::Flags::REQUIRES_WEB; + } + + virtual + std::string getName() const + { + return "webrtc"; + } + + virtual + std::string getDescription() const + { + return "Web Real-time Communication"; + } + + virtual + int start(SuperNode *sn); +}; + + } /* namespace node */ } /* namespace villas */ diff --git a/include/villas/nodes/webrtc/peer_connection.hpp b/include/villas/nodes/webrtc/peer_connection.hpp new file mode 100644 index 000000000..22fddb0f0 --- /dev/null +++ b/include/villas/nodes/webrtc/peer_connection.hpp @@ -0,0 +1,77 @@ +/** WebRTC peer connection + * + * @file + * @author Steffen Vogel + * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#pragma once + +#include + +#include +#include +#include + +namespace villas { +namespace node { +namespace webrtc { + +class PeerConnection { + +public: + PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &config, Web *w); + ~PeerConnection(); + + void waitForDataChannel(); + + void connect(); + +protected: + Web *web; + rtc::Configuration config; + + std::shared_ptr conn; + std::shared_ptr chan; + std::shared_ptr client; + + Logger logger; + + bool makingOffer; + bool ignoreOffer; + bool first; + bool polite; + bool rollback; + + void createPeerConnection(); + void rollbackPeerConnection(); + void createDatachannel(); + + void onConnectionCreated(); + + void onLocalDescription(rtc::Description sdp); + void onLocalCandidate(rtc::Candidate cand); + + void onNegotiationNeeded(); + + void onConnectionStateChange(rtc::PeerConnection::State state); + void onSignalingStateChange(rtc::PeerConnection::SignalingState state); + void onGatheringStateChange(rtc::PeerConnection::GatheringState state); + + void onSignalingConnected(); + void onSignalingDisconnected(); + void onSignalingError(const std::string &err); + void onSignalingMessage(const SignalingMessage &msg); + + void onDataChannel(std::shared_ptr dc); + void onDataChannelOpen(); + void onDataChannelClosed(); + void onDataChannelError(std::string err); + void onDataChannelMessage(rtc::message_variant msg); +}; + +} /* namespace webrtc */ +} /* namespace node */ +} /* namespace villas */ + diff --git a/include/villas/nodes/webrtc/signaling_client.hpp b/include/villas/nodes/webrtc/signaling_client.hpp new file mode 100644 index 000000000..1ba62e4bc --- /dev/null +++ b/include/villas/nodes/webrtc/signaling_client.hpp @@ -0,0 +1,113 @@ +/** WebRTC signaling client + * + * @file + * @author Steffen Vogel + * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include + +namespace villas { +namespace node { + +/* Forward declarations */ +class Web; + +namespace webrtc { + +class SignalingClient { + +protected: + struct sul_offsetof_helper { + lws_sorted_usec_list_t sul; /**> Schedule connection retry */ + SignalingClient *self; + } sul_helper; + + uint16_t retry_count; /**> Count of consequetive retries */ + + struct lws *wsi; + struct lws_client_connect_info info; + + /* The retry and backoff policy we want to use for our client connections */ + static constexpr uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; + static constexpr lws_retry_bo_t retry = { + .retry_ms_table = backoff_ms, + .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), + .conceal_count = LWS_ARRAY_SIZE(backoff_ms), + + .secs_since_valid_ping = 3, /* force PINGs after secs idle */ + .secs_since_valid_hangup = 10, /* hangup after secs idle */ + + .jitter_percent = 20, + }; + + std::function cbMessage; + std::function cbConnected; + std::function cbDisconnected; + std::function cbError; + + Queue outgoingMessages; + + Web *web; + + char *uri; + char *path; + + Logger logger; + + int protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len); + + static + void connectStatic(struct lws_sorted_usec_list *sul); + + int receive(void *in, size_t len); + int writable(); + +public: + SignalingClient(const std::string &server, const std::string &session, Web *w); + ~SignalingClient(); + + static + int protocolCallbackStatic(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + + void connect(); + void disconnect(); + + void sendMessage(const SignalingMessage &msg); + + void onMessage(std::function callback) + { + cbMessage = callback; + } + + void onConnected(std::function callback) + { + cbConnected = callback; + } + + void onDisconnected(std::function callback) + { + cbDisconnected = callback; + } + + void onError(std::function callback) + { + cbError = callback; + } +}; + +} /* namespace webrtc */ +} /* namespace node */ +} /* namespace villas */ + diff --git a/include/villas/nodes/webrtc/signaling_message.hpp b/include/villas/nodes/webrtc/signaling_message.hpp new file mode 100644 index 000000000..9ecc0a04e --- /dev/null +++ b/include/villas/nodes/webrtc/signaling_message.hpp @@ -0,0 +1,68 @@ +/** WebRTC signaling messages. + * + * @file + * @author Steffen Vogel + * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace villas { +namespace node { +namespace webrtc { + +struct Connection { + int id; + std::string remote; + std::string userAgent; + std::chrono::time_point created; + + Connection(json_t *j); + json_t * toJSON() const; +}; + +struct ControlMessage { + int connectionID; + std::list connections; + + ControlMessage(json_t *j); + json_t * toJSON() const; +}; + +class SignalingMessage { +public: + enum Type { + TYPE_CONTROL, + TYPE_OFFER, + TYPE_ANSWER, + TYPE_CANDIDATE + }; + + Type type; + + ControlMessage *control; + rtc::Description *description; + rtc::Candidate *candidate; + std::string mid; + + SignalingMessage(rtc::Description desc, bool answer = false); + SignalingMessage(rtc::Candidate cand); + SignalingMessage(json_t *j); + ~SignalingMessage(); + + json_t * toJSON() const; + + std::string toString() const; +}; + +} /* namespace webrtc */ +} /* namespace node */ +} /* namespace villas */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 266c22ecb..9ebbd2311 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -188,7 +188,7 @@ endif() # Enable WebRTC support if(WITH_NODE_WEBRTC) - list(APPEND NODE_SRC webrtc.cpp) + list(APPEND NODE_SRC webrtc.cpp webrtc/signaling_client.cpp webrtc/signaling_message.cpp webrtc/peer_connection.cpp) list(APPEND LIBRARIES LibDataChannel::LibDataChannel) endif() diff --git a/lib/nodes/webrtc.cpp b/lib/nodes/webrtc.cpp index 3f3ce1d52..459c4826c 100644 --- a/lib/nodes/webrtc.cpp +++ b/lib/nodes/webrtc.cpp @@ -1,10 +1,12 @@ -/** The WebRTC node-type. +/** Node-type: webrtc * * @author Steffen Vogel * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC * @license Apache 2.0 *********************************************************************************/ +#include + #include #include #include @@ -17,39 +19,87 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; +static villas::node::Web *web; + WebRTCNode::WebRTCNode(const std::string &name) : - Node(name) -{ } + Node(name), + server("wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"), + wait(true), + ordered(false), + max_retransmits(0) +{ + config.iceServers = { + rtc::IceServer("stun://stun.l.google.com:19302"), + rtc::IceServer("stun://villas:villas@stun.0l.de"), + rtc::IceServer("turn://villas:villas@turn.0l.de?transport=udp"), + rtc::IceServer("turn://villas:villas@turn.0l.de?transport=tcp"), + }; +} WebRTCNode::~WebRTCNode() { } -int WebRTCNode::prepare() -{ - // state1 = setting1; - - // if (setting2 == "double") - // state1 *= 2; - - return 0; -} - int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid) { - /* TODO: Add implementation here. The following is just an example */ + int ret = Node::parse(json, sn_uuid); + if (ret) + return ret; - // const char *setting2_str = nullptr; + const char *sess; + const char *svr = nullptr; + int w = - 1, ord = -1; + json_t *json_ice = nullptr; - // json_error_t err; - // int ret = json_unpack_ex(json, &err, 0, "{ s?: i, s?: s }", - // "setting1", &setting1, - // "setting2", &setting2_str - // ); - // if (ret) - // throw ConfigError(json, err, "node-config-node-example"); + json_error_t err; + ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: b, s?: i, s?: b, s?: o }", + "session", &sess, + "server", &svr, + "wait", &w, + "max_retransmits", &max_retransmits, + "ordered", &ord, + "ice", &json_ice + ); + if (ret) + throw ConfigError(json, err, "node-config-node-webrtc"); - // if (setting2_str) - // setting2 = setting2_str; + session = sess; + + if (svr) + server = svr; + + if (w >= 0) + wait = w != 0; + + if (ord >= 0) + ordered = ordered != 0; + + if (json_ice) { + json_t *json_servers = nullptr; + + ret = json_unpack_ex(json_ice, &err, 0, "{ s?: o }", + "servers", &json_servers + ); + if (ret) + throw ConfigError(json, err, "node-config-node-webrtc-ice"); + + if (json_servers) { + config.iceServers.clear(); + + if (!json_is_array(json_servers)) + throw ConfigError(json_servers, "node-config-node-webrtc-ice-servers", "ICE Servers must be a an array of server configurations."); + + size_t i; + json_t *json_server; + json_array_foreach(json_servers, i, json_server) { + if (!json_is_string(json_server)) + throw ConfigError(json_server, "node-config-node-webrtc-ice-server", "ICE servers must be provided as STUN/TURN url."); + + std::string uri = json_string_value(json_server); + + config.iceServers.emplace_back(uri); + } + } + } return 0; } @@ -59,11 +109,26 @@ int WebRTCNode::check() return 0; } +int WebRTCNode::prepare() +{ + conn = std::make_shared(server, session, config, web); + + return Node::prepare(); +} + int WebRTCNode::start() { - // TODO add implementation here + conn->connect(); - // start_time = time_now(); + if (wait) { + logger->info("Wait until datachannel is connected..."); + + conn->waitForDataChannel(); + } + + int ret = Node::start(); + if (!ret) + state = State::STARTED; return 0; } @@ -155,7 +220,14 @@ int WebRTCNode::_write(struct Sample *smps[], unsigned cnt) return written; } +int WebRTCNodeFactory::start(SuperNode *sn) +{ + web = sn->getWeb(); + if (!web->isEnabled()) + return -1; -static char n[] = "webrtc"; -static char d[] = "Web Real-time Communication"; -static NodePlugin p; + return 0; +} + + +static WebRTCNodeFactory p; diff --git a/lib/nodes/webrtc/peer_connection.cpp b/lib/nodes/webrtc/peer_connection.cpp new file mode 100644 index 000000000..d04e1f1a2 --- /dev/null +++ b/lib/nodes/webrtc/peer_connection.cpp @@ -0,0 +1,178 @@ +/** WebRTC peer connection + * + * @file + * @author Steffen Vogel + * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#include +#include + +#include + +using namespace std::placeholders; + +using namespace villas; +using namespace villas::node; +using namespace villas::node::webrtc; + +PeerConnection::PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &cfg, Web *w) : + web(w), + config(cfg), + logger(logging.get("webrtc:pc")) + +{ + client = std::make_shared(server, session, web); + + client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this)); + client->onDisconnected(std::bind(&PeerConnection::onSignalingDisconnected, this)); + client->onError(std::bind(&PeerConnection::onSignalingError, this, _1)); + client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this, _1)); +} + +PeerConnection::~PeerConnection() +{ + // TODO +} + +void PeerConnection::connect() +{ + client->connect(); +} + +void PeerConnection::waitForDataChannel() +{ + while (!chan) { + // TODO: signal available via condition variable + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +void PeerConnection::createPeerConnection() +{ + // Setup peer connection + conn = std::make_shared(config); + + conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this, _1)); + conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this, _1)); + + conn->onDataChannel(std::bind(&PeerConnection::onDataChannel, this, _1)); + conn->onGatheringStateChange(std::bind(&PeerConnection::onGatheringStateChange, this, _1)); + conn->onSignalingStateChange(std::bind(&PeerConnection::onSignalingStateChange, this, _1)); + conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this, _1)); +} + +void PeerConnection::rollbackPeerConnection() +{ + +} + +void PeerConnection::createDatachannel() +{ + +} + +void PeerConnection::onConnectionCreated() +{ + logger->debug("Connection created"); +} + +void PeerConnection::onLocalDescription(rtc::Description sdp) +{ + logger->debug("New local description: {}", std::string(sdp)); + + SignalingMessage msg(sdp); + + client->sendMessage(msg); +} + +void PeerConnection::onLocalCandidate(rtc::Candidate cand) +{ + logger->debug("New local candidate: {}", std::string(cand)); + + SignalingMessage msg(cand); + + client->sendMessage(msg); +} + +void PeerConnection::onNegotiationNeeded() +{ + logger->debug("Negotation needed"); +} + +void PeerConnection::onConnectionStateChange(rtc::PeerConnection::State state) +{ + logger->debug("Connection State changed: {}", state); +} + +void PeerConnection::onSignalingStateChange(rtc::PeerConnection::SignalingState state) +{ + logger->debug("Signaling state changed: {}", state); +} + +void PeerConnection::onGatheringStateChange(rtc::PeerConnection::GatheringState state) +{ + logger->debug("Gathering state changed: {}", state); +} + +void PeerConnection::onSignalingConnected() +{ + logger->debug("Signaling connection established"); +} + +void PeerConnection::onSignalingDisconnected() +{ + logger->debug("Signaling connection closed"); +} + +void PeerConnection::onSignalingError(const std::string &err) +{ + logger->debug("Signaling connection error: {}", err); +} + +void PeerConnection::onSignalingMessage(const SignalingMessage &msg) +{ + logger->debug("Signaling message received: {}", msg.toString()); + + if (msg.type == SignalingMessage::TYPE_ANSWER || msg.type == SignalingMessage::TYPE_OFFER) + conn->setRemoteDescription(*msg.description); + + if (msg.type == SignalingMessage::TYPE_CANDIDATE) + conn->addRemoteCandidate(*msg.candidate); +} + +void PeerConnection::onDataChannel(std::shared_ptr dc) +{ + logger->debug("New data channel: id={}, stream={}, protocol={}, max_msg_size={}, label={}", + dc->id(), + dc->stream(), + dc->protocol(), + dc->maxMessageSize(), + dc->label()); + + dc->onMessage(std::bind(&PeerConnection::onDataChannelMessage, this, _1)); + dc->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this)); + dc->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this)); + dc->onError(std::bind(&PeerConnection::onDataChannelError, this, _1)); +} + +void PeerConnection::onDataChannelOpen() +{ + logger->debug("Datachannel opened"); +} + +void PeerConnection::onDataChannelClosed() +{ + logger->debug("Datachannel closed"); +} + +void PeerConnection::onDataChannelError(std::string err) +{ + logger->error("Datachannel error: {}", err); +} + +void PeerConnection::onDataChannelMessage(rtc::message_variant msg) +{ + logger->debug("Datachannel message received"); +} diff --git a/lib/nodes/webrtc/signaling_client.cpp b/lib/nodes/webrtc/signaling_client.cpp new file mode 100644 index 000000000..e99569860 --- /dev/null +++ b/lib/nodes/webrtc/signaling_client.cpp @@ -0,0 +1,190 @@ +/** WebRTC signaling client + * + * @author Steffen Vogel + * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#include +#include +#include +#include + +using namespace villas; +using namespace villas::node; +using namespace villas::node::webrtc; + +SignalingClient::SignalingClient(const std::string &srv, const std::string &sess, Web *w) : + web(w), + logger(logging.get("webrtc:signal")) +{ + const char *prot, *a, *p; + + memset(&info, 0, sizeof(info)); + + asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str()); + + int ret = lws_parse_uri(uri, &prot, &a, &info.port, &p); + if (ret) + throw RuntimeError("Failed to parse WebSocket URI: '{}'", uri); + + + asprintf(&path, "/%s", p); + + info.ssl_connection = !strcmp(prot, "https"); + info.address = a; + info.path = path; + info.host = a; + info.origin = a; + info.protocol = "webrtc-signaling"; + info.local_protocol_name = "webrtc-signaling"; + info.pwsi = &wsi; + info.retry_and_idle_policy = &retry; + info.ietf_version_or_minus_one = -1; + info.userdata = this; + + sul_helper.self = this; +} + +SignalingClient::~SignalingClient() +{ + free(path); + free(uri); +} + +void SignalingClient::connect() +{ + info.context = web->getContext(); + info.vhost = web->getVHost(); + + lws_sul_schedule(info.context, 0, &sul_helper.sul, connectStatic, 1); +} + +void SignalingClient::disconnect() +{ + // TODO +} + +void SignalingClient::connectStatic(struct lws_sorted_usec_list *sul) +{ + auto *sh = lws_container_of(sul, struct sul_offsetof_helper, sul); + auto *c = sh->self; + + if (!lws_client_connect_via_info(&c->info)) { + /* Failed... schedule a retry... we can't use the _retry_wsi() + * convenience wrapper api here because no valid wsi at this + * point. + */ + if (lws_retry_sul_schedule(c->info.context, 0, sul, nullptr, connectStatic, &c->retry_count)) + c->logger->error("Signaling connection attempts exhausted"); + } +} + +int SignalingClient::protocolCallbackStatic(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + auto *c = reinterpret_cast(user); + + return c->protocolCallback(wsi, reason, in, len); +} + +int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len) +{ + int ret; + + switch (reason) { + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + cbError(in ? (char *) in : "unknown error"); + goto do_retry; + + case LWS_CALLBACK_CLIENT_RECEIVE: + ret = receive(in, len); + if (ret) + goto do_retry; + + break; + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + cbConnected(); + break; + + case LWS_CALLBACK_CLIENT_CLOSED: + cbDisconnected(); + goto do_retry; + + case LWS_CALLBACK_CLIENT_WRITEABLE: { + ret = writable(); + if (ret) + goto do_retry; + + break; + } + + default: + break; + } + + return lws_callback_http_dummy(wsi, reason, this, in, len); + +do_retry: + logger->info("Attempting to reconnect..."); + + /* Retry the connection to keep it nailed up + * + * For this example, we try to conceal any problem for one set of + * backoff retries and then exit the app. + * + * If you set retry.conceal_count to be larger than the number of + * elements in the backoff table, it will never give up and keep + * retrying at the last backoff delay plus the random jitter amount. + */ + if (lws_retry_sul_schedule_retry_wsi(wsi, &sul_helper.sul, connectStatic, &retry_count)) + logger->error("Signaling connection attempts exhaused"); + + return 0; +} + +int SignalingClient::writable() +{ + // Skip if we have nothing to send + if (outgoingMessages.empty()) + return 0; + + auto msg = outgoingMessages.pop(); + auto *jsonMsg = msg.toJSON(); + + char buf[LWS_PRE + 1024]; + auto len = json_dumpb(jsonMsg, buf+LWS_PRE, 1024, JSON_INDENT(2)); + + auto ret = lws_write(wsi, (unsigned char *) buf, len, LWS_WRITE_TEXT); + if (ret < 0) + return ret; + + // Reschedule callback if there are more messages to be send + if (!outgoingMessages.empty()) + lws_callback_on_writable(wsi); + + return 0; +} + +int SignalingClient::receive(void *in, size_t len) +{ + json_error_t err; + json_t *json = json_loadb((char *) in, len, 0, &err); + if (!json) { + logger->error("Failed to decode json: {} at ({}:{})", err.text, err.line, err.column); + return -1; + } + + auto msg = SignalingMessage(json); + + cbMessage(msg); + + return 0; +} + +void SignalingClient::sendMessage(const SignalingMessage &msg) +{ + outgoingMessages.push(msg); + + web->callbackOnWritable(wsi); +} diff --git a/lib/nodes/webrtc/signaling_message.cpp b/lib/nodes/webrtc/signaling_message.cpp new file mode 100644 index 000000000..f7b69754f --- /dev/null +++ b/lib/nodes/webrtc/signaling_message.cpp @@ -0,0 +1,185 @@ +/** WebRTC signaling messages. + * + * @author Steffen Vogel + * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC + * @license Apache 2.0 + *********************************************************************************/ + +#include +#include + +using namespace villas; +using namespace villas::node; +using namespace villas::node::webrtc; + + +json_t * Connection::toJSON() const +{ + return json_pack("{ s: i, s: s, s: s, s: s }", + "id", id, + "remote", remote.c_str(), + "user_agent", userAgent.c_str(), + "created", "" // TODO + ); +} + +Connection::Connection(json_t *j) +{ + const char *rem, *ua, *ts; + + int ret = json_unpack(j, "{ s: i, s: s, s: s, s: s }", + "id", &id, + "remote", &rem, + "user_agent", &ua, + "created", &ts + ); + if (ret) + throw RuntimeError("Failed to decode signaling message"); + + remote = rem; + userAgent = ua; + + // TODO: created +} + +json_t * ControlMessage::toJSON() const +{ + json_t *json_connections = json_array(); + + for (auto &c : connections) { + json_t *json_connection = c.toJSON(); + + json_array_append(json_connections, json_connection); + } + + return json_pack("{ s: i, s: o }", + "connection_id", connectionID, + "connections", json_connections + ); +} + +ControlMessage::ControlMessage(json_t *j) +{ + int ret; + + json_t *json_connections; + + ret = json_unpack(j, "{ s: i, s: o }", + "connection_id", &connectionID, + "connections", &json_connections + ); + if (ret) + throw RuntimeError("Failed to decode signaling message"); + + if (!json_is_array(json_connections)) + throw RuntimeError("Failed to decode signaling message"); + + json_t *json_connection; + size_t i; + json_array_foreach(json_connections, i, json_connection) + connections.emplace_back(json_connection); +} + +json_t * SignalingMessage::toJSON() const +{ + switch (type) { + case TYPE_CONTROL: + return json_pack("{ s: s, s: o }", + "type", "control", + "control", control->toJSON() + ); + + case TYPE_OFFER: + case TYPE_ANSWER: + return json_pack("{ s: s, s: o }", + "type", type == TYPE_OFFER ? "offer" : "answer", + "description", std::string(*description).c_str() + ); + + case TYPE_CANDIDATE: + return json_pack("{ s: s, s: o }", + "type", "candidate", + "candidate", std::string(*candidate).c_str() + ); + } + + return nullptr; +} + +std::string SignalingMessage::toString() const +{ + switch (type) { + case TYPE_ANSWER: + return fmt::format("type=answer, description={}", std::string(*description)); + + case TYPE_OFFER: + return fmt::format("type=offer, description={}", std::string(*description)); + + case TYPE_CANDIDATE: + return fmt::format("type=candidate, candidate={}", std::string(*candidate)); + + case TYPE_CONTROL: + return fmt::format("type=control, control={}", json_dumps(control->toJSON(), 0)); + } + + return ""; +} + +SignalingMessage::SignalingMessage(rtc::Description desc, bool answer) : + type(answer ? TYPE_ANSWER : TYPE_OFFER), + description(new rtc::Description(desc)) +{ } + +SignalingMessage::SignalingMessage(rtc::Candidate cand) : + type(TYPE_CANDIDATE), + candidate(new rtc::Candidate(cand)) +{ } + +SignalingMessage::~SignalingMessage() +{ + if (description) + delete description; + + if (candidate) + delete candidate; + + if (control) + delete control; +} + +SignalingMessage::SignalingMessage(json_t *j) : + control(nullptr), + description(nullptr), + candidate(nullptr) +{ + json_t *json_control = nullptr; + const char *desc = nullptr; + const char *cand = nullptr; + const char *typ = nullptr; + + int ret = json_unpack(j, "{ s: s, s?: s, s?: s, s?: o }", + "type", &typ, + "description", &desc, + "candidate", &cand, + "control", &json_control + ); + if (ret) + throw RuntimeError("Failed to decode signaling message"); + + if (!strcmp(typ, "offer")) { + type = TYPE_OFFER; + description = new rtc::Description(desc); + } + else if (!strcmp(typ, "answer")) { + type = TYPE_ANSWER; + description = new rtc::Description(desc); + } + else if (!strcmp(typ, "candidate")) { + type = TYPE_CANDIDATE; + candidate = new rtc::Candidate(cand); + } + else if (!strcmp(typ, "control")) { + type = TYPE_CONTROL; + control = new ControlMessage(json_control); + } +} diff --git a/lib/web.cpp b/lib/web.cpp index 85ef147c1..c4bc4db67 100644 --- a/lib/web.cpp +++ b/lib/web.cpp @@ -7,6 +7,7 @@ #include +#include #include #include #include @@ -15,6 +16,10 @@ #include #include +#ifdef WITH_NODE_WEBRTC + #include +#endif + using namespace villas; using namespace villas::node; @@ -45,8 +50,16 @@ lws_protocols protocols[] = { .rx_buffer_size = 0 }, #endif /* WITH_NODE_WEBSOCKET */ +#ifdef WITH_NODE_WEBRTC { - .name = nullptr /* terminator */ + .name = "webrtc-signaling", + .callback = webrtc::SignalingClient::protocolCallbackStatic, + .per_session_data_size = sizeof(webrtc::SignalingClient), + .rx_buffer_size = 0 + }, +#endif + { + .name = nullptr, } };