diff --git a/fpga/include/villas/fpga/ips/dma.hpp b/fpga/include/villas/fpga/ips/dma.hpp index c047735a3..10d5108b7 100644 --- a/fpga/include/villas/fpga/ips/dma.hpp +++ b/fpga/include/villas/fpga/ips/dma.hpp @@ -49,6 +49,7 @@ public: : writeCompleteSimple(); } + size_t pollReadScatterGather(bool lock); Completion readComplete() { return hasScatterGather() ? readCompleteScatterGather() : readCompleteSimple(); @@ -61,11 +62,11 @@ public: inline bool hasScatterGather() const { return xConfig.HasSg; } - const StreamVertex &getDefaultSlavePort() const { + const StreamVertex &getDefaultSlavePort() const override { return getSlavePort(s2mmPort); } - const StreamVertex &getDefaultMasterPort() const { + const StreamVertex &getDefaultMasterPort() const override { return getMasterPort(mm2sPort); } @@ -103,7 +104,7 @@ private: // Optional Scatter-Gather interface to access descriptors static constexpr char sgInterface[] = "M_AXI_SG"; - std::list getMemoryBlocks() const { + std::list getMemoryBlocks() const override { return {registerMemory}; } @@ -114,8 +115,8 @@ private: bool configDone = false; // use polling to wait for DMA completion or interrupts via efds - bool polling = false; - bool cyclic = false; + bool polling = false; // polling mode is significantly lower latency + bool cyclic = false; // not fully implemented // Timeout after which the DMA controller issues in interrupt if no data has been received // Delay is 125 x x (clock period of SG clock). SG clock is 100 MHz by default. 
int delay = 0; @@ -140,19 +141,19 @@ private: class DmaFactory : NodeFactory { public: - virtual std::string getName() const { return "dma"; } + virtual std::string getName() const override { return "dma"; } - virtual std::string getDescription() const { + virtual std::string getDescription() const override { return "Xilinx's AXI4 Direct Memory Access Controller"; } private: - virtual Vlnv getCompatibleVlnv() const { + virtual Vlnv getCompatibleVlnv() const override { return Vlnv("xilinx.com:ip:axi_dma:"); } // Create a concrete IP instance - Core *make() const { return new Dma; }; + Core *make() const override { return new Dma; }; protected: virtual void parse(Core &ip, json_t *json) override; diff --git a/fpga/lib/ips/dma.cpp b/fpga/lib/ips/dma.cpp index 3e915b7e3..cf973bd21 100644 --- a/fpga/lib/ips/dma.cpp +++ b/fpga/lib/ips/dma.cpp @@ -9,6 +9,8 @@ #include #include +#include "xilinx/xaxidma_bd.h" +#include "xilinx/xaxidma_hw.h" #include #include @@ -334,7 +336,7 @@ bool Dma::writeScatterGather(const void *buf, size_t len) { } bool Dma::readScatterGather(void *buf, size_t len) { - int ret = XST_FAILURE; + uint32_t ret = XST_FAILURE; if (len < readCoalesce * readMsgSize) { throw RuntimeError( @@ -397,7 +399,7 @@ Dma::Completion Dma::writeCompleteScatterGather() { Completion c; XAxiDma_Bd *bd = nullptr, *curBd; auto txRing = XAxiDma_GetTxRing(&xDma); - int ret = XST_FAILURE; + uint32_t ret = XST_FAILURE; static size_t errcnt = 32; uint32_t irqStatus = 0; @@ -410,9 +412,7 @@ Dma::Completion Dma::writeCompleteScatterGather() { BdSts = XAxiDma_ReadReg((UINTPTR)CurBdPtr, XAXIDMA_BD_STS_OFFSET); } while (!(BdSts & XAXIDMA_BD_STS_COMPLETE_MASK)); // At this point, we know that the transmission is complete, but we haven't accessed the - // PCIe address space, yet. The subsequent DMA Controller management can be done in a - // separate thread to keep latencies in this thread extremly low. We know that we have - // received one BD. + // PCIe address space, yet. 
We know that we have received at least one BD. } else { c.interrupts = irqs[mm2sInterrupt].irqController->waitForInterrupt( irqs[mm2sInterrupt].num); @@ -451,7 +451,7 @@ Dma::Completion Dma::writeCompleteScatterGather() { if ((ret & XAXIDMA_BD_STS_ALL_ERR_MASK) || (!(ret & XAXIDMA_BD_STS_COMPLETE_MASK))) { hwLock.unlock(); - throw RuntimeError("Bd Status register shows error: {}", ret); + throw RuntimeError("Write: Bd Status register shows error: {:#x}", ret); } c.bytes += XAxiDma_BdGetLength(bd, txRing->MaxTransferLen); @@ -468,6 +468,25 @@ Dma::Completion Dma::writeCompleteScatterGather() { return c; } +size_t Dma::pollReadScatterGather(bool lock) { + if (lock) { + hwLock.lock(); + } + auto rxRing = XAxiDma_GetRxRing(&xDma); + XAxiDma_Bd *CurBdPtr = rxRing->HwHead; + volatile uint32_t BdSts; + // Busy-wait on the status word of the RX ring's head BD to avoid accessing PCIe address space + do { + BdSts = XAxiDma_ReadReg((UINTPTR)CurBdPtr, XAXIDMA_BD_STS_OFFSET); + } while (!(BdSts & XAXIDMA_BD_STS_COMPLETE_MASK)); + // At this point, we know that the transmission is complete, but we haven't accessed the + // PCIe address space, yet. We know that we have received at least one BD. + if (lock) { + hwLock.unlock(); + } + return XAxiDma_BdGetActualLength(CurBdPtr, rxRing->MaxTransferLen); // same length mask as readCompleteScatterGather() +} + Dma::Completion Dma::readCompleteScatterGather() { Completion c; XAxiDma_Bd *bd = nullptr, *curBd; @@ -479,16 +498,7 @@ Dma::Completion Dma::readCompleteScatterGather() { uint32_t irqStatus = 0; if (polling) { hwLock.lock(); - XAxiDma_Bd *CurBdPtr = rxRing->HwHead; - volatile uint32_t BdSts; - // Poll BD status to avoid accessing PCIe address space - do { - BdSts = XAxiDma_ReadReg((UINTPTR)CurBdPtr, XAXIDMA_BD_STS_OFFSET); - } while (!(BdSts & XAXIDMA_BD_STS_COMPLETE_MASK)); - // At this point, we know that the transmission is complete, but we haven't accessed the - // PCIe address space, yet. 
The subsequent DMA Controller management can be done in a - // separate thread to keep latencies in this thread extremly low. We know that we have - // received one BD. + pollReadScatterGather(false); intrs = 1; } else { intrs = irqs[s2mmInterrupt].irqController->waitForInterrupt( @@ -507,7 +517,6 @@ Dma::Completion Dma::readCompleteScatterGather() { c.interrupts = 0; return c; } else { - hwLock.unlock(); c.interrupts = intrs; } if (!polling) { @@ -550,7 +559,7 @@ Dma::Completion Dma::readCompleteScatterGather() { if ((ret & XAXIDMA_BD_STS_ALL_ERR_MASK) || (!(ret & XAXIDMA_BD_STS_COMPLETE_MASK))) { hwLock.unlock(); - throw RuntimeError("Bd Status register shows error: {}", ret); + throw RuntimeError("Read: Bd Status register shows error: {:#x}", ret); } c.bytes += XAxiDma_BdGetActualLength(bd, rxRing->MaxTransferLen); diff --git a/include/villas/nodes/fpga.hpp b/include/villas/nodes/fpga.hpp index da9bdebd2..bf6f4a24d 100644 --- a/include/villas/nodes/fpga.hpp +++ b/include/villas/nodes/fpga.hpp @@ -9,6 +9,7 @@ #pragma once +#include #include #include #include @@ -31,17 +32,40 @@ protected: std::string cardName; std::list connectStrings; + // This setting decouples DMA management from data processing. + // With this setting set to true, the DMA management for both read and + // write transactions is performed after the write command has been sent to + // the DMA controller. + // This allows us to achieve very low latencies for an application that + // waits for data from the FPGA, processes it, and finishes a time step + // by issuing a write to the FPGA. + bool lowLatencyMode; + // This setting performs synchronization with DMA controller in separate + // threads. It requires lowLatencyMode to be set to true. + // This may improve latency, because DMA management is completely decoupled + // from the data path, or may increase latency because of additional thread + // synchronization overhead. Only use after verifying that it improves latency. 
+ bool asyncDmaManagement; + // State std::shared_ptr card; std::shared_ptr dma; - std::shared_ptr blockRx[2]; + std::shared_ptr blockRx; std::shared_ptr blockTx; // Non-public methods + virtual int asyncRead(Sample *smps[], unsigned cnt); + virtual int slowRead(Sample *smps[], unsigned cnt); virtual int _read(Sample *smps[], unsigned cnt) override; - virtual int _write(Sample *smps[], unsigned cnt) override; + // only used if asyncDmaManagement is true + std::atomic_bool readActive; + std::atomic_bool writeActive; + std::atomic_bool stopThreads; + std::shared_ptr dmaThread; + virtual int dmaMgmtThread(); + public: FpgaNode(const uuid_t &id = {}, const std::string &name = ""); @@ -55,6 +79,8 @@ public: virtual int start() override; + virtual int stop() override; + virtual std::vector getPollFDs() override; virtual const std::string &getDetails() override; diff --git a/lib/nodes/fpga.cpp b/lib/nodes/fpga.cpp index 8d0cd0975..849ab7bc8 100644 --- a/lib/nodes/fpga.cpp +++ b/lib/nodes/fpga.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -32,8 +33,9 @@ static std::list> cards; static std::shared_ptr vfioContainer; FpgaNode::FpgaNode(const uuid_t &id, const std::string &name) - : Node(id, name), cardName(""), card(nullptr), dma(), blockRx(), blockTx() { -} + : Node(id, name), cardName(""), connectStrings(), lowLatencyMode(false), + asyncDmaManagement(false), card(nullptr), dma(), blockRx(), blockTx(), + readActive(false), writeActive(false), stopThreads(false), dmaThread() {} FpgaNode::~FpgaNode() {} @@ -75,14 +77,12 @@ int FpgaNode::prepare() { auto &alloc = HostRam::getAllocator(); - blockRx[0] = alloc.allocateBlock(0x200 * sizeof(float)); - blockRx[1] = alloc.allocateBlock(0x200 * sizeof(float)); + blockRx = alloc.allocateBlock(0x200 * sizeof(float)); blockTx = alloc.allocateBlock(0x200 * sizeof(float)); - villas::MemoryAccessor memRx[] = {*(blockRx[0]), *(blockRx[1])}; + villas::MemoryAccessor memRx = *blockRx; villas::MemoryAccessor memTx = *blockTx; - 
dma->makeAccesibleFromVA(blockRx[0]); - dma->makeAccesibleFromVA(blockRx[1]); + dma->makeAccesibleFromVA(blockRx); dma->makeAccesibleFromVA(blockTx); MemoryManager::get().printGraph(); @@ -90,6 +90,16 @@ int FpgaNode::prepare() { return Node::prepare(); } +int FpgaNode::stop() { + if (asyncDmaManagement) { + stopThreads = true; + if (dmaThread) { + dmaThread->join(); + } + } + return Node::stop(); +} + int FpgaNode::parse(json_t *json) { int ret = Node::parse(json); if (ret) { @@ -105,8 +115,12 @@ int FpgaNode::parse(json_t *json) { vfioContainer = std::make_shared(); } - ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o}", "card", &jsonCard, - "connect", &jsonConnectStrings); + // Jansson's "b" format stores into a C int, so unpack into an int first + int asyncDmaManagementFlag = 0; + ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o, s?: b}", "card", + &jsonCard, "connect", &jsonConnectStrings, + "asyncDmaManagement", &asyncDmaManagementFlag); + asyncDmaManagement = asyncDmaManagementFlag != 0; if (ret) { throw ConfigError(json, err, "node-config-fpga", "Failed to parse configuration of node {}", @@ -167,20 +181,51 @@ const std::string &FpgaNode::getDetails() { int FpgaNode::check() { return 0; } int FpgaNode::start() { - // enque first read - // dma->read(*(blockRx[0]), blockRx[0]->getSize()); + if (asyncDmaManagement) { + dmaThread = std::make_shared(&FpgaNode::dmaMgmtThread, this); + } + dma->read(*blockRx, blockRx->getSize()); return Node::start(); } +int FpgaNode::dmaMgmtThread() { + while (readActive) { + usleep(1); + } + while (!stopThreads) { + // readActive must be true, writeActive must be false + dma->read(*blockRx, blockRx->getSize()); + readActive = true; + while (readActive && !stopThreads) { + } + while (!writeActive && !stopThreads) { + } + // readActive must be false; a write is pending unless we are stopping + if (writeActive) { + dma->writeComplete(); + writeActive = false; + } + } + + return 0; +} + int FpgaNode::_read(Sample *smps[], unsigned cnt) { - static size_t cur = 0, next = 0; unsigned read; Sample *smp = smps[0]; assert(cnt == 1); - dma->read(*(blockRx[next]), blockRx[next]->getSize()); // TODO: calc size + if (asyncDmaManagement) { + 
while (!readActive.load(std::memory_order_acquire) && !stopThreads) + ; + } else { + // dma->read(*blockRx, blockRx->getSize()); + } auto c = dma->readComplete(); + if (asyncDmaManagement) { + readActive.store(false, std::memory_order_release); + } read = c.bytes / sizeof(float); @@ -188,7 +233,7 @@ int FpgaNode::_read(Sample *smps[], unsigned cnt) { logger->warn("Missed {} interrupts", c.interrupts - 1); } - auto mem = MemoryAccessor(*(blockRx[cur])); + auto mem = MemoryAccessor(*blockRx); smp->length = 0; for (unsigned i = 0; i < MIN(read, smp->capacity); i++) { @@ -198,18 +243,20 @@ int FpgaNode::_read(Sample *smps[], unsigned cnt) { smp->flags = (int)SampleFlags::HAS_DATA; smp->signals = in.signals; - //cur = next; - //next = (next + 1) % (sizeof(blockRx) / sizeof(blockRx[0])); return 1; } int FpgaNode::_write(Sample *smps[], unsigned cnt) { - unsigned int written; + // unsigned int written; Sample *smp = smps[0]; assert(cnt == 1 && smps != nullptr && smps[0] != nullptr); + if (asyncDmaManagement) { + while (writeActive.load(std::memory_order_acquire) && !stopThreads) + ; + } auto mem = MemoryAccessor(*blockTx); float scaled; @@ -224,15 +271,20 @@ int FpgaNode::_write(Sample *smps[], unsigned cnt) { } bool state = dma->write(*blockTx, smp->length * sizeof(float)); - if (!state) + if (!state) { return -1; + } + if (asyncDmaManagement) { + writeActive.store(true, std::memory_order_release); + } else { + auto written = dma->writeComplete().bytes / + sizeof(float); // The number of samples written - written = dma->writeComplete().bytes / - sizeof(float); // The number of samples written - - if (written != smp->length) { - logger->warn("Wrote {} samples, but {} were expected", written, - smp->length); + if (written != smp->length) { + logger->warn("Wrote {} samples, but {} were expected", written, + smp->length); + } + dma->read(*blockRx, blockRx->getSize()); } return 1;