diff --git a/include/villas/nodes/fpga.hpp b/include/villas/nodes/fpga.hpp index bf6f4a24d..c292f4e56 100644 --- a/include/villas/nodes/fpga.hpp +++ b/include/villas/nodes/fpga.hpp @@ -32,20 +32,18 @@ protected: std::string cardName; std::list connectStrings; - // This setting decouples DMA management from Data processing. - // With this setting set to true, the DMA management for both read and - // write transactions is performed after the write command has been send - // the DMA controller. - // This allows us to achieve very low latencies for an application that - // waits for data from the FPGA processes it, and finished a time step - // by issuing a write to the FPGA. + // This setting improves latency by remove various checks. + // Use with caution! Requires read cache in FPGA design! + // The common use case in VILLASfpga is that we have exactly + // one write for every read and the number of exchanged signals + // do not change. If this is the case, we can reuse the buffer + // descriptors during reads and write, thus avoidng freeing, + // reallocating and setting them up. + // We set up the descriptors in start, and in write or read, + // we only reset the complete bit in the buffer descriptor and + // write to the tdesc register to start the DMA transfer. + // Improves read/write latency by approx. 40%. bool lowLatencyMode; - // This setting performs synchronization with DMA controller in separate - // threads. It requires lowLatencyMode to be set to true. - // This may improve latency, because DMA management is completely decoupled - // from the data path, or may increase latency because of additional thread - // synchronization overhead. Only use after verifying that it improves latency. - bool asyncDmaManagement; // State std::shared_ptr card; @@ -54,18 +52,13 @@ protected: std::shared_ptr blockTx; // Non-public methods - virtual int asyncRead(Sample *smps[], unsigned cnt); + virtual int fastRead(Sample *smps[], unsigned cnt); virtual int slowRead(Sample *smps[], unsigned cnt); virtual int _read(Sample *smps[], unsigned cnt) override; + virtual int fastWrite(Sample *smps[], unsigned cnt); + virtual int slowWrite(Sample *smps[], unsigned cnt); virtual int _write(Sample *smps[], unsigned cnt) override; - // only used if asyncDmaManagement is true - volatile std::atomic_bool readActive; - volatile std::atomic_bool writeActive; - volatile std::atomic_bool stopThreads; - std::shared_ptr dmaThread; - virtual int dmaMgmtThread(); - public: FpgaNode(const uuid_t &id = {}, const std::string &name = ""); diff --git a/lib/nodes/fpga.cpp b/lib/nodes/fpga.cpp index 849ab7bc8..c2be73f08 100644 --- a/lib/nodes/fpga.cpp +++ b/lib/nodes/fpga.cpp @@ -33,9 +33,8 @@ static std::list> cards; static std::shared_ptr vfioContainer; FpgaNode::FpgaNode(const uuid_t &id, const std::string &name) - : Node(id, name), cardName(""), connectStrings(), card(nullptr), dma(), - blockRx(), blockTx(), readActive(false), writeActive(false), - stopThreads(false), dmaThread() {} + : Node(id, name), cardName(""), connectStrings(), lowLatencyMode(false), + card(nullptr), dma(), blockRx(), blockTx() {} FpgaNode::~FpgaNode() {} @@ -90,15 +89,7 @@ int FpgaNode::prepare() { return Node::prepare(); } -int FpgaNode::stop() { - if (asyncDmaManagement) { - stopThreads = true; - if (dmaThread) { - dmaThread->join(); - } - } - return Node::stop(); -} +int FpgaNode::stop() { return Node::stop(); } int FpgaNode::parse(json_t *json) { int ret = Node::parse(json); @@ -115,9 +106,9 @@ int FpgaNode::parse(json_t *json) { vfioContainer = std::make_shared(); } - ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o, s?: b}", "card", + ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o, s?: b, s?: b}", "card", &jsonCard, "connect", &jsonConnectStrings, - "asyncDmaManagement", &asyncDmaManagement); + "lowLatencyMode", &lowLatencyMode); if (ret) { throw ConfigError(json, err, "node-config-fpga", "Failed to parse configuration of node {}", @@ -169,7 +160,8 @@ const std::string &FpgaNode::getDetails() { std::copy(connectStrings.begin(), connectStrings.end(), std::ostream_iterator(imploded, delim)); - details = fmt::format("fpga={}, connect={}", name, imploded.str()); + details = fmt::format("fpga={}, connect={}, lowLatencyMode={}", name, + imploded.str(), lowLatencyMode); } return details; @@ -178,49 +170,94 @@ const std::string &FpgaNode::getDetails() { int FpgaNode::check() { return 0; } int FpgaNode::start() { - if (asyncDmaManagement) { - dmaThread = std::make_shared(&FpgaNode::dmaMgmtThread, this); + if (getInputSignalsMaxCount() * sizeof(float) > blockRx->getSize()) { + logger->error("Input signals exceed block size."); + throw villas ::RuntimeError("Input signals exceed block size."); } - dma->read(*blockRx, blockRx->getSize()); + if (lowLatencyMode) { + dma->readScatterGatherPrepare(*blockRx, blockRx->getSize()); + if (getInputSignalsMaxCount() != 0) { + dma->writeScatterGatherPrepare(*blockTx, + getInputSignalsMaxCount() * sizeof(float)); + } else { + logger->warn("No input signals defined. Not preparing write buffer - " + "writes will not work."); + } + } + return Node::start(); } -int FpgaNode::dmaMgmtThread() { - while (readActive) { - usleep(1); - } - while (!stopThreads) { - // readActive must be true, writeActive must be false - dma->read(*blockRx, blockRx->getSize()); - readActive = true; - while (readActive && !stopThreads) { - } - while (!writeActive && !stopThreads) { - } - // readActive must be false, writeActive must be true - dma->writeComplete(); - writeActive = false; +int FpgaNode::fastWrite(Sample *smps[], unsigned cnt) { + Sample *smp = smps[0]; + + assert(cnt == 1 && smps != nullptr && smps[0] != nullptr); + + auto mem = MemoryAccessor(*blockTx); + float scaled; + + for (unsigned i = 0; i < smp->length; i++) { + if (smp->signals->getByIndex(i)->type == SignalType::FLOAT) { + scaled = smp->data[i].f; + if (scaled > 10.) { + scaled = 10.; + } else if (scaled < -10.) { + scaled = -10.; + } + mem[i] = (scaled + 10.) * ((float)0xFFFF / 20.); + } else { + mem[i] = smp->data[i].i; + } } - return 0; + dma->writeScatterGatherFast(); + auto written = dma->writeScatterGatherPoll() / + sizeof(float); // The number of samples written + + if (written != smp->length) { + logger->warn("Wrote {} samples, but {} were expected", written, + smp->length); + } + + return 1; +} + +int FpgaNode::fastRead(Sample *smps[], unsigned cnt) { + Sample *smp = smps[0]; + auto mem = MemoryAccessor(*blockRx); + + smp->flags = (int)SampleFlags::HAS_DATA; + smp->signals = in.signals; + + dma->readScatterGatherFast(); + auto read = dma->readScatterGatherPoll(true); + // We assume a lot without checking at this point. All for the latency! + + smp->length = 0; + for (unsigned i = 0; i < MIN(read / sizeof(float), smp->capacity); i++) { + smp->data[i].f = static_cast(mem[i]); + smp->length++; + } + + return 1; } int FpgaNode::_read(Sample *smps[], unsigned cnt) { + if (lowLatencyMode) { + return fastRead(smps, cnt); + } else { + return slowRead(smps, cnt); + } +} + +int FpgaNode::slowRead(Sample *smps[], unsigned cnt) { unsigned read; Sample *smp = smps[0]; assert(cnt == 1); - if (asyncDmaManagement) { - while (!readActive.load(std::memory_order_relaxed) && !stopThreads) - ; - } else { - // dma->read(*blockRx, blockRx->getSize()); - } + dma->read(*blockRx, blockRx->getSize()); auto c = dma->readComplete(); - if (asyncDmaManagement) { - readActive.store(false, std::memory_order_relaxed); - } read = c.bytes / sizeof(float); @@ -243,15 +280,19 @@ int FpgaNode::_read(Sample *smps[], unsigned cnt) { } int FpgaNode::_write(Sample *smps[], unsigned cnt) { + if (lowLatencyMode) { + return fastWrite(smps, cnt); + } else { + return slowWrite(smps, cnt); + } +} + +int FpgaNode::slowWrite(Sample *smps[], unsigned cnt) { // unsigned int written; Sample *smp = smps[0]; assert(cnt == 1 && smps != nullptr && smps[0] != nullptr); - if (asyncDmaManagement) { - while (writeActive.load(std::memory_order_relaxed) && !stopThreads) - ; - } auto mem = MemoryAccessor(*blockTx); float scaled; @@ -269,17 +310,12 @@ int FpgaNode::_write(Sample *smps[], unsigned cnt) { if (!state) { return -1; } - if (asyncDmaManagement) { - writeActive.store(true, std::memory_order_relaxed); - } else { - auto written = dma->writeComplete().bytes / - sizeof(float); // The number of samples written + auto written = dma->writeComplete().bytes / + sizeof(float); // The number of samples written - if (written != smp->length) { - logger->warn("Wrote {} samples, but {} were expected", written, - smp->length); - } - dma->read(*blockRx, blockRx->getSize()); + if (written != smp->length) { + logger->warn("Wrote {} samples, but {} were expected", written, + smp->length); } return 1;