1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge pull request #686 from VILLASframework/webrtc-new-signaling

Refactor WebRTC signaling and Node UUID initialization
This commit is contained in:
Steffen Vogel 2023-07-01 00:13:47 +02:00 committed by GitHub
commit e7e168d5f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
55 changed files with 406 additions and 539 deletions

View file

@ -1,28 +0,0 @@
if (CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL "x86_64")
set(GO_DOWNLOAD_ARCH "amd64")
set(GO_DOWNLOAD_HASH "980e65a863377e69fd9b67df9d8395fd8e93858e7a24c9f55803421e453f4f99")
elseif(CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL "aarch64")
set(GO_DOWNLOAD_ARCH "arm64")
set(GO_DOWNLOAD_HASH "57a9171682e297df1a5bd287be056ed0280195ad079af90af16dcad4f64710cb")
elseif(CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL "armv7l")
set(GO_DOWNLOAD_ARCH "armv6l")
set(GO_DOWNLOAD_HASH "3287ca2fe6819fa87af95182d5942bf4fa565aff8f145812c6c70c0466ce25ae")
endif()
FetchContent_Declare(
go
URL https://go.dev/dl/go1.17.8.linux-${GO_DOWNLOAD_ARCH}.tar.gz
URL_HASH SHA256=${GO_DOWNLOAD_HASH}
)
message(STATUS "Downloading Go toolchain for linux/${GO_DOWNLOAD_ARCH}")
FetchContent_MakeAvailable(go)
find_program(GO
NAMES go
NO_DEFAULT_PATH
PATHS
${CMAKE_CURRENT_BINARY_DIR}/_deps/go-src/bin
${go_SOURCE_DIR}
)

2
common

@ -1 +1 @@
Subproject commit d9d4ac76a5403e14f7899dae480781e9cdcf0572
Subproject commit 120312e938dc298b4dc13792e1acf7510190bbf4

View file

@ -8,8 +8,6 @@
#pragma once
#include <uuid/uuid.h>
#include <villas/api/request.hpp>
namespace villas {

View file

@ -6,8 +6,6 @@
* @license Apache 2.0
*********************************************************************************/
#include <uuid.h>
#include <villas/api/request.hpp>
namespace villas {

View file

@ -108,7 +108,7 @@ public:
virtual ~LuaHook();
virtual void parse(json_t *cfg);
virtual void parse(json_t *json);
virtual void prepare();

View file

@ -10,13 +10,12 @@
#include <stdbool.h>
#include <uuid/uuid.h>
#include <jansson.h>
typedef void *vnode;
typedef void *vsample;
vnode * node_new(const char *json_str, const char *sn_uuid_str);
vnode * node_new(const char *id_str, const char *json_str);
int node_prepare(vnode *n);

View file

@ -18,6 +18,7 @@
#include <villas/sample.hpp>
#include <villas/list.hpp>
#include <villas/queue.h>
#include <villas/colors.hpp>
#include <villas/common.hpp>
#include <villas/stats.hpp>
#include <villas/log.hpp>
@ -105,9 +106,15 @@ protected:
return -1;
}
virtual
json_t * _readStatus() const
{
return nullptr;
}
public:
/** Initialize node with default values */
Node(const std::string &name = "");
Node(const uuid_t &id = {}, const std::string &name = "");
/** Destroy node by freeing dynamically allocated memory. */
virtual
@ -124,7 +131,7 @@ public:
* @retval <0 Error. Something went wrong.
*/
virtual
int parse(json_t *json, const uuid_t sn_uuid);
int parse(json_t *json);
/** Validate node configuration. */
virtual
@ -351,6 +358,8 @@ protected:
n->logger = getLogger();
n->factory = this;
n->name_long = fmt::format(CLR_RED("{}") "(" CLR_YEL("{}") ")", n->name_short, getName());
instances.push_back(n);
}
@ -376,13 +385,13 @@ public:
}
virtual
Node * make() = 0;
Node * make(const uuid_t &id = {}, const std::string &nme = "") = 0;
static
Node * make(json_t *json, uuid_t uuid);
Node * make(json_t *json, const uuid_t &id, const std::string &name = "");
static
Node * make(const std::string &type);
Node * make(const std::string &type, const uuid_t &id = {}, const std::string &name = "");
virtual
std::string getType() const
@ -438,9 +447,9 @@ class NodePlugin : public NodeFactory {
public:
virtual
Node * make()
Node * make(const uuid_t &id = {}, const std::string &nme = "")
{
T* n = new T();
T* n = new T(id, nme);
init(n);

View file

@ -43,7 +43,7 @@ public:
NodeCompat *node;
json_t *cfg;
NodeCompat(struct NodeCompatType *vt);
NodeCompat(struct NodeCompatType *vt, const uuid_t &id, const std::string &name);
NodeCompat(const NodeCompat& n);
NodeCompat& operator=(const NodeCompat& other);
@ -66,12 +66,12 @@ public:
/** Parse node connection details.
*
* @param cfg A JSON object containing the configuration of the node.
* @param json A JSON object containing the configuration of the node.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
virtual
int parse(json_t *cfg, const uuid_t sn_uuid);
int parse(json_t *json);
/** Returns a string with a textual represenation of this node. */
virtual
@ -170,7 +170,7 @@ public:
{ }
virtual
Node * make();
Node * make(const uuid_t &id = {}, const std::string &name = "");
/// Get plugin name
virtual

View file

@ -22,7 +22,7 @@ struct Sample;
class APINode : public Node {
public:
APINode(const std::string &name = "");
APINode(const uuid_t &id = {}, const std::string &name = "");
struct Direction {
Sample *sample;
@ -42,7 +42,7 @@ public:
protected:
virtual
int parse(json_t *json, const uuid_t sn_uuid);
int parse(json_t *json);
virtual
int _read(struct Sample *smps[], unsigned cnt);

View file

@ -42,7 +42,7 @@ protected:
int _write(struct Sample *smps[], unsigned cnt);
public:
ExampleNode(const std::string &name = "");
ExampleNode(const uuid_t &id = {}, const std::string &name = "");
/* All of the following virtual-declared functions are optional.
* Have a look at node.hpp/node.cpp for the default behaviour.
@ -55,7 +55,7 @@ public:
int prepare();
virtual
int parse(json_t *json, const uuid_t sn_uuid);
int parse(json_t *json);
/** Validate node configuration. */
virtual

View file

@ -42,8 +42,8 @@ protected:
int _write(struct Sample * smps[], unsigned cnt);
public:
ExecNode(const std::string &name = "") :
Node(name),
ExecNode(const uuid_t &id = {}, const std::string &name = "") :
Node(id, name),
stream_in(nullptr),
stream_out(nullptr),
flush(true),
@ -66,7 +66,7 @@ public:
int prepare();
virtual
int parse(json_t *json, const uuid_t sn_uuid);
int parse(json_t *json);
virtual
std::vector<int> getPollFDs();

View file

@ -51,13 +51,13 @@ protected:
int _write(Sample *smps[], unsigned cnt);
public:
FpgaNode(const std::string &name = "");
FpgaNode(const uuid_t &id = {}, const std::string &name = "");
virtual
~FpgaNode();
virtual
int parse(json_t *cfg, const uuid_t sn_uuid);
int parse(json_t *json);
virtual
const std::string & getDetails();
@ -79,9 +79,9 @@ public:
using NodeFactory::NodeFactory;
virtual
Node * make()
Node * make(const uuid_t &id = {}, const std::string &nme = "")
{
auto *n = new FpgaNode;
auto *n = new FpgaNode(id, nme);
init(n);

View file

@ -1,20 +0,0 @@
/** Node-type implemeted in Go language
*
* @file
* @author Steffen Vogel <post@steffenvogel.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#pragma once
typedef void *_go_plugin;
typedef void *_go_plugin_list;
typedef void *_go_logger;
typedef void (*_go_register_node_factory_cb)(_go_plugin_list pl, char *name, char *desc, int flags);
typedef void (*_go_logger_log_cb)(_go_logger l, int level, char *msg);
_go_logger * _go_logger_get(char *name);
void _go_logger_log(_go_logger *l, int level, char *msg);

View file

@ -1,127 +0,0 @@
/** Node-type implemeted in Go language
*
* @file
* @author Steffen Vogel <post@steffenvogel.de>
* @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC
* @license Apache 2.0
*********************************************************************************/
#pragma once
#include <villas/node.hpp>
namespace villas {
namespace node {
/* Forward declarations */
struct Sample;
class Format;
class GoNodeFactory;
class GoNode : public Node {
friend GoNodeFactory;
protected:
uintptr_t node; // runtime/cgo.Handle
std::string _details;
Format *formatter;
virtual
int _read(struct Sample * smps[], unsigned cnt);
virtual
int _write(struct Sample * smps[], unsigned cnt);
public:
GoNode(uintptr_t n);
virtual
~GoNode();
virtual
int parse(json_t *json, const uuid_t sn_uuid);
virtual
std::vector<int> getPollFDs();
virtual
std::vector<int> getNetemFDs();
virtual
const std::string & getDetails();
virtual
int prepare();
virtual
int check();
virtual
int start();
virtual
int stop();
virtual
int pause();
virtual
int resume();
virtual
int restart();
virtual
int reverse();
};
class GoNodeFactory : public NodeFactory {
protected:
std::string name;
std::string description;
int flags;
public:
GoNodeFactory(char *n, char *d, int f = 0) :
NodeFactory(),
name(n),
description(d),
flags(f)
{ }
virtual
Node * make();
virtual
std::string getName() const
{
return name;
}
virtual
std::string getDescription() const
{
return description;
}
virtual
int getFlags() const
{
return flags;
}
};
class GoPluginRegistry : public plugin::SubRegistry {
public:
GoPluginRegistry();
plugin::List<> lookup();
};
} /* namespace node */
} /* namespace villas */

View file

@ -48,7 +48,7 @@ public:
};
// Parse the config json
static ASDUData parse(json_t *signal_json, std::optional<ASDUData> last_data, bool duplicate_ioa_is_sequence);
static ASDUData parse(json_t *json_signal, std::optional<ASDUData> last_data, bool duplicate_ioa_is_sequence);
// Does this data include a timestamp
bool hasTimestamp() const;
@ -171,13 +171,13 @@ protected:
int _write(struct Sample *smps[], unsigned cnt) override;
public:
SlaveNode(const std::string &name = "");
SlaveNode(const uuid_t &id = {}, const std::string &name = "");
virtual
~SlaveNode() override;
virtual
int parse(json_t *json, const uuid_t sn_uuid) override;
int parse(json_t *json) override;
virtual
int start() override;

View file

@ -233,7 +233,7 @@ protected:
int _write(struct Sample *smps[], unsigned cnt) override;
public:
GooseNode(const std::string &name = "");
GooseNode(const uuid_t &id = {}, const std::string &name = "");
virtual
~GooseNode() override;
@ -242,7 +242,7 @@ public:
std::vector<int> getPollFDs() override;
virtual
int parse(json_t *json, const uuid_t sn_uuid) override;
int parse(json_t *json) override;
virtual
int prepare() override;

View file

@ -29,7 +29,7 @@ protected:
int _read(struct Sample * smps[], unsigned cnt);
public:
LoopbackNode(const std::string &name = "");
LoopbackNode(const uuid_t &id = {}, const std::string &name = "");
virtual
~LoopbackNode();

View file

@ -47,7 +47,7 @@ public:
using NodeFactory::NodeFactory;
virtual
Node * make()
Node * make(const uuid_t &id = {}, const std::string &name = "")
{
return nullptr;
}

View file

@ -82,13 +82,13 @@ protected:
int _read(struct Sample *smps[], unsigned cnt);
public:
SignalNode(const std::string &name = "");
SignalNode(const uuid_t &id = {}, const std::string &name = "");
virtual
const std::string & getDetails();
virtual
int parse(json_t *json, const uuid_t sn_uuid);
int parse(json_t *json);
virtual
int start();

View file

@ -30,6 +30,7 @@ class WebRTCNode : public Node {
protected:
std::string server;
std::string session;
std::string peer;
int wait_seconds;
Format *format;
@ -46,8 +47,11 @@ protected:
virtual
int _write(struct Sample *smps[], unsigned cnt);
virtual
json_t * _readStatus() const;
public:
WebRTCNode(const std::string &name = "");
WebRTCNode(const uuid_t &id = {}, const std::string &name = "");
virtual
~WebRTCNode();
@ -56,10 +60,7 @@ public:
int prepare();
virtual
int parse(json_t *json, const uuid_t sn_uuid);
virtual
int check();
int parse(json_t *json);
virtual
int start();
@ -81,9 +82,9 @@ public:
using NodeFactory::NodeFactory;
virtual
Node * make()
Node * make(const uuid_t &id = {}, const std::string &nme = "")
{
auto *n = new WebRTCNode();
auto *n = new WebRTCNode(id, nme);
init(n);

View file

@ -10,10 +10,12 @@
#pragma once
#include <jansson.h>
#include <rtc/rtc.hpp>
#include <villas/log.hpp>
#include <villas/web.hpp>
#include <villas/signal_list.hpp>
#include <villas/nodes/webrtc/signaling_client.hpp>
namespace villas {
@ -23,13 +25,15 @@ namespace webrtc {
class PeerConnection {
public:
PeerConnection(const std::string &server, const std::string &session, rtc::Configuration config, Web *w, rtc::DataChannelInit d);
PeerConnection(const std::string &server, const std::string &session, const std::string &peer, std::shared_ptr<SignalList> signals, rtc::Configuration config, Web *w, rtc::DataChannelInit d);
~PeerConnection();
bool waitForDataChannel(std::chrono::seconds timeout);
void onMessage(std::function<void(rtc::binary)> callback);
void sendMessage(rtc::binary msg);
json_t * readStatus() const;
void connect();
void disconnect();
@ -42,6 +46,7 @@ protected:
std::shared_ptr<rtc::PeerConnection> conn;
std::shared_ptr<rtc::DataChannel> chan;
std::shared_ptr<SignalingClient> client;
std::shared_ptr<SignalList> signals;
Logger logger;

View file

@ -81,7 +81,7 @@ protected:
int writable();
public:
SignalingClient(const std::string &server, const std::string &session, Web *w);
SignalingClient(const std::string &server, const std::string &session, const std::string &peer, Web *w);
~SignalingClient();
static

View file

@ -18,18 +18,21 @@
#include <rtc/rtc.hpp>
#include <jansson.h>
#include <villas/signal_list.hpp>
namespace villas {
namespace node {
namespace webrtc {
struct Connection {
struct Peer {
int id;
std::string name;
std::string remote;
std::string userAgent;
std::chrono::time_point<std::chrono::system_clock> created;
Connection(json_t *j);
json_t * toJSON() const;
Peer(json_t *j);
json_t * toJson() const;
};
struct RelayMessage {
@ -39,18 +42,18 @@ struct RelayMessage {
};
struct ControlMessage {
int connectionID;
std::vector<Connection> connections;
int peerID;
std::vector<Peer> peers;
ControlMessage(json_t *j);
json_t * toJSON() const;
json_t * toJson() const;
};
struct SignalingMessage {
std::variant<std::monostate, RelayMessage, ControlMessage, rtc::Description, rtc::Candidate> message;
std::variant<std::monostate, RelayMessage, ControlMessage, SignalList, rtc::Description, rtc::Candidate> message;
static SignalingMessage fromJSON(json_t *j);
json_t * toJSON() const;
static SignalingMessage fromJson(json_t *j);
json_t * toJson() const;
std::string toString() const;
};

View file

@ -188,6 +188,12 @@ public:
return state;
}
/** Get the UUID of this path. */
const uuid_t & getUuid() const
{
return uuid;
}
json_t * toJson() const;
};

View file

@ -15,6 +15,7 @@
#include <villas/log.hpp>
#include <villas/signal.hpp>
#include <villas/exceptions.hpp>
namespace villas {
namespace node {
@ -29,6 +30,12 @@ public:
SignalList(unsigned len, enum SignalType fmt);
SignalList(const char *dt);
SignalList(json_t *json)
{
int ret = parse(json);
if (ret)
throw RuntimeError("Failed to parse signal list");
}
int parse(json_t *json);

View file

@ -146,9 +146,9 @@ public:
return state;
}
void getUUID(uuid_t out) const
const uuid_t & getUuid() const
{
uuid_copy(out, uuid);
return uuid;
}
struct timespec getStartTime() const

View file

@ -134,11 +134,6 @@ target_include_directories(villas PUBLIC ${INCLUDE_DIRS})
target_link_libraries(villas PUBLIC ${LIBRARIES})
target_link_libraries(villas PRIVATE -Wl,--whole-archive ${WHOLE_ARCHIVES} -Wl,--no-whole-archive)
# We need to link with -Bsymbolic in order to use link libvillas
# with Go code (Cgo)
# See also: https://stackoverflow.com/a/67299849
# target_link_options(villas PUBLIC "-Wl,-Bsymbolic")
set_target_properties(villas PROPERTIES
VERSION ${CMAKE_PROJECT_VERSION}
SOVERSION 1

View file

@ -6,7 +6,6 @@
*********************************************************************************/
#include <jansson.h>
#include <uuid/uuid.h>
#include <villas/super_node.hpp>
#include <villas/node.hpp>

View file

@ -6,7 +6,6 @@
*********************************************************************************/
#include <jansson.h>
#include <uuid/uuid.h>
#include <villas/super_node.hpp>
#include <villas/node.hpp>

View file

@ -6,7 +6,6 @@
*********************************************************************************/
#include <jansson.h>
#include <uuid/uuid.h>
#include <villas/super_node.hpp>
#include <villas/path.hpp>

View file

@ -6,16 +6,14 @@
*********************************************************************************/
#include <time.h>
#include <uuid/uuid.h>
#include <sys/utsname.h>
#include <sys/sysinfo.h>
#include <villas/uuid.hpp>
#include <villas/timing.hpp>
#include <villas/api/request.hpp>
#include <villas/api/response.hpp>
typedef char uuid_string_t[37];
namespace villas {
namespace node {
namespace api {
@ -37,15 +35,10 @@ public:
auto *sn = session->getSuperNode();
uuid_t uuid;
uuid_string_t uuid_str;
struct utsname uts;
struct sysinfo sinfo;
char hname[128];
sn->getUUID(uuid);
uuid_unparse_lower(uuid, uuid_str);
auto now = time_now();
auto started = sn->getStartTime();
@ -73,7 +66,7 @@ public:
"build_id", PROJECT_BUILD_ID,
"build_date", PROJECT_BUILD_DATE,
"hostname", hname,
"uuid", uuid_str,
"uuid", uuid::toString(sn->getUuid()).c_str(),
"time_now", time_to_double(&now),
"time_started", time_to_double(&started),
"timezone",

View file

@ -5,8 +5,7 @@
* @license Apache 2.0
*********************************************************************************/
#include <uuid.h>
#include <villas/uuid.hpp>
#include <villas/api/requests/universal.hpp>
#include <villas/api/response.hpp>
#include <villas/node.hpp>
@ -28,14 +27,9 @@ public:
if (body != nullptr)
throw BadRequest("This endpoint does not accept any body data");
auto uid = node->getUuid();
char uid_str[UUID_STR_LEN];
uuid_unparse(uid, uid_str);
auto *info = json_pack("{ s: s, s: s, s: { s: s, s: s, s: s } }",
"id", node->getNameShort().c_str(),
"uuid", uid_str,
"uuid", uuid::toString(node->getUuid()).c_str(),
"transport",
"type", "villas",

View file

@ -178,13 +178,13 @@ public:
int windowSizeIn = 0; // Size of window in samples
json_t *pairings_json = nullptr;
json_t *json_pairings = nullptr;
MultiSignalHook::parse(json);
result = json_unpack_ex(json, &json_error, 0, "{ s: i , s: o, s?: b, s?: b, s?: b, s?: b, s?: b, s? : s, s? : s }",
"window_size", &windowSizeIn,
"pairings", &pairings_json,
"pairings", &json_pairings,
"add_channel_name", &channelNameEnable,
"active_power", &calcActivePower,
"reactive_power", &calcReactivePower,
@ -223,16 +223,16 @@ public:
throw ConfigError(json, "node-config-hook-dft-angle-unit", "Angle unit {} not recognized", angleUnitC);
// Pairings
if (!json_is_array(pairings_json))
throw ConfigError(pairings_json, "node-config-hook-power", "Pairings are expected as json array");
if (!json_is_array(json_pairings))
throw ConfigError(json_pairings, "node-config-hook-power", "Pairings are expected as json array");
size_t i = 0;
json_t *pairings_json_value;
json_array_foreach(pairings_json, i, pairings_json_value) {
json_t *json_pairings_value;
json_array_foreach(json_pairings, i, json_pairings_value) {
const char *voltageNameC = nullptr;
const char *currentNameC = nullptr;
json_unpack_ex(pairings_json_value, &json_error, 0, "{ s: s, s: s}",
json_unpack_ex(json_pairings_value, &json_error, 0, "{ s: s, s: s}",
"voltage", &voltageNameC,
"current", &currentNameC
);

View file

@ -41,7 +41,7 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
Node::Node(const std::string &n) :
Node::Node(const uuid_t &id, const std::string &name) :
logger(logging.get("node")),
sequence_init(0),
sequence(0),
@ -57,14 +57,23 @@ Node::Node(const std::string &n) :
state(State::INITIALIZED),
enabled(true),
config(nullptr),
name_short(n),
name_long(),
name_short(name),
affinity(-1), /* all cores */
factory(nullptr)
{
uuid_clear(uuid);
if (uuid_is_null(id)) {
uuid_generate_random(uuid);
} else {
uuid_copy(uuid, id);
}
name_long = fmt::format(CLR_RED("{}"), n);
if (!name_short.empty()) {
name_long = fmt::format(CLR_RED("{}"), name_short);
}
else if (name_short.empty()) {
name_short = "<none>";
name_long = CLR_RED("<none>");
}
}
Node::~Node()
@ -94,7 +103,7 @@ int Node::prepare()
return 0;
}
int Node::parse(json_t *json, const uuid_t sn_uuid)
int Node::parse(json_t *json)
{
assert(state == State::INITIALIZED ||
state == State::PARSED ||
@ -105,12 +114,7 @@ int Node::parse(json_t *json, const uuid_t sn_uuid)
json_error_t err;
json_t *json_netem = nullptr;
const char *uuid_str = nullptr;
const char *name_str = nullptr;
ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: b, s?: i }",
"name", &name_str,
"uuid", &uuid_str,
ret = json_unpack_ex(json, &err, 0, "{ s?: b, s?: i }",
"enabled", &en,
"initial_sequenceno", &init_seq
);
@ -120,9 +124,6 @@ int Node::parse(json_t *json, const uuid_t sn_uuid)
if (init_seq >= 0)
sequence_init = init_seq;
if (name_str)
logger = logging.get(fmt::format("node:{}", name_str));
#ifdef __linux__
ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: o, s?: i } }",
"out",
@ -135,24 +136,6 @@ int Node::parse(json_t *json, const uuid_t sn_uuid)
enabled = en;
if (name_str) {
name_short = std::string(name_str);
name_long = fmt::format(CLR_RED("{}") "(" CLR_YEL("{}") ")", name_str, factory->getName());
}
else if (name_short.empty()) {
name_short = "<none>";
name_long = CLR_RED("<none>");
}
if (uuid_str) {
ret = uuid_parse(uuid_str, uuid);
if (ret)
throw ConfigError(json, "node-config-node-uuid", "Failed to parse UUID: {}", uuid_str);
}
else
/* Generate UUID from hashed config including node name */
uuid::generateFromJson(uuid, json, sn_uuid);
if (json_netem) {
#ifdef WITH_NETEM
int enabled = 1;
@ -184,7 +167,6 @@ int Node::parse(json_t *json, const uuid_t sn_uuid)
/* Skip if direction is unused */
if (!json_dir) {
json_dir = json_pack("{ s: b }", "enabled", 0);
// json_object_set_new(json, dirs[j].str, json_dir);
}
/* Copy missing fields from main node config to direction config */
@ -382,11 +364,8 @@ int Node::write(struct Sample * smps[], unsigned cnt)
const std::string & Node::getNameFull()
{
if (name_full.empty()) {
char uuid_str[37];
uuid_unparse(uuid, uuid_str);
name_full = fmt::format("{}: uuid={}, #in.signals={}/{}, #in.hooks={}, #out.hooks={}, in.vectorize={}, out.vectorize={}",
getName(), uuid_str,
getName(), uuid::toString(uuid).c_str(),
getInputSignals(false)->size(),
getInputSignals(true)->size(),
in.hooks.size(), out.hooks.size(),
@ -456,9 +435,6 @@ json_t * Node::toJson() const
json_t *json_signals_in = nullptr;
json_t *json_signals_out = nullptr;
char uuid_str[37];
uuid_unparse(uuid, uuid_str);
json_signals_in = getInputSignals()->toJson();
auto output_signals = getOutputSignals();
@ -467,7 +443,7 @@ json_t * Node::toJson() const
json_node = json_pack("{ s: s, s: s, s: s, s: i, s: { s: i, s: o? }, s: { s: i, s: o? } }",
"name", getNameShort().c_str(),
"uuid", uuid_str,
"uuid", uuid::toString(uuid).c_str(),
"state", stateToString(state).c_str(),
"affinity", affinity,
"in",
@ -481,6 +457,10 @@ json_t * Node::toJson() const
if (stats)
json_object_set_new(json_node, "stats", stats->toJson());
auto *status = _readStatus();
if (status)
json_object_set_new(json_node, "status", status);
/* Add all additional fields of node here.
* This can be used for metadata */
json_object_update(json_node, config);
@ -492,45 +472,39 @@ void Node::swapSignals() {
SWAP(in.signals, out.signals);
}
Node * NodeFactory::make(json_t *json, uuid_t uuid)
Node * NodeFactory::make(json_t *json, const uuid_t &id, const std::string &name)
{
int ret;
std::string type;
Node *n;
if (json_is_string(json)) {
type = json_string_value(json);
if (json_is_object(json))
throw ConfigError(json, "node-config-node", "Node configuration must be an object");
return NodeFactory::make(type);
json_t *json_type = json_object_get(json, "type");
type = json_string_value(json_type);
n = NodeFactory::make(type, id, name);
if (!n)
return nullptr;
ret = n->parse(json);
if (ret) {
delete n;
return nullptr;
}
else if (json_is_object(json)) {
json_t *json_type = json_object_get(json, "type");
type = json_string_value(json_type);
n = NodeFactory::make(type);
if (!n)
return nullptr;
ret = n->parse(json, uuid);
if (ret) {
delete n;
return nullptr;
}
return n;
}
else
throw ConfigError(json, "node-config-node", "Invalid node config");
return n;
}
Node * NodeFactory::make(const std::string &type)
Node * NodeFactory::make(const std::string &type, const uuid_t &id, const std::string &name)
{
NodeFactory *nf = plugin::registry->lookup<NodeFactory>(type);
if (!nf)
throw RuntimeError("Unknown node-type: {}", type);
return nf->make();
return nf->make(id, name);
}
int NodeFactory::start(SuperNode *sn)

View file

@ -16,13 +16,13 @@ extern "C" {
using namespace villas;
using namespace villas::node;
vnode * node_new(const char *json_str, const char *sn_uuid_str)
vnode * node_new(const char *id_str, const char *json_str)
{
json_error_t err;
uuid_t sn_uuid;
uuid_parse(sn_uuid_str, sn_uuid);
uuid_t id;
uuid_parse(id_str, id);
auto *json = json_loads(json_str, 0, &err);
return (vnode *) NodeFactory::make(json, sn_uuid);
return (vnode *) NodeFactory::make(json, id);
}
int node_prepare(vnode *n)

View file

@ -12,8 +12,8 @@
using namespace villas;
using namespace villas::node;
NodeCompat::NodeCompat(struct NodeCompatType *vt) :
Node(),
NodeCompat::NodeCompat(struct NodeCompatType *vt, const uuid_t &id, const std::string &name) :
Node(id, name),
_vt(vt)
{
_vd = new char[_vt->size];
@ -89,14 +89,14 @@ int NodeCompat::prepare()
return Node::prepare();
}
int NodeCompat::parse(json_t *cfg, const uuid_t sn_uuid)
int NodeCompat::parse(json_t *json)
{
int ret = Node::parse(cfg, sn_uuid);
int ret = Node::parse(json);
if (ret)
return ret;
ret = _vt->parse
? _vt->parse(this, cfg)
? _vt->parse(this, json)
: 0;
if (!ret)
state = State::PARSED;
@ -270,9 +270,9 @@ const std::string & NodeCompat::getDetails()
return _details;
}
Node * NodeCompatFactory::make()
Node * NodeCompatFactory::make(const uuid_t &id, const std::string &name)
{
auto *n = new NodeCompat(_vt);
auto *n = new NodeCompat(_vt, id, name);
init(n);

View file

@ -16,8 +16,8 @@ using namespace villas;
using namespace villas::node;
using namespace villas::node::api::universal;
APINode::APINode(const std::string &name) :
Node(name),
APINode::APINode(const uuid_t &id, const std::string &name) :
Node(id, name),
read(),
write()
{
@ -94,9 +94,9 @@ int APINode::_write(struct Sample *smps[], unsigned cnt)
return 1;
}
int APINode::parse(json_t *json, const uuid_t sn_uuid)
int APINode::parse(json_t *json)
{
int ret = Node::parse(json, sn_uuid);
int ret = Node::parse(json);
if (ret)
return ret;

View file

@ -20,8 +20,8 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
ExampleNode::ExampleNode(const std::string &name) :
Node(name),
ExampleNode::ExampleNode(const uuid_t &id, const std::string &name) :
Node(id, name),
setting1(72),
setting2("something"),
state1(0)
@ -40,7 +40,7 @@ int ExampleNode::prepare()
return 0;
}
int ExampleNode::parse(json_t *json, const uuid_t sn_uuid)
int ExampleNode::parse(json_t *json)
{
/* TODO: Add implementation here. The following is just an example */

View file

@ -27,9 +27,9 @@ ExecNode::~ExecNode()
fclose(stream_out);
}
int ExecNode::parse(json_t *json, const uuid_t sn_uuid)
int ExecNode::parse(json_t *json)
{
int ret = Node::parse(json, sn_uuid);
int ret = Node::parse(json);
if (ret)
return ret;

View file

@ -42,8 +42,8 @@ static std::shared_ptr<kernel::vfio::Container> vfioContainer;
using namespace villas;
using namespace villas::node;
FpgaNode::FpgaNode(const std::string &name) :
Node(name),
FpgaNode::FpgaNode(const uuid_t &id, const std::string &name) :
Node(id, name),
irqFd(-1),
coalesce(0),
polling(true)
@ -52,9 +52,9 @@ FpgaNode::FpgaNode(const std::string &name) :
FpgaNode::~FpgaNode()
{ }
int FpgaNode::parse(json_t *cfg, const uuid_t sn_uuid)
int FpgaNode::parse(json_t *json)
{
int ret = Node::parse(cfg, sn_uuid);
int ret = Node::parse(json);
if (ret)
return ret;
@ -65,7 +65,7 @@ int FpgaNode::parse(json_t *cfg, const uuid_t sn_uuid)
const char *dma = nullptr;
int poll = polling;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, s?: b }",
ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, s?: b }",
"card", &card,
"interface", &intf,
"dma", &dma,
@ -73,7 +73,7 @@ int FpgaNode::parse(json_t *cfg, const uuid_t sn_uuid)
"polling", &polling
);
if (ret)
throw ConfigError(cfg, err, "node-config-fpga", "Failed to parse configuration of node {}", this->getName());
throw ConfigError(json, err, "node-config-fpga", "Failed to parse configuration of node {}", this->getName());
if (card)
cardName = card;

View file

@ -35,7 +35,7 @@ static timespec cp56time2a_to_timespec(CP56Time2a cp56time2a) {
return time;
}
ASDUData ASDUData::parse(json_t *signal_json, std::optional<ASDUData> last_data, bool duplicate_ioa_is_sequence) {
ASDUData ASDUData::parse(json_t *json_signal, std::optional<ASDUData> last_data, bool duplicate_ioa_is_sequence) {
json_error_t err;
char const *asdu_type_name = nullptr;
int with_timestamp = -1;
@ -43,13 +43,13 @@ ASDUData ASDUData::parse(json_t *signal_json, std::optional<ASDUData> last_data,
std::optional<int> ioa_sequence_start = std::nullopt;
int ioa = -1;
if (json_unpack_ex(signal_json, &err, 0, "{ s?: s, s?: b, s?: s, s: i }",
if (json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: b, s?: s, s: i }",
"asdu_type", &asdu_type_name,
"with_timestamp", &with_timestamp,
"asdu_type_id", &asdu_type_id,
"ioa", &ioa
))
throw ConfigError(signal_json, err, "node-config-node-iec60870-5-104");
throw ConfigError(json_signal, err, "node-config-node-iec60870-5-104");
// Increase the ioa if it is found twice to make it a sequence
if ( duplicate_ioa_is_sequence &&
@ -630,8 +630,8 @@ int SlaveNode::_write(Sample *samples[], unsigned sample_count)
return sample_count;
}
SlaveNode::SlaveNode(const std::string &name) :
Node(name)
SlaveNode::SlaveNode(const uuid_t &id, const std::string &name) :
Node(id, name)
{
server.state = SlaveNode::Server::NONE;
@ -662,16 +662,16 @@ SlaveNode::~SlaveNode()
destroySlave();
}
int SlaveNode::parse(json_t *json, const uuid_t sn_uuid)
int SlaveNode::parse(json_t *json)
{
int ret = Node::parse(json, sn_uuid);
int ret = Node::parse(json);
if (ret)
return ret;
json_error_t err;
auto signals = getOutputSignals();
json_t *out_json = nullptr;
json_t *json_out = nullptr;
char const *address = nullptr;
int apci_t0 = -1;
int apci_t1 = -1;
@ -681,7 +681,7 @@ int SlaveNode::parse(json_t *json, const uuid_t sn_uuid)
int apci_w = -1;
ret = json_unpack_ex(json, &err, 0, "{ s?: o, s?: s, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i, s?: i }",
"out", &out_json,
"out", &json_out,
"address", &address,
"port", &server.local_port,
"ca", &server.common_address,
@ -718,28 +718,28 @@ int SlaveNode::parse(json_t *json, const uuid_t sn_uuid)
if (address)
server.local_address = address;
json_t *signals_json = nullptr;
json_t *json_signals = nullptr;
int duplicate_ioa_is_sequence = false;
if (out_json) {
if (json_out) {
output.enabled = true;
ret = json_unpack_ex(out_json, &err, 0, "{ s: o, s?: b }",
"signals", &signals_json,
ret = json_unpack_ex(json_out, &err, 0, "{ s: o, s?: b }",
"signals", &json_signals,
"duplicate_ioa_is_sequence", &duplicate_ioa_is_sequence
);
if (ret)
throw ConfigError(out_json, err, "node-config-node-iec60870-5-104");
throw ConfigError(json_out, err, "node-config-node-iec60870-5-104");
}
if (signals_json) {
json_t *signal_json;
if (json_signals) {
json_t *json_signal;
size_t i;
std::optional<ASDUData> last_data = std::nullopt;
json_array_foreach(signals_json, i, signal_json) {
json_array_foreach(json_signals, i, json_signal) {
auto signal = signals ? signals->getByIndex(i) : Signal::Ptr{};
auto asdu_data = ASDUData::parse(signal_json, last_data, duplicate_ioa_is_sequence);
auto asdu_data = ASDUData::parse(json_signal, last_data, duplicate_ioa_is_sequence);
last_data = asdu_data;
SignalData initial_value;

View file

@ -572,8 +572,8 @@ int GooseNode::_write(Sample *samples[], unsigned sample_count)
return sample_count;
}
GooseNode::GooseNode(const std::string &name) :
Node(name)
GooseNode::GooseNode(const uuid_t &id, const std::string &name) :
Node(id, name)
{
input.state = Input::NONE;
@ -601,26 +601,26 @@ GooseNode::~GooseNode()
err = pool_destroy(&input.pool);
}
int GooseNode::parse(json_t *json, const uuid_t sn_uuid)
int GooseNode::parse(json_t *json)
{
int ret;
json_error_t err;
ret = Node::parse(json, sn_uuid);
ret = Node::parse(json);
if (ret)
return ret;
json_t *in_json = nullptr;
json_t *out_json = nullptr;
json_t *json_in = nullptr;
json_t *json_out = nullptr;
ret = json_unpack_ex(json, &err, 0, "{ s: o, s: o }",
"in", &in_json,
"out", &out_json
"in", &json_in,
"out", &json_out
);
if (ret)
throw ConfigError(json, err, "node-config-node-iec61850-8-1");
parseInput(in_json);
parseOutput(out_json);
parseInput(json_in);
parseOutput(json_out);
return 0;
}
@ -630,21 +630,21 @@ void GooseNode::parseInput(json_t *json)
int ret;
json_error_t err;
json_t *subscribers_json = nullptr;
json_t *signals_json = nullptr;
json_t *json_subscribers = nullptr;
json_t *json_signals = nullptr;
char const *interface_id = input.interface_id.c_str();
int with_timestamp = true;
ret = json_unpack_ex(json, &err, 0, "{ s: o, s: o, s?: s, s: b }",
"subscribers", &subscribers_json,
"signals", &signals_json,
"subscribers", &json_subscribers,
"signals", &json_signals,
"interface", &interface_id,
"with_timestamp", &with_timestamp
);
if (ret)
throw ConfigError(json, err, "node-config-node-iec61850-8-1");
parseSubscribers(subscribers_json, input.contexts);
parseInputSignals(signals_json, input.mappings);
parseSubscribers(json_subscribers, input.contexts);
parseInputSignals(json_signals, input.mappings);
input.interface_id = interface_id;
input.with_timestamp = with_timestamp;
@ -691,15 +691,15 @@ void GooseNode::parseSubscriber(json_t *json, GooseNode::SubscriberConfig &sc)
void GooseNode::parseSubscribers(json_t *json, std::map<std::string, InputEventContext> &ctx)
{
char const* key;
json_t* subscriber_json;
json_t* json_subscriber;
if (!json_is_object(json))
throw RuntimeError("subscribers is not an object");
json_object_foreach(json, key, subscriber_json) {
json_object_foreach(json, key, json_subscriber) {
SubscriberConfig sc;
parseSubscriber(subscriber_json, sc);
parseSubscriber(json_subscriber, sc);
ctx[key] = InputEventContext { sc };
}
@ -741,19 +741,19 @@ void GooseNode::parseOutput(json_t *json)
int ret;
json_error_t err;
json_t *publishers_json = nullptr;
json_t *signals_json = nullptr;
json_t *json_publishers = nullptr;
json_t *json_signals = nullptr;
char const *interface_id = output.interface_id.c_str();
ret = json_unpack_ex(json, &err, 0, "{ s:o, s:o, s?:s, s?:f }",
"publishers", &publishers_json,
"signals", &signals_json,
ret = json_unpack_ex(json, &err, 0, "{ s: o, s: o, s?: s, s?: f }",
"publishers", &json_publishers,
"signals", &json_signals,
"interface", &interface_id,
"resend_interval", &output.resend_interval
);
if (ret)
throw ConfigError(json, err, "node-config-node-iec61850-8-1");
parsePublishers(publishers_json, output.contexts);
parsePublishers(json_publishers, output.contexts);
output.interface_id = interface_id;
}
@ -763,20 +763,20 @@ void GooseNode::parsePublisherData(json_t *json, std::vector<OutputData> &data)
int ret;
json_error_t err;
int index;
json_t* signal_or_value_json;
json_t* json_signal_or_value;
if (!json_is_array(json))
throw RuntimeError("publisher data is not an array");
json_array_foreach(json, index, signal_or_value_json) {
json_array_foreach(json, index, json_signal_or_value) {
char const *mms_type = nullptr;
char const *signal_str = nullptr;
json_t *value_json = nullptr;
json_t *json_value = nullptr;
int bitstring_size = -1;
ret = json_unpack_ex(signal_or_value_json, &err, 0, "{ s:s, s?:s, s?:o, s?:i }",
ret = json_unpack_ex(json_signal_or_value, &err, 0, "{ s: s, s?: s, s?: o, s?: i }",
"mms_type", &mms_type,
"signal", &signal_str,
"value", &value_json,
"value", &json_value,
"mms_bitstring_size", &bitstring_size
);
if (ret)
@ -790,8 +790,8 @@ void GooseNode::parsePublisherData(json_t *json, std::vector<OutputData> &data)
auto signal_data = SignalData {};
if (value_json) {
ret = signal_data.parseJson(goose_type->signal_type, value_json);
if (json_value) {
ret = signal_data.parseJson(goose_type->signal_type, json_value);
if (ret)
throw ConfigError(json, err, "node-config-node-iec61850-8-1");
}
@ -823,8 +823,8 @@ void GooseNode::parsePublisher(json_t *json, PublisherConfig &pc)
int conf_rev = 0;
int time_allowed_to_live = 0;
int burst = 1;
json_t *data_json = nullptr;
ret = json_unpack_ex(json, &err, 0, "{ s:s, s:s, s:s, s:s, s:i, s:i, s:i, s?:i, s:o }",
json_t *json_data = nullptr;
ret = json_unpack_ex(json, &err, 0, "{ s: s, s: s, s: s, s: s, s: i, s: i, s: i, s?: i, s: o }",
"go_id", &go_id,
"go_cb_ref", &go_cb_ref,
"data_set_ref", &data_set_ref,
@ -833,7 +833,7 @@ void GooseNode::parsePublisher(json_t *json, PublisherConfig &pc)
"conf_rev", &conf_rev,
"time_allowed_to_live", &time_allowed_to_live,
"burst", &burst,
"data", &data_json
"data", &json_data
);
if (ret)
throw ConfigError(json, err, "node-config-node-iec61850-8-1");
@ -851,18 +851,18 @@ void GooseNode::parsePublisher(json_t *json, PublisherConfig &pc)
pc.time_allowed_to_live = time_allowed_to_live;
pc.burst = burst;
parsePublisherData(data_json, pc.data);
parsePublisherData(json_data, pc.data);
}
void GooseNode::parsePublishers(json_t *json, std::vector<OutputContext> &ctx)
{
int index;
json_t* publisher_json;
json_t* json_publisher;
json_array_foreach(json, index, publisher_json) {
json_array_foreach(json, index, json_publisher) {
PublisherConfig pc;
parsePublisher(publisher_json, pc);
parsePublisher(json_publisher, pc);
ctx.push_back(OutputContext { pc });
}

View file

@ -18,8 +18,8 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
LoopbackNode::LoopbackNode(const std::string &name) :
Node(name),
LoopbackNode::LoopbackNode(const uuid_t &id, const std::string &name) :
Node(id, name),
queuelen(DEFAULT_QUEUE_LENGTH),
mode(QueueSignalledMode::AUTO)
{
@ -110,7 +110,7 @@ int LoopbackNode::parse(json_t *json)
int ret;
ret = json_unpack_ex(json, &err, 0, "{ s?: i, s?: s }",
"queuelen", queuelen,
"queuelen", &queuelen,
"mode", &mode_str
);
if (ret)
@ -131,7 +131,7 @@ int LoopbackNode::parse(json_t *json)
throw ConfigError(json, "node-config-node-loopback-mode", "Unknown mode '{}'", mode_str);
}
return 0;
return Node::parse(json);
}
static char n[] = "loopback";

View file

@ -20,14 +20,18 @@ using namespace villas::node;
static InternalLoopbackNodeFactory nf;
InternalLoopbackNode::InternalLoopbackNode(Node *src, unsigned id, unsigned ql) :
Node(fmt::format("{}.lo{}", src->getNameShort(), id)),
queuelen(ql),
source(src)
{
auto name = fmt::format("{}.lo{}", src->getNameShort(), id);
uuid_t uuid;
int ret = uuid::generateFromString(uuid, fmt::format("lo{}", id), src->getUuid());
if (ret)
throw RuntimeError("Failed to initialize UUID");
Node(uuid, name);
factory = &nf;
name_long = fmt::format(CLR_RED("{}") "(" CLR_YEL("{}") ")", name_short, nf.getName());

View file

@ -188,8 +188,8 @@ Signal::Ptr SignalNodeSignal::toSignal(Signal::Ptr tpl) const
return sig;
}
SignalNode::SignalNode(const std::string &name) :
Node(name),
SignalNode::SignalNode(const uuid_t &id, const std::string &name) :
Node(id, name),
task(CLOCK_MONOTONIC),
rt(1),
rate(10),
@ -215,9 +215,9 @@ int SignalNode::prepare()
return Node::prepare();
}
int SignalNode::parse(json_t *json, const uuid_t sn_uuid)
int SignalNode::parse(json_t *json)
{
int r = -1, m = -1, ret = Node::parse(json, sn_uuid);
int r = -1, m = -1, ret = Node::parse(json);
if (ret)
return ret;

View file

@ -11,6 +11,7 @@
#include <villas/node_compat.hpp>
#include <villas/nodes/webrtc.hpp>
#include <villas/uuid.hpp>
#include <villas/utils.hpp>
#include <villas/sample.hpp>
#include <villas/exceptions.hpp>
@ -23,9 +24,10 @@ using namespace villas::utils;
static villas::node::Web *web;
WebRTCNode::WebRTCNode(const std::string &name) :
Node(name),
WebRTCNode::WebRTCNode(const uuid_t &id, const std::string &name) :
Node(id, name),
server("https://villas.k8s.eonerc.rwth-aachen.de/ws/signaling"),
peer(uuid::toString(id)),
wait_seconds(0),
format(nullptr),
queue({}),
@ -42,28 +44,30 @@ WebRTCNode::~WebRTCNode()
;
}
int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
int WebRTCNode::parse(json_t *json)
{
int ret = Node::parse(json, sn_uuid);
int ret = Node::parse(json);
if (ret)
return ret;
const char *sess;
const char *svr = nullptr;
const char *pr = nullptr;
int ord = -1;
int &rexmit = dci.reliability.rexmit.emplace<int>(0);
json_t *ice_json = nullptr;
json_t *fmt_json = nullptr;
json_t *json_ice = nullptr;
json_t *json_format = nullptr;
json_error_t err;
ret = json_unpack_ex(json, &err, 0, "{ s:s, s?s, s?i, s?i, s?b, s?o }",
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: s, s?: s, s?: i, s?: i, s?: b, s?: o }",
"session", &sess,
"peer", &pr,
"server", &svr,
"wait_seconds", &wait_seconds,
"max_retransmits", &rexmit,
"ordered", &ord,
"ice", &ice_json,
"format", &fmt_json
"ice", &json_ice,
"format", &json_format
);
if (ret)
throw ConfigError(json, err, "node-config-node-webrtc");
@ -73,13 +77,16 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
if (svr)
server = svr;
if (pr)
peer = pr;
if (ord)
dci.reliability.unordered = !ord;
if (ice_json) {
if (json_ice) {
json_t *json_servers = nullptr;
ret = json_unpack_ex(ice_json, &err, 0, "{ s?: o }",
ret = json_unpack_ex(json_ice, &err, 0, "{ s?: o }",
"servers", &json_servers
);
if (ret)
@ -104,8 +111,8 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
}
}
format = fmt_json
? FormatFactory::make(fmt_json)
format = json_format
? FormatFactory::make(json_format)
: FormatFactory::make("villas.binary");
assert(format);
@ -113,11 +120,6 @@ int WebRTCNode::parse(json_t *json, const uuid_t sn_uuid)
return 0;
}
int WebRTCNode::check()
{
return Node::check();
}
int WebRTCNode::prepare()
{
int ret = Node::prepare();
@ -126,7 +128,10 @@ int WebRTCNode::prepare()
format->start(getInputSignals(false), ~(int) SampleFlags::HAS_OFFSET);
conn = std::make_shared<webrtc::PeerConnection>(server, session, rtcConf, web, dci);
// TODO: Determine output signals reliably
auto signals = std::make_shared<SignalList>();
conn = std::make_shared<webrtc::PeerConnection>(server, session, peer, signals, rtcConf, web, dci);
ret = pool_init(&pool, 1024, SAMPLE_LENGTH(getInputSignals(false)->size()));
if (ret) // TODO log
@ -136,6 +141,7 @@ int WebRTCNode::prepare()
if (ret) // TODO log
return ret;
// TODO: Move this to a member function
conn->onMessage([this](rtc::binary msg){
int ret;
std::vector<Sample *> smps;
@ -191,6 +197,7 @@ std::vector<int> WebRTCNode::getPollFDs()
const std::string & WebRTCNode::getDetails()
{
// TODO
details = fmt::format("");
return details;
}
@ -224,6 +231,14 @@ int WebRTCNode::_write(struct Sample *smps[], unsigned cnt)
return ret;
}
json_t * WebRTCNode::_readStatus() const
{
if (!conn)
return nullptr;
return conn->readStatus();
}
int WebRTCNodeFactory::start(SuperNode *sn)
{
web = sn->getWeb();

View file

@ -10,9 +10,11 @@
#include <chrono>
#include <thread>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <fmt/chrono.h>
#include <villas/utils.hpp>
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/peer_connection.hpp>
@ -39,13 +41,14 @@ namespace rtc {
using ::operator<<;
}
PeerConnection::PeerConnection(const std::string &server, const std::string &session, rtc::Configuration cfg, Web *w, rtc::DataChannelInit d) :
PeerConnection::PeerConnection(const std::string &server, const std::string &session, const std::string &peer, std::shared_ptr<SignalList> signals, rtc::Configuration cfg, Web *w, rtc::DataChannelInit d) :
web(w),
extraServers({}),
dataChannelInit(d),
defaultConfig(cfg),
conn(nullptr),
chan(nullptr),
signals(signals),
logger(logging.get("webrtc:pc")),
stopStartup(false),
warnNotConnected(false),
@ -55,7 +58,7 @@ PeerConnection::PeerConnection(const std::string &server, const std::string &ses
secondID(INT_MAX),
onMessageCallback(nullptr)
{
client = std::make_shared<SignalingClient>(server, session, web);
client = std::make_shared<SignalingClient>(server, session, peer, web);
client->onConnected([this](){ this->onSignalingConnected(); });
client->onDisconnected([this](){ this->onSignalingDisconnected(); });
client->onError([this](auto err){ this->onSignalingError(std::move(err)); });
@ -78,6 +81,31 @@ bool PeerConnection::waitForDataChannel(std::chrono::seconds timeout)
return startupCondition.wait_until(lock, deadline, [this](){ return this->stopStartup; });
}
json_t * PeerConnection::readStatus() const
{
auto *json = json_pack("{ s: I, s: I }",
"bytes_received", conn->bytesReceived(),
"bytes_sent", conn->bytesSent()
);
auto rtt = conn->rtt();
if (rtt.has_value()) {
auto *json_rtt = json_real(rtt.value().count() / 1e3);
json_object_set_new(json, "rtt", json_rtt);
}
rtc::Candidate local, remote;
if (conn->getSelectedCandidatePair(&local, &remote)) {
auto *json_cp = json_pack("{ s: s, s: s }",
"local", std::string(local).c_str(),
"remote", std::string(remote).c_str()
);
json_object_set_new(json, "candidate_pair", json_cp);
}
return json;
}
void PeerConnection::notifyStartup()
{
stopStartup = true;
@ -262,6 +290,10 @@ void PeerConnection::onGatheringStateChange(rtc::PeerConnection::GatheringState
void PeerConnection::onSignalingConnected()
{
logger->debug("Signaling connection established");
auto lock = std::unique_lock { mutex };
client->sendMessage({ *signals });
}
void PeerConnection::onSignalingDisconnected()
@ -294,15 +326,15 @@ void PeerConnection::onSignalingMessage(SignalingMessage msg)
},
[&](ControlMessage &c){
auto const &id = c.connectionID;
auto const &id = c.peerID;
if (c.connections.size() < 2) {
if (c.peers.size() < 2) {
resetConnectionAndStandby(lock);
return;
}
auto fst = INT_MAX, snd = INT_MAX;
for (auto &c : c.connections) {
for (auto &c : c.peers) {
if (c.id < fst) {
snd = fst;
fst = c.id;

View file

@ -7,16 +7,20 @@
* @license Apache 2.0
*********************************************************************************/
#include <fmt/format.h>
#include <villas/utils.hpp>
#include <villas/web.hpp>
#include <villas/exceptions.hpp>
#include <villas/nodes/webrtc/signaling_client.hpp>
#include <villas/nodes/webrtc/signaling_message.hpp>
using namespace villas;
using namespace villas::utils;
using namespace villas::node;
using namespace villas::node::webrtc;
SignalingClient::SignalingClient(const std::string &srv, const std::string &sess, Web *w) :
SignalingClient::SignalingClient(const std::string &server, const std::string &session, const std::string &peer, Web *w) :
retry_count(0),
web(w),
running(false),
@ -27,7 +31,7 @@ SignalingClient::SignalingClient(const std::string &srv, const std::string &sess
memset(&info, 0, sizeof(info));
ret = asprintf(&uri, "%s/%s", srv.c_str(), sess.c_str());
ret = asprintf(&uri, "%s/%s/%s", server.c_str(), session.c_str(), peer.c_str());
if (ret < 0)
throw RuntimeError { "Could not format signaling server uri" };
@ -120,7 +124,7 @@ int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons
buffer.append((char *) in, len);
if (lws_is_final_fragment(wsi)) {
logger->trace("Received signaling message: {:.{}}", buffer.data(), buffer.size());
logger->trace("Signaling message received: {:.{}}", buffer.data(), buffer.size());
auto *json = buffer.decode();
if (json == nullptr) {
@ -128,7 +132,7 @@ int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons
goto do_retry;
}
cbMessage(SignalingMessage::fromJSON(json));
cbMessage(SignalingMessage::fromJson(json));
json_decref(json);
}
@ -190,7 +194,7 @@ int SignalingClient::writable()
}
auto msg = outgoingMessages.pop();
auto *jsonMsg = msg.toJSON();
auto *jsonMsg = msg.toJson();
if (!jsonMsg) {
return 0;
@ -201,7 +205,7 @@ int SignalingClient::writable()
if (len > sizeof(buf) - LWS_PRE)
return -1;
logger->trace("Sending signaling message: {:.{}}", buf + LWS_PRE, len);
logger->trace("Signaling message send: {:.{}}", buf + LWS_PRE, len);
auto ret = lws_write(wsi, (unsigned char *) buf + LWS_PRE, len, LWS_WRITE_TEXT);
if (ret < 0)

View file

@ -17,33 +17,42 @@ using namespace villas;
using namespace villas::node;
using namespace villas::node::webrtc;
json_t * Connection::toJSON() const
json_t * Peer::toJson() const
{
return json_pack("{ s:i, s:s, s:s, s:s }",
return json_pack("{ s: i, s: s*, s: s*, s: s* }",
"id", id,
"remote", remote.c_str(),
"user_agent", userAgent.c_str(),
"created", "" // TODO: create json timestamp
"name", name.empty() ? nullptr : name.c_str(),
"remote", remote.empty() ? nullptr : remote.c_str(),
"user_agent", userAgent.empty() ? nullptr : userAgent.c_str()
// TODO: created, connected
);
}
Connection::Connection(json_t *json)
Peer::Peer(json_t *json)
{
const char *rem, *ua, *ts;
const char *nme = nullptr, *rem = nullptr, *ua = nullptr, *tscreat, *tsconn;
int ret = json_unpack(json, "{ s:i, s:s, s:s, s:s }",
int ret = json_unpack(json, "{ s: i, s?: s, s?: s, s?: s, s?: s, s?: s }",
"id", &id,
"name", &nme,
"remote", &rem,
"user_agent", &ua,
"created", &ts
"created", &tscreat,
"connected", &tsconn
);
if (ret)
throw RuntimeError("Failed to decode signaling message");
remote = rem;
userAgent = ua;
if (nme)
name = nme;
// TODO: created
if (rem)
remote = rem;
if (ua)
userAgent = ua;
// TODO: created, connected
}
RelayMessage::RelayMessage(json_t *json)
@ -58,10 +67,10 @@ RelayMessage::RelayMessage(json_t *json)
char *pass;
char *realm;
char *expires;
json_t *server_json;
json_t *json_server;
size_t i;
json_array_foreach(json, i, server_json) {
ret = json_unpack(server_json, "{ s:s, s:s, s:s, s:s, s:s }",
json_array_foreach(json, i, json_server) {
ret = json_unpack(json_server, "{ s: s, s: s, s: s, s: s, s: s }",
"url", &url,
"user", &user,
"pass", &pass,
@ -80,19 +89,19 @@ RelayMessage::RelayMessage(json_t *json)
}
}
json_t * ControlMessage::toJSON() const
json_t * ControlMessage::toJson() const
{
json_t *json_connections = json_array();
json_t *json_peers = json_array();
for (auto &c : connections) {
json_t *json_connection = c.toJSON();
for (auto &p : peers) {
json_t *json_peer = p.toJson();
json_array_append_new(json_connections, json_connection);
json_array_append_new(json_peers, json_peer);
}
return json_pack("{ s:i, s:o }",
"connection_id", connectionID,
"connections", json_connections
return json_pack("{ s: i, s: o }",
"peer_id", peerID,
"peers", json_peers
);
}
@ -100,39 +109,42 @@ ControlMessage::ControlMessage(json_t *j)
{
int ret;
json_t *json_connections;
json_t *json_peers;
ret = json_unpack(j, "{ s:i, s:o }",
"connection_id", &connectionID,
"connections", &json_connections
ret = json_unpack(j, "{ s: i, s: o }",
"peer_id", &peerID,
"peers", &json_peers
);
if (ret)
throw RuntimeError("Failed to decode signaling message");
if (!json_is_array(json_connections))
if (!json_is_array(json_peers))
throw RuntimeError("Failed to decode signaling message");
json_t *json_connection;
json_t *json_peer;
size_t i;
// cppcheck-suppress unknownMacro
json_array_foreach(json_connections, i, json_connection)
connections.emplace_back(json_connection);
json_array_foreach(json_peers, i, json_peer)
peers.emplace_back(json_peer);
}
json_t * SignalingMessage::toJSON() const
json_t * SignalingMessage::toJson() const
{
return std::visit(villas::utils::overloaded {
[](ControlMessage const &c){
return json_pack("{ s:o }", "control", c.toJSON());
return json_pack("{ s: o }", "control", c.toJson());
},
[](SignalList const &s){
return json_pack("{ s: o }", "signals", s.toJson());
},
[](rtc::Description const &d){
return json_pack("{ s:{ s:s, s:s } }", "description",
return json_pack("{ s: { s: s, s: s } }", "description",
"spd", d.generateSdp().c_str(),
"type", d.typeString().c_str()
);
},
[](rtc::Candidate const &c){
return json_pack("{ s:{ s:s, s:s } }", "candidate",
return json_pack("{ s: { s: s, s: s } }", "candidate",
"spd", c.candidate().c_str(),
"mid", c.mid().c_str()
);
@ -150,7 +162,10 @@ std::string SignalingMessage::toString() const
return fmt::format("type=relay");
},
[](ControlMessage const &c){
return fmt::format("type=control, control={}", json_dumps(c.toJSON(), 0));
return fmt::format("type=control, control={}", json_dumps(c.toJson(), 0));
},
[](SignalList const &s){
return fmt::format("type=signal");
},
[](rtc::Description const &d){
return fmt::format("type=description, type={}, spd=\n{}", d.typeString(), d.generateSdp());
@ -164,12 +179,14 @@ std::string SignalingMessage::toString() const
}, message);
}
SignalingMessage SignalingMessage::fromJSON(json_t *json)
SignalingMessage SignalingMessage::fromJson(json_t *json)
{
auto self = SignalingMessage { std::monostate() };
// Relay message
json_t *rlys = nullptr;
// Signal message
json_t *sigs = nullptr;
// Control message
json_t *ctrl = nullptr;
// Candidate message
@ -179,8 +196,9 @@ SignalingMessage SignalingMessage::fromJSON(json_t *json)
const char *desc = nullptr;
const char *typ = nullptr;
int ret = json_unpack(json, "{ s?o, s?o, s?{ s:s, s:s }, s?{ s:s, s:s } }",
int ret = json_unpack(json, "{ s?: o, s?: o, s?: o, s?: { s: s, s: s }, s?: { s: s, s: s } }",
"servers", &rlys,
"signals", &sigs,
"control", &ctrl,
"candidate",
"spd", &cand,
@ -198,6 +216,9 @@ SignalingMessage SignalingMessage::fromJSON(json_t *json)
if (rlys) {
self.message.emplace<RelayMessage>(rlys);
}
else if (sigs) {
self.message.emplace<SignalList>(sigs);
}
else if (ctrl) {
self.message.emplace<ControlMessage>(ctrl);
}

View file

@ -696,9 +696,6 @@ unsigned Path::getOutputSignalsMaxCount()
json_t * Path::toJson() const
{
char uuid_str[37];
uuid_unparse(uuid, uuid_str);
json_t *json_signals = signals->toJson();
#ifdef WITH_HOOKS
json_t *json_hooks = hooks.toJson();
@ -715,7 +712,7 @@ json_t * Path::toJson() const
json_array_append_new(json_destinations, json_string(pd->node->getNameShort().c_str()));
json_t *json_path = json_pack("{ s: s, s: s, s: s, s: b, s: b s: b, s: b, s: b, s: b s: i, s: o, s: o, s: o, s: o }",
"uuid", uuid_str,
"uuid", uuid::toString(uuid).c_str(),
"state", stateToString(state).c_str(),
"mode", mode == Mode::ANY ? "any" : "all",
"enabled", enabled,

View file

@ -29,8 +29,6 @@
using namespace villas;
using namespace villas::node;
typedef char uuid_string_t[37];
SuperNode::SuperNode() :
state(State::INITIALIZED),
idleStop(-1),
@ -128,34 +126,43 @@ void SuperNode::parse(json_t *root)
if (!json_is_object(json_nodes))
throw ConfigError(json_nodes, "node-config-nodes", "Setting 'nodes' must be a group with node name => group mappings.");
const char *name;
const char *node_name;
json_t *json_node;
json_object_foreach(json_nodes, name, json_node) {
const char *type;
json_object_foreach(json_nodes, node_name, json_node) {
uuid_t node_uuid;
const char *node_type;
const char *node_uuid_str = nullptr;
ret = Node::isValidName(name);
ret = Node::isValidName(node_name);
if (!ret)
throw RuntimeError("Invalid name for node: {}", name);
throw RuntimeError("Invalid name for node: {}", node_name);
ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type);
ret = json_unpack_ex(json_node, &err, 0, "{ s: s, s?: s }",
"type", &node_type,
"uuid", &node_uuid_str
);
if (ret)
throw ConfigError(root, err, "node-config-node-type", "Failed to parse type of node '{}'", name);
throw ConfigError(root, err, "node-config-node-type", "Failed to parse type of node '{}'", node_name);
json_object_set_new(json_node, "name", json_string(name));
if (node_uuid_str) {
ret = uuid_parse(uuid_str, uuid);
if (ret)
throw ConfigError(json_node, "node-config-node-uuid", "Failed to parse UUID: {}", uuid_str);
}
else
// Generate UUID from node name and super-node UUID
uuid::generateFromString(node_uuid, node_name, uuid::toString(uuid));
auto *n = NodeFactory::make(type);
auto *n = NodeFactory::make(node_type, node_uuid, node_name);
if (!n)
throw MemoryAllocationError();
ret = n->parse(json_node, uuid);
ret = n->parse(json_node);
if (ret) {
auto config_id = fmt::format("node-config-node-{}", type);
throw ConfigError(json_node, config_id, "Failed to parse configuration of node '{}'", name);
auto config_id = fmt::format("node-config-node-{}", node_type);
throw ConfigError(json_node, config_id, "Failed to parse configuration of node '{}'", node_name);
}
json_object_del(json_node, "name");
nodes.push_back(n);
}
}
@ -504,15 +511,11 @@ graph_t * SuperNode::getGraph()
std::map<Node *, Agnode_t *> nodeMap;
uuid_string_t uuid_str;
for (auto *n : nodes) {
nodeMap[n] = agnode(g, (char *) n->getNameShort().c_str(), 1);
uuid_unparse(n->getUuid(), uuid_str);
set_attr(nodeMap[n], "shape", "ellipse");
set_attr(nodeMap[n], "tooltip", fmt::format("type={}, uuid={}", n->getFactory()->getName(), uuid_str));
set_attr(nodeMap[n], "tooltip", fmt::format("type={}, uuid={}", n->getFactory()->getName(), uuid::toString(n->getUuid()).c_str()));
// set_attr(nodeMap[n], "fixedsize", "true");
// set_attr(nodeMap[n], "width", "0.15");
// set_attr(nodeMap[n], "height", "0.15");
@ -524,10 +527,8 @@ graph_t * SuperNode::getGraph()
m = agnode(g, (char *) name.c_str(), 1);
uuid_unparse(p->uuid, uuid_str);
set_attr(m, "shape", "box");
set_attr(m, "tooltip", fmt::format("uuid={}", uuid_str));
set_attr(m, "tooltip", fmt::format("uuid={}", uuid::toString(p->getUuid()).c_str()));
for (auto ps : p->sources)
agedge(g, nodeMap[ps->getNode()], m, nullptr, 1);

View file

@ -3,11 +3,11 @@
"common": {
"flake": false,
"locked": {
"lastModified": 1686671462,
"narHash": "sha256-yvpCp9eZO05lWWz/bYifDlalDTd9WjH84QwFxmsjoHg=",
"lastModified": 1688124353,
"narHash": "sha256-TPupRVaYV/Rxpj2BHjnEeSXZOVRKhnFtG1fdqLxVWcY=",
"owner": "VILLASframework",
"repo": "common",
"rev": "d9d4ac76a5403e14f7899dae480781e9cdcf0572",
"rev": "120312e938dc298b4dc13792e1acf7510190bbf4",
"type": "github"
},
"original": {

View file

@ -131,7 +131,7 @@
pkgs = pkgsFor system;
shellHook = ''[ -z "$PS1" ] || exec "$SHELL"'';
hardeningDisable = ["all"];
packages = with pkgs; [bashInteractive criterion];
packages = with pkgs; [bashInteractive criterion bc jq];
in rec {
default = full;

View file

@ -26,8 +26,6 @@
#include "villas-relay.hpp"
typedef char uuid_string_t[37];
using namespace villas;
using namespace villas::node;
@ -100,12 +98,9 @@ json_t * RelaySession::toJson() const
json_array_append(json_connections, conn->toJson());
}
uuid_string_t uuid_str;
uuid_unparse_lower(uuid, uuid_str);
return json_pack("{ s: s, s: s, s: o, s: I, s: i }",
"identifier", identifier.c_str(),
"uuid", uuid_str,
"uuid", uuid::toString(uuid).c_str(),
"connections", json_connections,
"created", created,
"connects", connects
@ -296,14 +291,11 @@ int Relay::httpProtocolCallback(lws *wsi, enum lws_callback_reasons reason, void
json_array_append(json_sessions, session->toJson());
}
uuid_string_t uuid_str;
uuid_unparse(r->uuid, uuid_str);
json_body = json_pack("{ s: o, s: s, s: s, s: s, s: { s: b, s: i, s: s } }",
"sessions", json_sessions,
"version", PROJECT_VERSION_STR,
"hostname", r->hostname.c_str(),
"uuid", uuid_str,
"uuid", uuid::toString(r->uuid).c_str(),
"options",
"loopback", r->loopback,
"port", r->port,

View file

@ -244,10 +244,7 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
uuid_t uuid;
uuid_clear(uuid);
ret = node->parse(json, uuid);
ret = node->parse(json);
if (ret) {
usage();
exit(EXIT_FAILURE);