/** Node-type: webrtc * * @author Steffen Vogel * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC * @license Apache 2.0 *********************************************************************************/ #include #include #include #include #include #include #include #include using namespace villas; using namespace villas::node; using namespace villas::utils; static villas::node::Web *web; WebRTCNode::WebRTCNode(const std::string &name) : Node(name), server("wss://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"), wait_seconds(0), format(nullptr), queue({}), pool({}), dci({}) { dci.reliability.type = rtc::Reliability::Type::Rexmit; } WebRTCNode::~WebRTCNode() { int ret = pool_destroy(&pool); if (ret) // TODO log ; } int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid) { int ret = Node::parse(json, sn_uuid); if (ret) return ret; const char *sess; const char *svr = nullptr; int ord = -1; int &rexmit = dci.reliability.rexmit.emplace(0); json_t *ice_json = nullptr; json_t *fmt_json = nullptr; json_error_t err; ret = json_unpack_ex(json, &err, 0, "{ s:s, s?s, s?i, s?i, s?b, s?o }", "session", &sess, "server", &svr, "wait_seconds", &wait_seconds, "max_retransmits", &rexmit, "ordered", &ord, "ice", &ice_json, "format", &fmt_json ); if (ret) throw ConfigError(json, err, "node-config-node-webrtc"); session = sess; if (svr) server = svr; if (ord) dci.reliability.unordered = !ord; if (ice_json) { json_t *json_servers = nullptr; ret = json_unpack_ex(ice_json, &err, 0, "{ s?: o }", "servers", &json_servers ); if (ret) throw ConfigError(json, err, "node-config-node-webrtc-ice"); if (json_servers) { rtcConf.iceServers.clear(); if (!json_is_array(json_servers)) throw ConfigError(json_servers, "node-config-node-webrtc-ice-servers", "ICE Servers must be a an array of server configurations."); size_t i; json_t *json_server; json_array_foreach(json_servers, i, json_server) { if (!json_is_string(json_server)) throw ConfigError(json_server, "node-config-node-webrtc-ice-server", "ICE servers must be provided as STUN/TURN url."); std::string uri = json_string_value(json_server); rtcConf.iceServers.emplace_back(uri); } } } format = fmt_json ? FormatFactory::make(fmt_json) : FormatFactory::make("villas.binary"); assert(format); return 0; } int WebRTCNode::check() { return Node::check(); } int WebRTCNode::prepare() { int ret = Node::prepare(); if (ret) return ret; format->start(getInputSignals(false), ~(int) SampleFlags::HAS_OFFSET); conn = std::make_shared(server, session, rtcConf, web, dci); ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size())); if (ret) // TODO log return ret; ret = queue_signalled_init(&queue, 1024); if (ret) // TODO log return ret; conn->onMessage([this](rtc::binary msg){ int ret; std::vector smps; smps.resize(this->in.vectorize); ret = sample_alloc_many(&this->pool, smps.data(), smps.size()); if (ret < 0) // TODO log return; ret = format->sscan((const char *)msg.data(), msg.size(), nullptr, smps.data(), ret); if (ret < 0) // TODO log return; ret = queue_signalled_push_many(&this->queue, (void **) smps.data(), ret); if (ret < 0) // TODO log return; this->logger->debug("onMessage(rtc::binary) callback finished pushing {} samples", ret); }); return 0; } int WebRTCNode::start() { int ret = Node::start(); if (!ret) state = State::STARTED; conn->connect(); if (wait_seconds > 0) { logger->info("Waiting for datachannel..."); if (!conn->waitForDataChannel(std::chrono::seconds { wait_seconds })) { throw RuntimeError { "Waiting for datachannel timed out after {} seconds", wait_seconds }; } } return 0; } int WebRTCNode::stop() { conn->disconnect(); return Node::stop(); } // int WebRTCNode::pause() // { // // TODO add implementation here // return 0; // } // int WebRTCNode::resume() // { // // TODO add implementation here // return 0; // } // int WebRTCNode::restart() // { // // TODO add implementation here // return 0; // } // int WebRTCNode::reverse() // { // // TODO add implementation here // return 0; // } std::vector WebRTCNode::getPollFDs() { return { queue_signalled_fd(&queue) }; } // std::vector WebRTCNode::getNetemFDs() // { // // TODO add implementation here // return {}; // } // struct villas::node::memory::Type * WebRTCNode::getMemoryType() // { // // TODO add implementation here // } const std::string & WebRTCNode::getDetails() { details = fmt::format(""); return details; } int WebRTCNode::_read(struct Sample *smps[], unsigned cnt) { std::vector smpt; smpt.resize(cnt); int pulled = queue_signalled_pull_many(&queue, (void **) smpt.data(), smpt.size()); sample_copy_many(smps, smpt.data(), pulled); sample_decref_many(smpt.data(), pulled); return pulled; } int WebRTCNode::_write(struct Sample *smps[], unsigned cnt) { rtc::binary buf; size_t wbytes; buf.resize(4 * 1024); int ret = format->sprint((char *) buf.data(), buf.size(), &wbytes, smps, cnt); if (ret < 0) // TODO log return ret; buf.resize(wbytes); conn->sendMessage(buf); return ret; } int WebRTCNodeFactory::start(SuperNode *sn) { web = sn->getWeb(); if (!web->isEnabled()) return -1; return 0; } static WebRTCNodeFactory p;