1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

webrtc: signaling almost working

Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
Steffen Vogel 2022-07-19 19:05:13 +00:00 committed by Philipp Jungkamp
parent 40f01e904e
commit 2d4783599f
12 changed files with 990 additions and 46 deletions

View file

@ -8,24 +8,18 @@ nodes = {
session = "my-session-name"
# Address to the websocket signaling server
server = "wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"
# server = "wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"
server = "ws://docker.for.mac.host.internal:1234"
# Setting for Interactive Connectivity Establishment
ice = {
# List of STUN/TURN servers
servers = (
{
urls = [
"stun:stun.0l.de:3478",
"turn:turn.0l.de:3478?transport=udp",
"turn:turn.0l.de:3478?transport=tcp"
],
# Credentials for TURN servers
username = "villas"
password = "villas"
}
)
servers = [
"stun://stun.l.google.com:19302",
"stun:stun.0l.de:3478",
"turn://villas:villas@turn.0l.de:3478?transport=udp",
"turn://villas:villas@turn.0l.de:3478?transport=tcp"
]
}
}
}

View file

@ -37,6 +37,7 @@
/* Available Features */
#cmakedefine WITH_WEB
#cmakedefine WITH_NODE_WEBSOCKET
#cmakedefine WITH_NODE_WEBRTC
#cmakedefine WITH_NODE_OPAL
#cmakedefine WITH_API
#cmakedefine WITH_HOOKS

View file

@ -1,4 +1,4 @@
/** The WebRTC node-type.
/** Node-type webrtc.
*
* @file
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
@ -8,6 +8,9 @@
#pragma once
#include <rtc/rtc.hpp>
#include <villas/nodes/webrtc/peer_connection.hpp>
#include <villas/node/config.hpp>
#include <villas/node.hpp>
#include <villas/timing.hpp>
@ -21,6 +24,15 @@ struct Sample;
class WebRTCNode : public Node {
protected:
std::string server;
std::string session;
bool wait;
bool ordered;
int max_retransmits;
std::shared_ptr<webrtc::PeerConnection> conn;
rtc::Configuration config;
virtual
int _read(struct Sample *smps[], unsigned cnt);
@ -80,5 +92,46 @@ public:
};
class WebRTCNodeFactory : public NodeFactory {
public:
using NodeFactory::NodeFactory;
virtual
Node * make()
{
auto *n = new WebRTCNode();
init(n);
return n;
}
virtual
int getFlags() const
{
return (int) NodeFactory::Flags::SUPPORTS_READ |
(int) NodeFactory::Flags::SUPPORTS_WRITE |
(int) NodeFactory::Flags::SUPPORTS_POLL |
(int) NodeFactory::Flags::REQUIRES_WEB;
}
virtual
std::string getName() const
{
return "webrtc";
}
virtual
std::string getDescription() const
{
return "Web Real-time Communication";
}
virtual
int start(SuperNode *sn);
};
} /* namespace node */
} /* namespace villas */

View file

@ -0,0 +1,77 @@
/** WebRTC peer connection
*
* @file
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#pragma once
#include <rtc/rtc.hpp>
#include <villas/log.hpp>
#include <villas/web.hpp>
#include <villas/nodes/webrtc/signaling_client.hpp>
namespace villas {
namespace node {
namespace webrtc {
class PeerConnection {
public:
PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &config, Web *w);
~PeerConnection();
void waitForDataChannel();
void connect();
protected:
Web *web;
rtc::Configuration config;
std::shared_ptr<rtc::PeerConnection> conn;
std::shared_ptr<rtc::DataChannel> chan;
std::shared_ptr<SignalingClient> client;
Logger logger;
bool makingOffer;
bool ignoreOffer;
bool first;
bool polite;
bool rollback;
void createPeerConnection();
void rollbackPeerConnection();
void createDatachannel();
void onConnectionCreated();
void onLocalDescription(rtc::Description sdp);
void onLocalCandidate(rtc::Candidate cand);
void onNegotiationNeeded();
void onConnectionStateChange(rtc::PeerConnection::State state);
void onSignalingStateChange(rtc::PeerConnection::SignalingState state);
void onGatheringStateChange(rtc::PeerConnection::GatheringState state);
void onSignalingConnected();
void onSignalingDisconnected();
void onSignalingError(const std::string &err);
void onSignalingMessage(const SignalingMessage &msg);
void onDataChannel(std::shared_ptr<rtc::DataChannel> dc);
void onDataChannelOpen();
void onDataChannelClosed();
void onDataChannelError(std::string err);
void onDataChannelMessage(rtc::message_variant msg);
};
} /* namespace webrtc */
} /* namespace node */
} /* namespace villas */

View file

@ -0,0 +1,113 @@
/** WebRTC signaling client
*
* @file
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#pragma once
#include <functional>
#include <chrono>
#include <libwebsockets.h>
#include <villas/queue.hpp>
#include <villas/web.hpp>
#include <villas/log.hpp>
#include <villas/nodes/webrtc/signaling_message.hpp>
namespace villas {
namespace node {
/* Forward declarations */
class Web;
namespace webrtc {
class SignalingClient {
protected:
struct sul_offsetof_helper {
lws_sorted_usec_list_t sul; /**> Schedule connection retry */
SignalingClient *self;
} sul_helper;
uint16_t retry_count; /**> Count of consequetive retries */
struct lws *wsi;
struct lws_client_connect_info info;
/* The retry and backoff policy we want to use for our client connections */
static constexpr uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };
static constexpr lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
.secs_since_valid_ping = 3, /* force PINGs after secs idle */
.secs_since_valid_hangup = 10, /* hangup after secs idle */
.jitter_percent = 20,
};
std::function<void(const SignalingMessage &)> cbMessage;
std::function<void()> cbConnected;
std::function<void()> cbDisconnected;
std::function<void(const std::string &)> cbError;
Queue<SignalingMessage> outgoingMessages;
Web *web;
char *uri;
char *path;
Logger logger;
int protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len);
static
void connectStatic(struct lws_sorted_usec_list *sul);
int receive(void *in, size_t len);
int writable();
public:
SignalingClient(const std::string &server, const std::string &session, Web *w);
~SignalingClient();
static
int protocolCallbackStatic(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
void connect();
void disconnect();
void sendMessage(const SignalingMessage &msg);
void onMessage(std::function<void(const SignalingMessage&)> callback)
{
cbMessage = callback;
}
void onConnected(std::function<void()> callback)
{
cbConnected = callback;
}
void onDisconnected(std::function<void()> callback)
{
cbDisconnected = callback;
}
void onError(std::function<void(const std::string &)> callback)
{
cbError = callback;
}
};
} /* namespace webrtc */
} /* namespace node */
} /* namespace villas */

View file

@ -0,0 +1,68 @@
/** WebRTC signaling messages.
*
* @file
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#pragma once
#include <string>
#include <chrono>
#include <list>
#include <rtc/rtc.hpp>
#include <jansson.h>
namespace villas {
namespace node {
namespace webrtc {
struct Connection {
int id;
std::string remote;
std::string userAgent;
std::chrono::time_point<std::chrono::system_clock> created;
Connection(json_t *j);
json_t * toJSON() const;
};
struct ControlMessage {
int connectionID;
std::list<Connection> connections;
ControlMessage(json_t *j);
json_t * toJSON() const;
};
class SignalingMessage {
public:
enum Type {
TYPE_CONTROL,
TYPE_OFFER,
TYPE_ANSWER,
TYPE_CANDIDATE
};
Type type;
ControlMessage *control;
rtc::Description *description;
rtc::Candidate *candidate;
std::string mid;
SignalingMessage(rtc::Description desc, bool answer = false);
SignalingMessage(rtc::Candidate cand);
SignalingMessage(json_t *j);
~SignalingMessage();
json_t * toJSON() const;
std::string toString() const;
};
} /* namespace webrtc */
} /* namespace node */
} /* namespace villas */

View file

@ -188,7 +188,7 @@ endif()
# Enable WebRTC support
if(WITH_NODE_WEBRTC)
list(APPEND NODE_SRC webrtc.cpp)
list(APPEND NODE_SRC webrtc.cpp webrtc/signaling_client.cpp webrtc/signaling_message.cpp webrtc/peer_connection.cpp)
list(APPEND LIBRARIES LibDataChannel::LibDataChannel)
endif()

View file

@ -1,10 +1,12 @@
/** The WebRTC node-type.
/** Node-type: webrtc
*
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#include <vector>
#include <villas/node_compat.hpp>
#include <villas/nodes/webrtc.hpp>
#include <villas/utils.hpp>
@ -17,39 +19,87 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
static villas::node::Web *web;
WebRTCNode::WebRTCNode(const std::string &name) :
Node(name)
{ }
Node(name),
server("wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"),
wait(true),
ordered(false),
max_retransmits(0)
{
config.iceServers = {
rtc::IceServer("stun://stun.l.google.com:19302"),
rtc::IceServer("stun://villas:villas@stun.0l.de"),
rtc::IceServer("turn://villas:villas@turn.0l.de?transport=udp"),
rtc::IceServer("turn://villas:villas@turn.0l.de?transport=tcp"),
};
}
WebRTCNode::~WebRTCNode()
{ }
int WebRTCNode::prepare()
{
// state1 = setting1;
// if (setting2 == "double")
// state1 *= 2;
return 0;
}
int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
{
/* TODO: Add implementation here. The following is just an example */
int ret = Node::parse(json, sn_uuid);
if (ret)
return ret;
// const char *setting2_str = nullptr;
const char *sess;
const char *svr = nullptr;
int w = - 1, ord = -1;
json_t *json_ice = nullptr;
// json_error_t err;
// int ret = json_unpack_ex(json, &err, 0, "{ s?: i, s?: s }",
// "setting1", &setting1,
// "setting2", &setting2_str
// );
// if (ret)
// throw ConfigError(json, err, "node-config-node-example");
json_error_t err;
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: b, s?: i, s?: b, s?: o }",
"session", &sess,
"server", &svr,
"wait", &w,
"max_retransmits", &max_retransmits,
"ordered", &ord,
"ice", &json_ice
);
if (ret)
throw ConfigError(json, err, "node-config-node-webrtc");
// if (setting2_str)
// setting2 = setting2_str;
session = sess;
if (svr)
server = svr;
if (w >= 0)
wait = w != 0;
if (ord >= 0)
ordered = ordered != 0;
if (json_ice) {
json_t *json_servers = nullptr;
ret = json_unpack_ex(json_ice, &err, 0, "{ s?: o }",
"servers", &json_servers
);
if (ret)
throw ConfigError(json, err, "node-config-node-webrtc-ice");
if (json_servers) {
config.iceServers.clear();
if (!json_is_array(json_servers))
throw ConfigError(json_servers, "node-config-node-webrtc-ice-servers", "ICE Servers must be a an array of server configurations.");
size_t i;
json_t *json_server;
json_array_foreach(json_servers, i, json_server) {
if (!json_is_string(json_server))
throw ConfigError(json_server, "node-config-node-webrtc-ice-server", "ICE servers must be provided as STUN/TURN url.");
std::string uri = json_string_value(json_server);
config.iceServers.emplace_back(uri);
}
}
}
return 0;
}
@ -59,11 +109,26 @@ int WebRTCNode::check()
return 0;
}
int WebRTCNode::prepare()
{
conn = std::make_shared<webrtc::PeerConnection>(server, session, config, web);
return Node::prepare();
}
int WebRTCNode::start()
{
// TODO add implementation here
conn->connect();
// start_time = time_now();
if (wait) {
logger->info("Wait until datachannel is connected...");
conn->waitForDataChannel();
}
int ret = Node::start();
if (!ret)
state = State::STARTED;
return 0;
}
@ -155,7 +220,14 @@ int WebRTCNode::_write(struct Sample *smps[], unsigned cnt)
return written;
}
int WebRTCNodeFactory::start(SuperNode *sn)
{
web = sn->getWeb();
if (!web->isEnabled())
return -1;
static char n[] = "webrtc";
static char d[] = "Web Real-time Communication";
static NodePlugin<WebRTCNode, n , d, (int) NodeFactory::Flags::SUPPORTS_READ | (int) NodeFactory::Flags::SUPPORTS_WRITE | (int) NodeFactory::Flags::SUPPORTS_POLL | (int) NodeFactory::Flags::HIDDEN> p;
return 0;
}
static WebRTCNodeFactory p;

View file

@ -0,0 +1,178 @@
/** WebRTC peer connection
*
* @file
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#include <chrono>
#include <thread>
#include <villas/nodes/webrtc/peer_connection.hpp>
using namespace std::placeholders;
using namespace villas;
using namespace villas::node;
using namespace villas::node::webrtc;
PeerConnection::PeerConnection(const std::string &server, const std::string &session, const rtc::Configuration &cfg, Web *w) :
web(w),
config(cfg),
logger(logging.get("webrtc:pc"))
{
client = std::make_shared<SignalingClient>(server, session, web);
client->onConnected(std::bind(&PeerConnection::onSignalingConnected, this));
client->onDisconnected(std::bind(&PeerConnection::onSignalingDisconnected, this));
client->onError(std::bind(&PeerConnection::onSignalingError, this, _1));
client->onMessage(std::bind(&PeerConnection::onSignalingMessage, this, _1));
}
PeerConnection::~PeerConnection()
{
// TODO
}
void PeerConnection::connect()
{
client->connect();
}
void PeerConnection::waitForDataChannel()
{
while (!chan) {
// TODO: signal available via condition variable
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void PeerConnection::createPeerConnection()
{
// Setup peer connection
conn = std::make_shared<rtc::PeerConnection>(config);
conn->onLocalDescription(std::bind(&PeerConnection::onLocalDescription, this, _1));
conn->onLocalCandidate(std::bind(&PeerConnection::onLocalCandidate, this, _1));
conn->onDataChannel(std::bind(&PeerConnection::onDataChannel, this, _1));
conn->onGatheringStateChange(std::bind(&PeerConnection::onGatheringStateChange, this, _1));
conn->onSignalingStateChange(std::bind(&PeerConnection::onSignalingStateChange, this, _1));
conn->onStateChange(std::bind(&PeerConnection::onConnectionStateChange, this, _1));
}
void PeerConnection::rollbackPeerConnection()
{
}
void PeerConnection::createDatachannel()
{
}
void PeerConnection::onConnectionCreated()
{
logger->debug("Connection created");
}
void PeerConnection::onLocalDescription(rtc::Description sdp)
{
logger->debug("New local description: {}", std::string(sdp));
SignalingMessage msg(sdp);
client->sendMessage(msg);
}
void PeerConnection::onLocalCandidate(rtc::Candidate cand)
{
logger->debug("New local candidate: {}", std::string(cand));
SignalingMessage msg(cand);
client->sendMessage(msg);
}
void PeerConnection::onNegotiationNeeded()
{
logger->debug("Negotation needed");
}
void PeerConnection::onConnectionStateChange(rtc::PeerConnection::State state)
{
logger->debug("Connection State changed: {}", state);
}
void PeerConnection::onSignalingStateChange(rtc::PeerConnection::SignalingState state)
{
logger->debug("Signaling state changed: {}", state);
}
void PeerConnection::onGatheringStateChange(rtc::PeerConnection::GatheringState state)
{
logger->debug("Gathering state changed: {}", state);
}
void PeerConnection::onSignalingConnected()
{
logger->debug("Signaling connection established");
}
void PeerConnection::onSignalingDisconnected()
{
logger->debug("Signaling connection closed");
}
void PeerConnection::onSignalingError(const std::string &err)
{
logger->debug("Signaling connection error: {}", err);
}
void PeerConnection::onSignalingMessage(const SignalingMessage &msg)
{
logger->debug("Signaling message received: {}", msg.toString());
if (msg.type == SignalingMessage::TYPE_ANSWER || msg.type == SignalingMessage::TYPE_OFFER)
conn->setRemoteDescription(*msg.description);
if (msg.type == SignalingMessage::TYPE_CANDIDATE)
conn->addRemoteCandidate(*msg.candidate);
}
void PeerConnection::onDataChannel(std::shared_ptr<rtc::DataChannel> dc)
{
logger->debug("New data channel: id={}, stream={}, protocol={}, max_msg_size={}, label={}",
dc->id(),
dc->stream(),
dc->protocol(),
dc->maxMessageSize(),
dc->label());
dc->onMessage(std::bind(&PeerConnection::onDataChannelMessage, this, _1));
dc->onOpen(std::bind(&PeerConnection::onDataChannelOpen, this));
dc->onClosed(std::bind(&PeerConnection::onDataChannelClosed, this));
dc->onError(std::bind(&PeerConnection::onDataChannelError, this, _1));
}
void PeerConnection::onDataChannelOpen()
{
logger->debug("Datachannel opened");
}
void PeerConnection::onDataChannelClosed()
{
logger->debug("Datachannel closed");
}
void PeerConnection::onDataChannelError(std::string err)
{
logger->error("Datachannel error: {}", err);
}
void PeerConnection::onDataChannelMessage(rtc::message_variant msg)
{
logger->debug("Datachannel message received");
}

View file

@ -0,0 +1,190 @@
/** WebRTC signaling client
*
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#include <villas/web.hpp>
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/signaling_client.hpp>
#include <villas/nodes/webrtc/signaling_message.hpp>
using namespace villas;
using namespace villas::node;
using namespace villas::node::webrtc;
SignalingClient::SignalingClient(const std::string &srv, const std::string &sess, Web *w) :
web(w),
logger(logging.get("webrtc:signal"))
{
const char *prot, *a, *p;
memset(&info, 0, sizeof(info));
asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str());
int ret = lws_parse_uri(uri, &prot, &a, &info.port, &p);
if (ret)
throw RuntimeError("Failed to parse WebSocket URI: '{}'", uri);
asprintf(&path, "/%s", p);
info.ssl_connection = !strcmp(prot, "https");
info.address = a;
info.path = path;
info.host = a;
info.origin = a;
info.protocol = "webrtc-signaling";
info.local_protocol_name = "webrtc-signaling";
info.pwsi = &wsi;
info.retry_and_idle_policy = &retry;
info.ietf_version_or_minus_one = -1;
info.userdata = this;
sul_helper.self = this;
}
SignalingClient::~SignalingClient()
{
free(path);
free(uri);
}
void SignalingClient::connect()
{
info.context = web->getContext();
info.vhost = web->getVHost();
lws_sul_schedule(info.context, 0, &sul_helper.sul, connectStatic, 1);
}
void SignalingClient::disconnect()
{
// TODO
}
void SignalingClient::connectStatic(struct lws_sorted_usec_list *sul)
{
auto *sh = lws_container_of(sul, struct sul_offsetof_helper, sul);
auto *c = sh->self;
if (!lws_client_connect_via_info(&c->info)) {
/* Failed... schedule a retry... we can't use the _retry_wsi()
* convenience wrapper api here because no valid wsi at this
* point.
*/
if (lws_retry_sul_schedule(c->info.context, 0, sul, nullptr, connectStatic, &c->retry_count))
c->logger->error("Signaling connection attempts exhausted");
}
}
int SignalingClient::protocolCallbackStatic(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
auto *c = reinterpret_cast<SignalingClient *>(user);
return c->protocolCallback(wsi, reason, in, len);
}
int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len)
{
int ret;
switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
cbError(in ? (char *) in : "unknown error");
goto do_retry;
case LWS_CALLBACK_CLIENT_RECEIVE:
ret = receive(in, len);
if (ret)
goto do_retry;
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
cbConnected();
break;
case LWS_CALLBACK_CLIENT_CLOSED:
cbDisconnected();
goto do_retry;
case LWS_CALLBACK_CLIENT_WRITEABLE: {
ret = writable();
if (ret)
goto do_retry;
break;
}
default:
break;
}
return lws_callback_http_dummy(wsi, reason, this, in, len);
do_retry:
logger->info("Attempting to reconnect...");
/* Retry the connection to keep it nailed up
*
* For this example, we try to conceal any problem for one set of
* backoff retries and then exit the app.
*
* If you set retry.conceal_count to be larger than the number of
* elements in the backoff table, it will never give up and keep
* retrying at the last backoff delay plus the random jitter amount.
*/
if (lws_retry_sul_schedule_retry_wsi(wsi, &sul_helper.sul, connectStatic, &retry_count))
logger->error("Signaling connection attempts exhaused");
return 0;
}
int SignalingClient::writable()
{
// Skip if we have nothing to send
if (outgoingMessages.empty())
return 0;
auto msg = outgoingMessages.pop();
auto *jsonMsg = msg.toJSON();
char buf[LWS_PRE + 1024];
auto len = json_dumpb(jsonMsg, buf+LWS_PRE, 1024, JSON_INDENT(2));
auto ret = lws_write(wsi, (unsigned char *) buf, len, LWS_WRITE_TEXT);
if (ret < 0)
return ret;
// Reschedule callback if there are more messages to be send
if (!outgoingMessages.empty())
lws_callback_on_writable(wsi);
return 0;
}
int SignalingClient::receive(void *in, size_t len)
{
json_error_t err;
json_t *json = json_loadb((char *) in, len, 0, &err);
if (!json) {
logger->error("Failed to decode json: {} at ({}:{})", err.text, err.line, err.column);
return -1;
}
auto msg = SignalingMessage(json);
cbMessage(msg);
return 0;
}
void SignalingClient::sendMessage(const SignalingMessage &msg)
{
outgoingMessages.push(msg);
web->callbackOnWritable(wsi);
}

View file

@ -0,0 +1,185 @@
/** WebRTC signaling messages.
*
* @author Steffen Vogel <svogel2@eonerc.rwth-aachen.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/signaling_message.hpp>
using namespace villas;
using namespace villas::node;
using namespace villas::node::webrtc;
json_t * Connection::toJSON() const
{
return json_pack("{ s: i, s: s, s: s, s: s }",
"id", id,
"remote", remote.c_str(),
"user_agent", userAgent.c_str(),
"created", "" // TODO
);
}
Connection::Connection(json_t *j)
{
const char *rem, *ua, *ts;
int ret = json_unpack(j, "{ s: i, s: s, s: s, s: s }",
"id", &id,
"remote", &rem,
"user_agent", &ua,
"created", &ts
);
if (ret)
throw RuntimeError("Failed to decode signaling message");
remote = rem;
userAgent = ua;
// TODO: created
}
json_t * ControlMessage::toJSON() const
{
json_t *json_connections = json_array();
for (auto &c : connections) {
json_t *json_connection = c.toJSON();
json_array_append(json_connections, json_connection);
}
return json_pack("{ s: i, s: o }",
"connection_id", connectionID,
"connections", json_connections
);
}
ControlMessage::ControlMessage(json_t *j)
{
int ret;
json_t *json_connections;
ret = json_unpack(j, "{ s: i, s: o }",
"connection_id", &connectionID,
"connections", &json_connections
);
if (ret)
throw RuntimeError("Failed to decode signaling message");
if (!json_is_array(json_connections))
throw RuntimeError("Failed to decode signaling message");
json_t *json_connection;
size_t i;
json_array_foreach(json_connections, i, json_connection)
connections.emplace_back(json_connection);
}
json_t * SignalingMessage::toJSON() const
{
switch (type) {
case TYPE_CONTROL:
return json_pack("{ s: s, s: o }",
"type", "control",
"control", control->toJSON()
);
case TYPE_OFFER:
case TYPE_ANSWER:
return json_pack("{ s: s, s: o }",
"type", type == TYPE_OFFER ? "offer" : "answer",
"description", std::string(*description).c_str()
);
case TYPE_CANDIDATE:
return json_pack("{ s: s, s: o }",
"type", "candidate",
"candidate", std::string(*candidate).c_str()
);
}
return nullptr;
}
std::string SignalingMessage::toString() const
{
switch (type) {
case TYPE_ANSWER:
return fmt::format("type=answer, description={}", std::string(*description));
case TYPE_OFFER:
return fmt::format("type=offer, description={}", std::string(*description));
case TYPE_CANDIDATE:
return fmt::format("type=candidate, candidate={}", std::string(*candidate));
case TYPE_CONTROL:
return fmt::format("type=control, control={}", json_dumps(control->toJSON(), 0));
}
return "";
}
SignalingMessage::SignalingMessage(rtc::Description desc, bool answer) :
type(answer ? TYPE_ANSWER : TYPE_OFFER),
description(new rtc::Description(desc))
{ }
SignalingMessage::SignalingMessage(rtc::Candidate cand) :
type(TYPE_CANDIDATE),
candidate(new rtc::Candidate(cand))
{ }
SignalingMessage::~SignalingMessage()
{
if (description)
delete description;
if (candidate)
delete candidate;
if (control)
delete control;
}
SignalingMessage::SignalingMessage(json_t *j) :
control(nullptr),
description(nullptr),
candidate(nullptr)
{
json_t *json_control = nullptr;
const char *desc = nullptr;
const char *cand = nullptr;
const char *typ = nullptr;
int ret = json_unpack(j, "{ s: s, s?: s, s?: s, s?: o }",
"type", &typ,
"description", &desc,
"candidate", &cand,
"control", &json_control
);
if (ret)
throw RuntimeError("Failed to decode signaling message");
if (!strcmp(typ, "offer")) {
type = TYPE_OFFER;
description = new rtc::Description(desc);
}
else if (!strcmp(typ, "answer")) {
type = TYPE_ANSWER;
description = new rtc::Description(desc);
}
else if (!strcmp(typ, "candidate")) {
type = TYPE_CANDIDATE;
candidate = new rtc::Candidate(cand);
}
else if (!strcmp(typ, "control")) {
type = TYPE_CONTROL;
control = new ControlMessage(json_control);
}
}

View file

@ -7,6 +7,7 @@
#include <cstring>
#include <villas/config.hpp>
#include <villas/node/config.hpp>
#include <villas/utils.hpp>
#include <villas/web.hpp>
@ -15,6 +16,10 @@
#include <villas/node/exceptions.hpp>
#include <villas/nodes/websocket.hpp>
#ifdef WITH_NODE_WEBRTC
#include <villas/nodes/webrtc/signaling_client.hpp>
#endif
using namespace villas;
using namespace villas::node;
@ -45,8 +50,16 @@ lws_protocols protocols[] = {
.rx_buffer_size = 0
},
#endif /* WITH_NODE_WEBSOCKET */
#ifdef WITH_NODE_WEBRTC
{
.name = nullptr /* terminator */
.name = "webrtc-signaling",
.callback = webrtc::SignalingClient::protocolCallbackStatic,
.per_session_data_size = sizeof(webrtc::SignalingClient),
.rx_buffer_size = 0
},
#endif
{
.name = nullptr,
}
};