mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge pull request #633 from VILLASframework/libfmt9
Make compatible to fmt version 9.0.3
This commit is contained in:
commit
3d052cdf6f
19 changed files with 95 additions and 83 deletions
|
@ -17,6 +17,7 @@
|
|||
#include <villas/format.hpp>
|
||||
#include <villas/node.hpp>
|
||||
#include <villas/node/config.hpp>
|
||||
#include <villas/node_compat.hpp>
|
||||
|
||||
/* Forward declaration */
|
||||
struct lws;
|
||||
|
@ -24,9 +25,6 @@ struct lws;
|
|||
namespace villas {
|
||||
namespace node {
|
||||
|
||||
/* Forward declarations */
|
||||
class NodeCompat;
|
||||
|
||||
#define DEFAULT_WEBSOCKET_QUEUE_LENGTH (DEFAULT_QUEUE_LENGTH * 64)
|
||||
|
||||
/** Internal data per websocket node */
|
||||
|
@ -89,12 +87,19 @@ struct websocket_connection {
|
|||
os << "dest=" << c.destination->info.address << ":" << c.destination->info.port;
|
||||
|
||||
if (c.node)
|
||||
os << ", node=" << *c.node;
|
||||
os << ", node=" << c.node->getName();
|
||||
|
||||
os << ", mode=" << (c.mode == websocket_connection::Mode::CLIENT ? "client" : "server");
|
||||
|
||||
return os;
|
||||
}
|
||||
|
||||
std::string toString()
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << *this;
|
||||
return ss.str();
|
||||
}
|
||||
};
|
||||
|
||||
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
|
||||
|
|
|
@ -126,6 +126,13 @@ public:
|
|||
return os;
|
||||
}
|
||||
|
||||
std::string toString()
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << *this;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
Path();
|
||||
~Path();
|
||||
|
||||
|
|
|
@ -120,9 +120,9 @@ public:
|
|||
buf[wbytes-1] = 0;
|
||||
|
||||
if (node)
|
||||
logger->info("{}{} {}", prefix, *node, buf);
|
||||
logger->info("{}{} {}", prefix, node->getName(), buf);
|
||||
else if (path)
|
||||
logger->info("{}{} {}", prefix, *path, buf);
|
||||
logger->info("{}{} {}", prefix, path->toString(), buf);
|
||||
}
|
||||
else
|
||||
formatter->print(output, smp);
|
||||
|
|
|
@ -49,7 +49,7 @@ public:
|
|||
/* A wrap around of the sequence no should not be treated as a simulation restart */
|
||||
if (smp->sequence == 0 && prev->sequence != 0 && prev->sequence < UINT64_MAX - 16) {
|
||||
logger->warn("Simulation from node {} restarted (previous->sequence={}, current->sequence={})",
|
||||
*node, prev->sequence, smp->sequence);
|
||||
node->getName(), prev->sequence, smp->sequence);
|
||||
|
||||
smp->flags |= (int) SampleFlags::IS_FIRST;
|
||||
|
||||
|
|
|
@ -405,7 +405,7 @@ const std::string & Node::getNameFull()
|
|||
getOutputSignals(false) ? getOutputSignals(false)->size() : 0,
|
||||
getOutputSignals() ? getOutputSignals()->size() : 0);
|
||||
|
||||
name_full += fmt::format(", out.path={}", *out.path);
|
||||
name_full += fmt::format(", out.path={}", out.path->toString());
|
||||
}
|
||||
|
||||
/* Append node-type specific details */
|
||||
|
|
|
@ -71,7 +71,7 @@ int FpgaNode::parse(json_t *cfg, const uuid_t sn_uuid)
|
|||
"polling", &polling
|
||||
);
|
||||
if (ret)
|
||||
throw ConfigError(cfg, err, "node-config-fpga", "Failed to parse configuration of node {}", *this);
|
||||
throw ConfigError(cfg, err, "node-config-fpga", "Failed to parse configuration of node {}", this->getName());
|
||||
|
||||
if (card)
|
||||
cardName = card;
|
||||
|
|
|
@ -199,7 +199,7 @@ int villas::node::kafka_parse(NodeCompat *n, json_t *json)
|
|||
throw ConfigError(json, "node-config-node-kafka-protocol", "Invalid security protocol: {}", protocol);
|
||||
|
||||
if (!k->produce && !k->consume)
|
||||
throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", *n);
|
||||
throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", n->getName());
|
||||
|
||||
if (json_ssl) {
|
||||
const char *ca;
|
||||
|
@ -208,7 +208,7 @@ int villas::node::kafka_parse(NodeCompat *n, json_t *json)
|
|||
"ca", &ca
|
||||
);
|
||||
if (ret)
|
||||
throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", *n);
|
||||
throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", n->getName());
|
||||
|
||||
k->ssl.ca = strdup(ca);
|
||||
}
|
||||
|
|
|
@ -206,7 +206,7 @@ int villas::node::mqtt_parse(NodeCompat *n, json_t *json)
|
|||
m->password = password ? strdup(password) : nullptr;
|
||||
|
||||
if (!m->publish && !m->subscribe)
|
||||
throw ConfigError(json, "node-config-node-mqtt", "At least one topic has to be specified for node {}", *n);
|
||||
throw ConfigError(json, "node-config-node-mqtt", "At least one topic has to be specified for node {}", n->getName());
|
||||
|
||||
if (json_ssl) {
|
||||
m->ssl.enabled = 1;
|
||||
|
@ -230,10 +230,10 @@ int villas::node::mqtt_parse(NodeCompat *n, json_t *json)
|
|||
"tls_version", &tls_version
|
||||
);
|
||||
if (ret)
|
||||
throw ConfigError(json_ssl, err, "node-config-node-mqtt-ssl", "Failed to parse SSL configuration of node {}", *n);
|
||||
throw ConfigError(json_ssl, err, "node-config-node-mqtt-ssl", "Failed to parse SSL configuration of node {}", n->getName());
|
||||
|
||||
if (m->ssl.enabled && !cafile && !capath)
|
||||
throw ConfigError(json_ssl, "node-config-node-mqtt-ssl", "Either 'ssl.cafile' or 'ssl.capath' settings must be set for node {}.", *n);
|
||||
throw ConfigError(json_ssl, "node-config-node-mqtt-ssl", "Either 'ssl.cafile' or 'ssl.capath' settings must be set for node {}.", n->getName());
|
||||
|
||||
m->ssl.cafile = cafile ? strdup(cafile) : nullptr;
|
||||
m->ssl.capath = capath ? strdup(capath) : nullptr;
|
||||
|
|
|
@ -640,13 +640,13 @@ int villas::node::ngsi_parse(NodeCompat *n, json_t *json)
|
|||
if (json_signals_in) {
|
||||
ret = ngsi_parse_signals(json_signals_in, &i->in.signals, n->in.signals);
|
||||
if (ret)
|
||||
throw ConfigError(json_signals_in, "node-config-node-ngsi-in-signals", "Invalid setting 'in.signals' of node {}", *n);
|
||||
throw ConfigError(json_signals_in, "node-config-node-ngsi-in-signals", "Invalid setting 'in.signals' of node {}", n->getName());
|
||||
}
|
||||
|
||||
if (json_signals_out) {
|
||||
ret = ngsi_parse_signals(json_signals_out, &i->out.signals, n->out.signals);
|
||||
if (ret)
|
||||
throw ConfigError(json_signals_out, "node-config-node-ngsi-out-signals", "Invalid setting 'out.signals' of node {}", *n);
|
||||
throw ConfigError(json_signals_out, "node-config-node-ngsi-out-signals", "Invalid setting 'out.signals' of node {}", n->getName());
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -698,7 +698,7 @@ int villas::node::ngsi_start(NodeCompat *n)
|
|||
|
||||
int ret = ngsi_request_context_update(i->out.curl, i->endpoint, "APPEND", json_entity, n->logger);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to create NGSI context for node {}", *n);
|
||||
throw RuntimeError("Failed to create NGSI context for node {}", n->getName());
|
||||
|
||||
json_decref(json_entity);
|
||||
}
|
||||
|
|
|
@ -244,7 +244,7 @@ int villas::node::signal_node_parse(NodeCompat *n, json_t *json)
|
|||
break;
|
||||
|
||||
default:
|
||||
throw ConfigError(json_type, "node-config-node-signal", "Invalid setting 'signal' for node {}", *n);
|
||||
throw ConfigError(json_type, "node-config-node-signal", "Invalid setting 'signal' for node {}", n->getName());
|
||||
}
|
||||
|
||||
for (auto &a : arrays) {
|
||||
|
|
|
@ -106,11 +106,11 @@ int websocket_connection_write(struct websocket_connection *c, struct Sample * c
|
|||
|
||||
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
|
||||
if (pushed < (int) cnt)
|
||||
c->node->logger->warn("Queue overrun in WebSocket connection: {}", *c);
|
||||
c->node->logger->warn("Queue overrun in WebSocket connection: {}", c->toString());
|
||||
|
||||
sample_incref_many(smps, pushed);
|
||||
|
||||
c->node->logger->debug("Enqueued {} samples to {}", pushed, *c);
|
||||
c->node->logger->debug("Enqueued {} samples to {}", pushed, c->toString());
|
||||
|
||||
/* Client connections which are currently connecting don't have an associate c->wsi yet */
|
||||
if (c->wsi)
|
||||
|
@ -126,7 +126,7 @@ void websocket_connection_close(struct websocket_connection *c, struct lws *wsi,
|
|||
{
|
||||
lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason));
|
||||
|
||||
c->node->logger->debug("Closing WebSocket connection with {}: status={}, reason={}", *c, status, reason);
|
||||
c->node->logger->debug("Closing WebSocket connection with {}: status={}, reason={}", c->toString(), status, reason);
|
||||
|
||||
c->state = websocket_connection::State::CLOSED;
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
|
|||
|
||||
c->wsi = wsi;
|
||||
c->state = websocket_connection::State::ESTABLISHED;
|
||||
c->node->logger->info("Established WebSocket connection: {}", *c);
|
||||
c->node->logger->info("Established WebSocket connection: {}", c->toString());
|
||||
|
||||
{
|
||||
std::lock_guard guard(connections_lock);
|
||||
|
@ -223,7 +223,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
|
|||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
c->state = websocket_connection::State::CLOSED;
|
||||
c->node->logger->debug("Closed WebSocket connection: {}", *c);
|
||||
c->node->logger->debug("Closed WebSocket connection: {}", c->toString());
|
||||
|
||||
if (c->state != websocket_connection::State::CLOSING) {
|
||||
/** @todo Attempt reconnect here */
|
||||
|
@ -260,7 +260,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
|
|||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
c->node->logger->debug("Send {} samples to connection: {}, bytes={}", pulled, *c, ret);
|
||||
c->node->logger->debug("Send {} samples to connection: {}, bytes={}", pulled, c->toString(), ret);
|
||||
}
|
||||
|
||||
if (queue_available(&c->queue) > 0)
|
||||
|
@ -291,15 +291,15 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
|
|||
|
||||
avail = sample_alloc_many(&w->pool, smps, cnt);
|
||||
if (avail < cnt)
|
||||
c->node->logger->warn("Pool underrun for connection: {}", *c);
|
||||
c->node->logger->warn("Pool underrun for connection: {}", c->toString());
|
||||
|
||||
recvd = c->formatter->sscan(c->buffers.recv->data(), c->buffers.recv->size(), nullptr, smps, avail);
|
||||
if (recvd < 0) {
|
||||
c->node->logger->warn("Failed to parse sample data received on connection: {}", *c);
|
||||
c->node->logger->warn("Failed to parse sample data received on connection: {}", c->toString());
|
||||
break;
|
||||
}
|
||||
|
||||
c->node->logger->debug("Received {} samples from connection: {}", recvd, *c);
|
||||
c->node->logger->debug("Received {} samples from connection: {}", recvd, c->toString());
|
||||
|
||||
/* Set receive timestamp */
|
||||
for (int i = 0; i < recvd; i++) {
|
||||
|
@ -309,7 +309,7 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
|
|||
|
||||
enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
|
||||
if (enqueued < recvd)
|
||||
c->node->logger->warn("Queue overrun in connection: {}", *c);
|
||||
c->node->logger->warn("Queue overrun in connection: {}", c->toString());
|
||||
|
||||
/* Release unused samples back to pool */
|
||||
if (enqueued < avail)
|
||||
|
|
26
lib/path.cpp
26
lib/path.cpp
|
@ -144,7 +144,7 @@ void Path::startPoll()
|
|||
auto fds = ps->getNode()->getPollFDs();
|
||||
for (auto fd : fds) {
|
||||
if (fd < 0)
|
||||
throw RuntimeError("Failed to get file descriptor for node {}", *ps->getNode());
|
||||
throw RuntimeError("Failed to get file descriptor for node {}", ps->getNode()->getName());
|
||||
|
||||
/* This slot is only used if it is not masked */
|
||||
struct pollfd pfd = {
|
||||
|
@ -166,7 +166,7 @@ void Path::startPoll()
|
|||
};
|
||||
|
||||
if (pfd.fd < 0)
|
||||
throw RuntimeError("Failed to get file descriptor for timer of path {}", *this);
|
||||
throw RuntimeError("Failed to get file descriptor for timer of path {}", this->toString());
|
||||
|
||||
pfds.push_back(pfd);
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ void Path::prepare(NodeList &nodes)
|
|||
/* Prepare mappings */
|
||||
ret = mappings.prepare(nodes);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to prepare mappings of path: {}", *this);
|
||||
throw RuntimeError("Failed to prepare mappings of path: {}", this->toString());
|
||||
|
||||
/* Create path sources */
|
||||
std::map<Node *, PathSource::Ptr> psm;
|
||||
|
@ -256,7 +256,7 @@ void Path::prepare(NodeList &nodes)
|
|||
if (me->type == MappingEntry::Type::DATA) {
|
||||
sig = sigs->getByIndex(me->data.offset + j);
|
||||
if (!sig) {
|
||||
logger->warn("Failed to create signal description for path {}", *this);
|
||||
logger->warn("Failed to create signal description for path {}", this->toString());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -290,7 +290,7 @@ void Path::prepare(NodeList &nodes)
|
|||
|
||||
ret = pd->prepare(queuelen);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to prepare path destination {} of path {}", *pd->node, *this);
|
||||
throw RuntimeError("Failed to prepare path destination {} of path {}", pd->node->getName(), this->toString());
|
||||
}
|
||||
|
||||
/* Autodetect whether to use original sequence numbers or not */
|
||||
|
@ -316,7 +316,7 @@ void Path::prepare(NodeList &nodes)
|
|||
|
||||
/* Add internal hooks if they are not already in the list */
|
||||
hooks.prepare(signals, m, this, nullptr);
|
||||
hooks.dump(logger, fmt::format("path {}", *this));
|
||||
hooks.dump(logger, fmt::format("path {}", this->toString()));
|
||||
#endif /* WITH_HOOKS */
|
||||
|
||||
/* Prepare pool */
|
||||
|
@ -325,9 +325,9 @@ void Path::prepare(NodeList &nodes)
|
|||
|
||||
ret = pool_init(&pool, pool_size, SAMPLE_LENGTH(osigs->size()), pool_mt);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to initialize pool of path: {}", *this);
|
||||
throw RuntimeError("Failed to initialize pool of path: {}", this->toString());
|
||||
|
||||
logger->debug("Prepared path {} with {} output signals:", *this, osigs->size());
|
||||
logger->debug("Prepared path {} with {} output signals:", this->toString(), osigs->size());
|
||||
if (logger->level() <= spdlog::level::debug)
|
||||
osigs->dump(logger);
|
||||
|
||||
|
@ -397,7 +397,7 @@ void Path::parse(json_t *json, NodeList &nodes, const uuid_t sn_uuid)
|
|||
/* Input node(s) */
|
||||
ret = mappings.parse(json_in);
|
||||
if (ret)
|
||||
throw ConfigError(json_in, "node-config-path-in", "Failed to parse input mapping of path {}", *this);
|
||||
throw ConfigError(json_in, "node-config-path-in", "Failed to parse input mapping of path {}", this->toString());
|
||||
|
||||
/* Output node(s) */
|
||||
NodeList dests;
|
||||
|
@ -462,7 +462,7 @@ void Path::check()
|
|||
assert(state != State::DESTROYED);
|
||||
|
||||
if (rate < 0)
|
||||
throw RuntimeError("Setting 'rate' of path {} must be a positive number.", *this);
|
||||
throw RuntimeError("Setting 'rate' of path {} must be a positive number.", this->toString());
|
||||
|
||||
if (!IS_POW2(queuelen)) {
|
||||
queuelen = LOG2_CEIL(queuelen);
|
||||
|
@ -492,7 +492,7 @@ void Path::checkPrepared()
|
|||
/* Check that all path sources provide a file descriptor for polling if fixed rate is disabled */
|
||||
for (auto ps : sources) {
|
||||
if (!(ps->getNode()->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_POLL))
|
||||
throw RuntimeError("Node {} can not be used in polling mode with path {}", *ps->getNode(), *this);
|
||||
throw RuntimeError("Node {} can not be used in polling mode with path {}", ps->getNode()->getName(), this->toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -522,7 +522,7 @@ void Path::start()
|
|||
logger->info("Starting path {}: #signals={}/{}, #hooks={}, #sources={}, "
|
||||
"#destinations={}, mode={}, poll={}, mask=0b{:b}, rate={}, "
|
||||
"enabled={}, reversed={}, queuelen={}, original_sequence_no={}",
|
||||
*this,
|
||||
this->toString(),
|
||||
signals->size(),
|
||||
getOutputSignals()->size(),
|
||||
hooks.size(),
|
||||
|
@ -596,7 +596,7 @@ void Path::stop()
|
|||
state != State::STOPPING)
|
||||
return;
|
||||
|
||||
logger->info("Stopping path: {}", *this);
|
||||
logger->info("Stopping path: {}", this->toString());
|
||||
|
||||
if (state != State::STOPPING)
|
||||
state = State::STOPPING;
|
||||
|
|
|
@ -49,17 +49,17 @@ void PathDestination::enqueueAll(Path *p, const struct Sample * const smps[], un
|
|||
|
||||
cloned = sample_clone_many(clones, smps, cnt);
|
||||
if (cloned < cnt)
|
||||
p->logger->warn("Pool underrun in path {}", *p);
|
||||
p->logger->warn("Pool underrun in path {}", p->toString());
|
||||
|
||||
for (auto pd : p->destinations) {
|
||||
enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
|
||||
if (enqueued != cnt)
|
||||
p->logger->warn("Queue overrun for path {}", *p);
|
||||
p->logger->warn("Queue overrun for path {}", p->toString());
|
||||
|
||||
/* Increase reference counter of these samples as they are now also owned by the queue. */
|
||||
sample_incref_many(clones, cloned);
|
||||
|
||||
p->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, *pd->node, *p);
|
||||
p->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, pd->node->getName(), p->toString());
|
||||
}
|
||||
|
||||
sample_decref_many(clones, cloned);
|
||||
|
@ -79,17 +79,17 @@ void PathDestination::write()
|
|||
if (allocated == 0)
|
||||
break;
|
||||
else if (allocated < cnt)
|
||||
path->logger->debug("Queue underrun for path {}: allocated={} expected={}", *path, allocated, cnt);
|
||||
path->logger->debug("Queue underrun for path {}: allocated={} expected={}", path->toString(), allocated, cnt);
|
||||
|
||||
path->logger->debug("Dequeued {} samples from queue of node {} which is part of path {}", allocated, *node, *path);
|
||||
path->logger->debug("Dequeued {} samples from queue of node {} which is part of path {}", allocated, node->getName(), path->toString());
|
||||
|
||||
sent = node->write(smps, allocated);
|
||||
if (sent < 0) {
|
||||
path->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, *node, sent);
|
||||
path->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, node->getName(), sent);
|
||||
return;
|
||||
}
|
||||
else if (sent < allocated)
|
||||
path->logger->debug("Partial write to node {}: written={}, expected={}", *node, sent, allocated);
|
||||
path->logger->debug("Partial write to node {}: written={}, expected={}", node->getName(), sent, allocated);
|
||||
|
||||
int released = sample_decref_many(smps, allocated);
|
||||
|
||||
|
@ -100,8 +100,8 @@ void PathDestination::write()
|
|||
void PathDestination::check()
|
||||
{
|
||||
if (!node->isEnabled())
|
||||
throw RuntimeError("Destination {} is not enabled", *node);
|
||||
throw RuntimeError("Destination {} is not enabled", node->getName());
|
||||
|
||||
if (!(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_WRITE))
|
||||
throw RuntimeError("Destination node {} is not supported as a sink for path ", *node);
|
||||
throw RuntimeError("Destination node {} is not supported as a sink for path ", node->getName());
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ int PathSource::read(int i)
|
|||
/* Fill smps[] free sample blocks from the pool */
|
||||
allocated = sample_alloc_many(&pool, read_smps, cnt);
|
||||
if (allocated != cnt)
|
||||
path->logger->warn("Pool underrun for path source {}", *node);
|
||||
path->logger->warn("Pool underrun for path source {}", node->getName());
|
||||
|
||||
/* Read ready samples and store them to blocks pointed by smps[] */
|
||||
recv = node->read(read_smps, allocated);
|
||||
|
@ -71,13 +71,13 @@ int PathSource::read(int i)
|
|||
goto out2;
|
||||
}
|
||||
else {
|
||||
path->logger->error("Failed to read samples from node {}", *node);
|
||||
path->logger->error("Failed to read samples from node {}", node->getName());
|
||||
enqueued = 0;
|
||||
goto out2;
|
||||
}
|
||||
}
|
||||
else if (recv < allocated)
|
||||
path->logger->warn("Partial read for path {}: read={}, expected={}", *path, recv, allocated);
|
||||
path->logger->warn("Partial read for path {}: read={}, expected={}", path->toString(), recv, allocated);
|
||||
|
||||
/* Let the master path sources forward received samples to their secondaries */
|
||||
writeToSecondaries(read_smps, recv);
|
||||
|
@ -96,7 +96,7 @@ int PathSource::read(int i)
|
|||
? sample_clone(path->last_sample)
|
||||
: sample_clone(muxed_smps[i-1]);
|
||||
if (!muxed_smps[i]) {
|
||||
path->logger->error("Pool underrun in path {}", *path);
|
||||
path->logger->error("Pool underrun in path {}", path->toString());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ int PathSource::read(int i)
|
|||
else if (toenqueue != tomux) {
|
||||
int skipped = tomux - toenqueue;
|
||||
|
||||
path->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, *path);
|
||||
path->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, path->toString());
|
||||
}
|
||||
#else
|
||||
toenqueue = tomux;
|
||||
|
@ -178,10 +178,10 @@ out2: sample_decref_many(read_smps, recv);
|
|||
void PathSource::check()
|
||||
{
|
||||
if (!node->isEnabled())
|
||||
throw RuntimeError("Source {} is not enabled", *node);
|
||||
throw RuntimeError("Source {} is not enabled", node->getName());
|
||||
|
||||
if (!(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_READ))
|
||||
throw RuntimeError("Node {} is not supported as a source for a path", *node);
|
||||
throw RuntimeError("Node {} is not supported as a source for a path", node->getName());
|
||||
}
|
||||
|
||||
MasterPathSource::MasterPathSource(Path *p, Node *n) :
|
||||
|
@ -193,9 +193,9 @@ void MasterPathSource::writeToSecondaries(struct Sample *smps[], unsigned cnt)
|
|||
for (auto sps : secondaries) {
|
||||
int sent = sps->getNode()->write(smps, cnt);
|
||||
if (sent < 0)
|
||||
throw RuntimeError("Failed to write secondary path source {} of path {}", *sps->getNode(), *path);
|
||||
throw RuntimeError("Failed to write secondary path source {} of path {}", sps->getNode()->getName(), path->toString());
|
||||
else if ((unsigned) sent < cnt)
|
||||
path->logger->warn("Partial write to secondary path source {} of path {}", *sps->getNode(), *path);
|
||||
path->logger->warn("Partial write to secondary path source {} of path {}", sps->getNode()->getName(), path->toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -214,7 +214,7 @@ void SuperNode::check()
|
|||
for (auto *n : nodes) {
|
||||
ret = n->check();
|
||||
if (ret)
|
||||
throw RuntimeError("Invalid configuration for node {}", *n);
|
||||
throw RuntimeError("Invalid configuration for node {}", n->getName());
|
||||
}
|
||||
|
||||
for (auto *p : paths)
|
||||
|
@ -235,7 +235,7 @@ void SuperNode::prepareNodeTypes()
|
|||
|
||||
ret = nf->start(this);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to start node-type: {}", *n->getFactory());
|
||||
throw RuntimeError("Failed to start node-type: {}", n->getFactory()->getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,7 +262,7 @@ void SuperNode::startNodes()
|
|||
|
||||
ret = n->start();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to start node: {}", *n);
|
||||
throw RuntimeError("Failed to start node: {}", n->getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -284,7 +284,7 @@ void SuperNode::prepareNodes()
|
|||
|
||||
ret = n->prepare();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to prepare node: {}", *n);
|
||||
throw RuntimeError("Failed to prepare node: {}", n->getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -315,7 +315,7 @@ void SuperNode::prepare()
|
|||
for (auto *n : nodes) {
|
||||
if (n->sources.size() == 0 &&
|
||||
n->destinations.size() == 0) {
|
||||
logger->info("Node {} is not used by any path. Disabling...", *n);
|
||||
logger->info("Node {} is not used by any path. Disabling...", n->getName());
|
||||
n->setEnabled(false);
|
||||
}
|
||||
}
|
||||
|
@ -366,7 +366,7 @@ void SuperNode::stopNodes()
|
|||
n->getState() == State::STOPPING) {
|
||||
ret = n->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop node: {}", *n);
|
||||
throw RuntimeError("Failed to stop node: {}", n->getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -383,7 +383,7 @@ void SuperNode::stopNodeTypes()
|
|||
|
||||
ret = nf->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop node-type: {}", *n->getFactory());
|
||||
throw RuntimeError("Failed to stop node-type: {}", n->getFactory()->getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -508,7 +508,7 @@ graph_t * SuperNode::getGraph()
|
|||
uuid_unparse(n->getUuid(), uuid_str);
|
||||
|
||||
set_attr(nodeMap[n], "shape", "ellipse");
|
||||
set_attr(nodeMap[n], "tooltip", fmt::format("type={}, uuid={}", *n->getFactory(), uuid_str));
|
||||
set_attr(nodeMap[n], "tooltip", fmt::format("type={}, uuid={}", n->getFactory()->getName(), uuid_str));
|
||||
// set_attr(nodeMap[n], "fixedsize", "true");
|
||||
// set_attr(nodeMap[n], "width", "0.15");
|
||||
// set_attr(nodeMap[n], "height", "0.15");
|
||||
|
|
|
@ -18,7 +18,7 @@ public:
|
|||
|
||||
virtual void restart()
|
||||
{
|
||||
logger->info("The path {} restarted!", *path);
|
||||
logger->info("The path {} restarted!", path->toString());
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -200,7 +200,7 @@ public:
|
|||
goto leave;
|
||||
}
|
||||
|
||||
logger->warn("Failed to receive samples from node {}: reason={}", *node, recv);
|
||||
logger->warn("Failed to receive samples from node {}: reason={}", node->getName(), recv);
|
||||
} else
|
||||
formatter->print(stdout, smps, recv);
|
||||
|
||||
|
@ -452,7 +452,7 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->getFactory()->start(&sn);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to intialize node type {}: reason={}", *node->getFactory(), ret);
|
||||
throw RuntimeError("Failed to intialize node type {}: reason={}", node->getFactory()->getName(), ret);
|
||||
|
||||
sn.startInterfaces();
|
||||
|
||||
|
@ -462,11 +462,11 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->prepare();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to prepare node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to prepare node {}: reason={}", node->getName(), ret);
|
||||
|
||||
ret = node->start();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to start node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to start node {}: reason={}", node->getName(), ret);
|
||||
|
||||
recv.dir = std::make_unique<PipeReceiveDirection>(node, formatter, recv.enabled, recv.limit);
|
||||
send.dir = std::make_unique<PipeSendDirection>(node, formatter, send.enabled, send.limit);
|
||||
|
@ -484,7 +484,7 @@ check: if (optarg == endptr)
|
|||
* Node::read() call and allow it to be joined(). */
|
||||
ret = node->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to stop node {}: reason={}", node->getName(), ret);
|
||||
|
||||
recv.dir->stopThread();
|
||||
send.dir->stopThread();
|
||||
|
@ -493,7 +493,7 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->getFactory()->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop node type {}: reason={}", *node->getFactory(), ret);
|
||||
throw RuntimeError("Failed to stop node type {}: reason={}", node->getFactory()->getName(), ret);
|
||||
|
||||
#if defined(WITH_NODE_WEBSOCKET) && defined(WITH_WEB)
|
||||
/* Only start web subsystem if villas-pipe is used with a websocket node */
|
||||
|
|
|
@ -255,7 +255,7 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->getFactory()->start(nullptr);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to intialize node type {}: reason={}", *node->getFactory(), ret);
|
||||
throw RuntimeError("Failed to intialize node type {}: reason={}", node->getFactory()->getName(), ret);
|
||||
|
||||
ret = node->check();
|
||||
if (ret)
|
||||
|
@ -263,7 +263,7 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->prepare();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to prepare node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to prepare node {}: reason={}", node->getName(), ret);
|
||||
|
||||
/* Try parsing format config as JSON */
|
||||
json_format = json_loads(format.c_str(), 0, &err);
|
||||
|
@ -281,7 +281,7 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->start();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to start node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to start node {}: reason={}", node->getName(), ret);
|
||||
|
||||
while (!stop && node->getState() == State::STARTED) {
|
||||
t = sample_alloc(&pool);
|
||||
|
@ -304,7 +304,7 @@ out: sample_decref(t);
|
|||
|
||||
ret = node->getFactory()->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to de-intialize node type {}: reason={}", *node->getFactory(), ret);
|
||||
throw RuntimeError("Failed to de-intialize node type {}: reason={}", node->getFactory()->getName(), ret);
|
||||
|
||||
delete node;
|
||||
delete formatter;
|
||||
|
|
|
@ -175,15 +175,15 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->getFactory()->start(&sn);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to start node-type {}: reason={}", *node->getFactory(), ret);
|
||||
throw RuntimeError("Failed to start node-type {}: reason={}", node->getFactory()->getName(), ret);
|
||||
|
||||
ret = node->prepare();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to prepare node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to prepare node {}: reason={}", node->getName(), ret);
|
||||
|
||||
ret = node->start();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to start node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to start node {}: reason={}", node->getName(), ret);
|
||||
|
||||
/* Print header */
|
||||
fprintf(stdout, "%17s%5s%10s%10s%10s%10s%10s\n", "timestamp", "seq", "rtt", "min", "max", "mean", "stddev");
|
||||
|
@ -225,11 +225,11 @@ check: if (optarg == endptr)
|
|||
|
||||
ret = node->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop node {}: reason={}", *node, ret);
|
||||
throw RuntimeError("Failed to stop node {}: reason={}", node->getName(), ret);
|
||||
|
||||
ret = node->getFactory()->stop();
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop node-type {}: reason={}", *node->getFactory(), ret);
|
||||
throw RuntimeError("Failed to stop node-type {}: reason={}", node->getFactory()->getName(), ret);
|
||||
|
||||
delete smp_send;
|
||||
delete smp_recv;
|
||||
|
|
Loading…
Add table
Reference in a new issue