1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

several fixes for new opal format

This commit is contained in:
Steffen Vogel 2022-02-24 07:31:00 -05:00
parent 28380d4e73
commit dfcafed20a
9 changed files with 75 additions and 61 deletions

View file

@ -64,12 +64,6 @@ public:
virtual
~Format();
virtual
bool isBinaryPayload()
{
return false;
}
const SignalList::Ptr getSignals() const
{
return signals;
@ -151,9 +145,6 @@ class BinaryFormat : public Format {
public:
using Format::Format;
virtual bool isBinaryPayload()
{ return true; }
};
class FormatFactory : public plugin::Plugin {

View file

@ -36,7 +36,7 @@ struct Sample;
class OpalAsyncIPFormat : public BinaryFormat {
protected:
const int MAXSIZE = 64;
const unsigned MAXSIZE = 64;
struct Payload {
int16_t dev_id; // (2 bytes) Sender device ID
@ -71,7 +71,7 @@ public:
virtual
Format * make()
{
return new OpalAsyncIPFormat((int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA);
return new OpalAsyncIPFormat((int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA);
}
/// Get plugin name

View file

@ -114,12 +114,14 @@ int LineFormat::scan(FILE *f, struct Sample * const smps[], unsigned cnt)
}
}
for (i = 0; i < cnt; i++) {
for (i = 0; i < cnt && !feof(f); i++) {
size_t rbytes;
char *ptr;
skip: bytes = getdelim(&in.buffer, &in.buflen, delimiter, f);
if (bytes < 0)
if (feof(f))
break;
else if (bytes < 0)
return -1; /* An error or eof occured */
/* Skip whitespaces, empty and comment lines */

View file

@ -48,7 +48,10 @@ int OpalAsyncIPFormat::sprint(char *buf, size_t len, size_t *wbytes, const struc
pl->msg_id = htole32(smp->sequence);
pl->msg_len = htole16(smp->length * sizeof(double));
for (unsigned j = 0; j < smp->length; j++) {
if (smp->length > MAXSIZE)
logger->warn("Can not sent more then {} signals via opal.asyncip format. We only send the first {}..", MAXSIZE, MAXSIZE);
for (unsigned j = 0; j < MIN(MAXSIZE, smp->length); j++) {
auto sig = smp->signals->getByIndex(j);
auto d = smp->data[j];
@ -71,15 +74,11 @@ int OpalAsyncIPFormat::sscan(const char *buf, size_t len, size_t *rbytes, struct
{
unsigned i;
auto *ptr = buf;
ssize_t slen = len;
if (len % 8 != 0)
return -1; /* Packet size is invalid: Must be multiple of 4 bytes */
for (i = 0; i < cnt && ptr - buf + sizeof(struct Payload) < slen; i++) {
if (len < 8)
return -1; /* Packet size is invalid: Must be multiple of 4 bytes */
return -1; /* Packet size is invalid: Must be multiple of 8 bytes */
for (i = 0; i < cnt && ptr - buf + sizeof(struct Payload) < len; i++) {
auto *pl = (struct Payload *) ptr;
auto *smp = smps[i];
@ -89,7 +88,7 @@ int OpalAsyncIPFormat::sscan(const char *buf, size_t len, size_t *rbytes, struct
smp->sequence = le32toh(pl->msg_id);
smp->length = rlen / sizeof(double);
smp->flags = (int) SampleFlags::HAS_SEQUENCE;
smp->flags = (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA;
smp->signals = signals;
for (unsigned j = 0; j < MIN(smp->length, smp->capacity); j++) {

View file

@ -41,7 +41,7 @@ size_t VILLASHumanFormat::sprintLine(char *buf, size_t len, const struct Sample
off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) smp->ts.origin.tv_nsec);
}
else
off += snprintf(buf + off, len - off, "nan.nan");
off += snprintf(buf + off, len - off, "0.0");
}
if (flags & (int) SampleFlags::HAS_OFFSET) {

View file

@ -266,7 +266,8 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso
size_t wbytes;
c->formatter->sprint(c->buffers.send->data() + LWS_PRE, c->buffers.send->size() - LWS_PRE, &wbytes, smps, pulled);
ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, c->formatter->isBinaryPayload() ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
auto isBinary = dynamic_cast<BinaryFormat*>(c->formatter) != nullptr;
ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, isBinary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_decref_many(smps, pulled);

View file

@ -28,7 +28,9 @@
#include <villas/utils.hpp>
#include <villas/log.hpp>
#include <villas/format.hpp>
#include <villas/formats/line.hpp>
#include <villas/sample.hpp>
#include <villas/pool.hpp>
#include <villas/exceptions.hpp>
#include <villas/node/config.hpp>
#include <villas/node/memory.hpp>
@ -141,16 +143,30 @@ protected:
dirs[i].formatter->start(dtypes);
}
struct Sample *smp = sample_alloc_mem(DEFAULT_SAMPLE_LENGTH);
// Line based formats are processed sample-by-sample
// while for others, we process them in chunks of 128 samples
auto isLine = dynamic_cast<LineFormat *>(dirs[0].formatter) != nullptr;
auto cnt = isLine ? 1 : 128;
while (true) {
ret = dirs[0].formatter->scan(stdin, smp);
/* Initialize memory */
struct Pool pool;
ret = pool_init(&pool, cnt, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory::heap);
if (ret)
throw RuntimeError("Failed to allocate memory for pool.");
struct Sample *smps[cnt];
ret = sample_alloc_many(&pool, smps, cnt);
if (ret < 0)
throw MemoryAllocationError();
while (!feof(stdin)) {
ret = dirs[0].formatter->scan(stdin, smps, cnt);
if (ret == 0)
continue;
else if (ret < 0)
break;
dirs[1].formatter->print(stdout, smp);
dirs[1].formatter->print(stdout, smps, ret);
}
for (unsigned i = 0; i < ARRAY_LEN(dirs); i++)

View file

@ -144,15 +144,9 @@ public:
scanned = formatter->scan(stdin, smps, allocated);
if (scanned < 0) {
if (feof(stdin))
goto leave;
else if (stop)
goto leave2;
logger->warn("Failed to read from stdin");
if (!stop)
logger->warn("Failed to read from stdin");
}
else if (scanned == 0)
continue;
/* Fill in missing sequence numbers */
for (int i = 0; i < scanned; i++) {
@ -164,23 +158,28 @@ public:
sent = node->write(smps, scanned);
sample_decref_many(smps, scanned);
sample_decref_many(smps, allocated);
count += sent;
if (limit > 0 && count >= limit)
goto leave;
goto leave_limit;
if (feof(stdin))
goto leave_eof;
}
goto leave2;
goto leave;
leave_eof:
logger->info("Reached end-of-file.");
raise(SIGUSR1);
goto leave;
leave_limit:
logger->info("Reached send limit.");
raise(SIGUSR1);
leave:
if (feof(stdin))
logger->info("Reached end-of-file.");
else
logger->info("Reached send limit.");
raise(SIGUSR1);
leave2:
logger->debug("Send thread stopped");
}
};
@ -211,30 +210,31 @@ public:
recv = node->read(smps, allocated);
if (recv < 0) {
if (node->getState() == State::STOPPING || stop)
goto leave2;
else
logger->warn("Failed to receive samples from node {}: reason={}", *node, recv);
}
else {
formatter->print(stdout, smps, recv);
count += recv;
if (limit > 0 && count >= limit)
if (node->getState() == State::STOPPING || stop) {
sample_decref_many(smps, allocated);
goto leave;
}
logger->warn("Failed to receive samples from node {}: reason={}", *node, recv);
}
formatter->print(stdout, smps, recv);
sample_decref_many(smps, allocated);
count += recv;
if (limit > 0 && count >= limit)
goto leave_limit;
}
goto leave2;
leave:
goto leave;
leave_limit:
logger->info("Reached receive limit.");
raise(SIGUSR1);
leave2:
logger->debug("Receive thread stopped");
sample_decref_many(smps, allocated);
leave:
logger->debug("Receive thread stopped");
}
};

View file

@ -41,5 +41,10 @@ for FORMAT in ${FORMATS}; do
villas convert -o ${FORMAT} < input.dat | tee ${TEMP} | \
villas convert -i ${FORMAT} > output.dat
villas compare input.dat output.dat
CMP_FLAGS=""
if [ ${FORMAT} = "opal.asyncip" ]; then
CMP_FLAGS+=-T
fi
villas compare ${CMP_FLAGS} input.dat output.dat
done