diff --git a/include/villas/format.hpp b/include/villas/format.hpp index 8969fa392..dbf02ff8f 100644 --- a/include/villas/format.hpp +++ b/include/villas/format.hpp @@ -64,12 +64,6 @@ public: virtual ~Format(); - virtual - bool isBinaryPayload() - { - return false; - } - const SignalList::Ptr getSignals() const { return signals; @@ -151,9 +145,6 @@ class BinaryFormat : public Format { public: using Format::Format; - - virtual bool isBinaryPayload() - { return true; } }; class FormatFactory : public plugin::Plugin { diff --git a/include/villas/formats/opal_asyncip.hpp b/include/villas/formats/opal_asyncip.hpp index 90884c831..6040a9d92 100644 --- a/include/villas/formats/opal_asyncip.hpp +++ b/include/villas/formats/opal_asyncip.hpp @@ -36,7 +36,7 @@ struct Sample; class OpalAsyncIPFormat : public BinaryFormat { protected: - const int MAXSIZE = 64; + const unsigned MAXSIZE = 64; struct Payload { int16_t dev_id; // (2 bytes) Sender device ID @@ -71,7 +71,7 @@ public: virtual Format * make() { - return new OpalAsyncIPFormat((int) SampleFlags::HAS_TS_ORIGIN | (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA); + return new OpalAsyncIPFormat((int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA); } /// Get plugin name diff --git a/lib/formats/line.cpp b/lib/formats/line.cpp index 4f684ce21..a2110a67e 100644 --- a/lib/formats/line.cpp +++ b/lib/formats/line.cpp @@ -114,12 +114,14 @@ int LineFormat::scan(FILE *f, struct Sample * const smps[], unsigned cnt) } } - for (i = 0; i < cnt; i++) { + for (i = 0; i < cnt && !feof(f); i++) { size_t rbytes; char *ptr; skip: bytes = getdelim(&in.buffer, &in.buflen, delimiter, f); - if (bytes < 0) + if (feof(f)) + break; + else if (bytes < 0) return -1; /* An error or eof occured */ /* Skip whitespaces, empty and comment lines */ diff --git a/lib/formats/opal_asyncip.cpp b/lib/formats/opal_asyncip.cpp index 647972c2a..7d79f0bdd 100644 --- a/lib/formats/opal_asyncip.cpp +++ b/lib/formats/opal_asyncip.cpp @@ -48,7 +48,10 @@ int OpalAsyncIPFormat::sprint(char *buf, size_t len, size_t *wbytes, const struc pl->msg_id = htole32(smp->sequence); pl->msg_len = htole16(smp->length * sizeof(double)); - for (unsigned j = 0; j < smp->length; j++) { + if (smp->length > MAXSIZE) + logger->warn("Can not sent more then {} signals via opal.asyncip format. We only send the first {}..", MAXSIZE, MAXSIZE); + + for (unsigned j = 0; j < MIN(MAXSIZE, smp->length); j++) { auto sig = smp->signals->getByIndex(j); auto d = smp->data[j]; @@ -71,15 +74,11 @@ int OpalAsyncIPFormat::sscan(const char *buf, size_t len, size_t *rbytes, struct { unsigned i; auto *ptr = buf; - ssize_t slen = len; if (len % 8 != 0) - return -1; /* Packet size is invalid: Must be multiple of 4 bytes */ - - for (i = 0; i < cnt && ptr - buf + sizeof(struct Payload) < slen; i++) { - if (len < 8) - return -1; /* Packet size is invalid: Must be multiple of 4 bytes */ + return -1; /* Packet size is invalid: Must be multiple of 8 bytes */ + for (i = 0; i < cnt && ptr - buf + sizeof(struct Payload) < len; i++) { auto *pl = (struct Payload *) ptr; auto *smp = smps[i]; @@ -89,7 +88,7 @@ int OpalAsyncIPFormat::sscan(const char *buf, size_t len, size_t *rbytes, struct smp->sequence = le32toh(pl->msg_id); smp->length = rlen / sizeof(double); - smp->flags = (int) SampleFlags::HAS_SEQUENCE; + smp->flags = (int) SampleFlags::HAS_SEQUENCE | (int) SampleFlags::HAS_DATA; smp->signals = signals; for (unsigned j = 0; j < MIN(smp->length, smp->capacity); j++) { diff --git a/lib/formats/villas_human.cpp b/lib/formats/villas_human.cpp index a22d33862..7678ff68e 100644 --- a/lib/formats/villas_human.cpp +++ b/lib/formats/villas_human.cpp @@ -41,7 +41,7 @@ size_t VILLASHumanFormat::sprintLine(char *buf, size_t len, const struct Sample off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) smp->ts.origin.tv_nsec); } else - off += snprintf(buf + off, len - off, "nan.nan"); + off += snprintf(buf + off, len - off, "0.0"); } if (flags & (int) SampleFlags::HAS_OFFSET) { diff --git a/lib/nodes/websocket.cpp b/lib/nodes/websocket.cpp index c65ab67ce..335faf21f 100644 --- a/lib/nodes/websocket.cpp +++ b/lib/nodes/websocket.cpp @@ -266,7 +266,8 @@ int villas::node::websocket_protocol_cb(struct lws *wsi, enum lws_callback_reaso size_t wbytes; c->formatter->sprint(c->buffers.send->data() + LWS_PRE, c->buffers.send->size() - LWS_PRE, &wbytes, smps, pulled); - ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, c->formatter->isBinaryPayload() ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); + auto isBinary = dynamic_cast(c->formatter) != nullptr; + ret = lws_write(wsi, (unsigned char *) c->buffers.send->data() + LWS_PRE, wbytes, isBinary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); sample_decref_many(smps, pulled); diff --git a/src/villas-convert.cpp b/src/villas-convert.cpp index 4a4ed7fa6..5d97767cd 100644 --- a/src/villas-convert.cpp +++ b/src/villas-convert.cpp @@ -28,7 +28,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -141,16 +143,30 @@ protected: dirs[i].formatter->start(dtypes); } - struct Sample *smp = sample_alloc_mem(DEFAULT_SAMPLE_LENGTH); + // Line based formats are processed sample-by-sample + // while for others, we process them in chunks of 128 samples + auto isLine = dynamic_cast(dirs[0].formatter) != nullptr; + auto cnt = isLine ? 1 : 128; - while (true) { - ret = dirs[0].formatter->scan(stdin, smp); + /* Initialize memory */ + struct Pool pool; + ret = pool_init(&pool, cnt, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory::heap); + if (ret) + throw RuntimeError("Failed to allocate memory for pool."); + + struct Sample *smps[cnt]; + ret = sample_alloc_many(&pool, smps, cnt); + if (ret < 0) + throw MemoryAllocationError(); + + while (!feof(stdin)) { + ret = dirs[0].formatter->scan(stdin, smps, cnt); if (ret == 0) continue; else if (ret < 0) break; - dirs[1].formatter->print(stdout, smp); + dirs[1].formatter->print(stdout, smps, ret); } for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index d5c1ee85b..5c9518836 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -144,15 +144,9 @@ public: scanned = formatter->scan(stdin, smps, allocated); if (scanned < 0) { - if (feof(stdin)) - goto leave; - else if (stop) - goto leave2; - - logger->warn("Failed to read from stdin"); + if (!stop) + logger->warn("Failed to read from stdin"); } - else if (scanned == 0) - continue; /* Fill in missing sequence numbers */ for (int i = 0; i < scanned; i++) { @@ -164,23 +158,28 @@ public: sent = node->write(smps, scanned); - sample_decref_many(smps, scanned); + sample_decref_many(smps, allocated); count += sent; if (limit > 0 && count >= limit) - goto leave; + goto leave_limit; + + if (feof(stdin)) + goto leave_eof; } - goto leave2; + goto leave; + +leave_eof: + logger->info("Reached end-of-file."); + raise(SIGUSR1); + goto leave; + +leave_limit: + logger->info("Reached send limit."); + raise(SIGUSR1); leave: - if (feof(stdin)) - logger->info("Reached end-of-file."); - else - logger->info("Reached send limit."); - - raise(SIGUSR1); -leave2: logger->debug("Send thread stopped"); } }; @@ -211,30 +210,31 @@ public: recv = node->read(smps, allocated); if (recv < 0) { - if (node->getState() == State::STOPPING || stop) - goto leave2; - else - logger->warn("Failed to receive samples from node {}: reason={}", *node, recv); - } - else { - formatter->print(stdout, smps, recv); - - count += recv; - if (limit > 0 && count >= limit) + if (node->getState() == State::STOPPING || stop) { + sample_decref_many(smps, allocated); goto leave; + } + + logger->warn("Failed to receive samples from node {}: reason={}", *node, recv); } + formatter->print(stdout, smps, recv); + sample_decref_many(smps, allocated); + + count += recv; + if (limit > 0 && count >= limit) + goto leave_limit; } - goto leave2; -leave: + goto leave; + +leave_limit: logger->info("Reached receive limit."); raise(SIGUSR1); -leave2: - logger->debug("Receive thread stopped"); - sample_decref_many(smps, allocated); +leave: + logger->debug("Receive thread stopped"); } }; diff --git a/tests/integration/convert.sh b/tests/integration/convert.sh index 86d41d0ca..00fc64f34 100755 --- a/tests/integration/convert.sh +++ b/tests/integration/convert.sh @@ -41,5 +41,10 @@ for FORMAT in ${FORMATS}; do villas convert -o ${FORMAT} < input.dat | tee ${TEMP} | \ villas convert -i ${FORMAT} > output.dat - villas compare input.dat output.dat + CMP_FLAGS="" + if [ ${FORMAT} = "opal.asyncip" ]; then + CMP_FLAGS+=-T + fi + + villas compare ${CMP_FLAGS} input.dat output.dat done