
fpga: improve dma latency

Signed-off-by: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
Niklas Eiling 2024-03-23 18:59:24 +01:00 committed by pipeacosta
parent 937cdda11f
commit f67ca37b0c
4 changed files with 136 additions and 53 deletions

View file

@ -49,6 +49,7 @@ public:
: writeCompleteSimple();
}
size_t pollReadScatterGather(bool lock);
Completion readComplete() {
return hasScatterGather() ? readCompleteScatterGather()
: readCompleteSimple();
@ -61,11 +62,11 @@ public:
inline bool hasScatterGather() const { return xConfig.HasSg; }
const StreamVertex &getDefaultSlavePort() const {
const StreamVertex &getDefaultSlavePort() const override {
return getSlavePort(s2mmPort);
}
const StreamVertex &getDefaultMasterPort() const {
const StreamVertex &getDefaultMasterPort() const override {
return getMasterPort(mm2sPort);
}
@ -103,7 +104,7 @@ private:
// Optional Scatter-Gather interface to access descriptors
static constexpr char sgInterface[] = "M_AXI_SG";
std::list<MemoryBlockName> getMemoryBlocks() const {
std::list<MemoryBlockName> getMemoryBlocks() const override {
return {registerMemory};
}
@ -114,8 +115,8 @@ private:
bool configDone = false;
// use polling to wait for DMA completion or interrupts via efds
bool polling = false;
bool cyclic = false;
bool polling = false; // polling mode has significantly lower latency
bool cyclic = false; // not fully implemented
// Timeout after which the DMA controller issues an interrupt if no data has been received
// Delay is 125 x <delay> x (clock period of SG clock). SG clock is 100 MHz by default.
int delay = 0;
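With the default 100 MHz SG clock (10 ns period), a delay value of 100 therefore corresponds to 125 x 100 x 10 ns = 125 µs before the delay interrupt fires.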
@ -140,19 +141,19 @@ private:
class DmaFactory : NodeFactory {
public:
virtual std::string getName() const { return "dma"; }
virtual std::string getName() const override { return "dma"; }
virtual std::string getDescription() const {
virtual std::string getDescription() const override {
return "Xilinx's AXI4 Direct Memory Access Controller";
}
private:
virtual Vlnv getCompatibleVlnv() const {
virtual Vlnv getCompatibleVlnv() const override {
return Vlnv("xilinx.com:ip:axi_dma:");
}
// Create a concrete IP instance
Core *make() const { return new Dma; };
Core *make() const override { return new Dma; };
protected:
virtual void parse(Core &ip, json_t *json) override;

View file

@ -9,6 +9,8 @@
#include <sstream>
#include <string>
#include "xilinx/xaxidma_bd.h"
#include "xilinx/xaxidma_hw.h"
#include <sys/types.h>
#include <xilinx/xaxidma.h>
@ -334,7 +336,7 @@ bool Dma::writeScatterGather(const void *buf, size_t len) {
}
bool Dma::readScatterGather(void *buf, size_t len) {
int ret = XST_FAILURE;
uint32_t ret = XST_FAILURE;
if (len < readCoalesce * readMsgSize) {
throw RuntimeError(
@ -397,7 +399,7 @@ Dma::Completion Dma::writeCompleteScatterGather() {
Completion c;
XAxiDma_Bd *bd = nullptr, *curBd;
auto txRing = XAxiDma_GetTxRing(&xDma);
int ret = XST_FAILURE;
uint32_t ret = XST_FAILURE;
static size_t errcnt = 32;
uint32_t irqStatus = 0;
@ -410,9 +412,7 @@ Dma::Completion Dma::writeCompleteScatterGather() {
BdSts = XAxiDma_ReadReg((UINTPTR)CurBdPtr, XAXIDMA_BD_STS_OFFSET);
} while (!(BdSts & XAXIDMA_BD_STS_COMPLETE_MASK));
// At this point, we know that the transmission is complete, but we haven't accessed the
// PCIe address space, yet. The subsequent DMA Controller management can be done in a
// separate thread to keep latencies in this thread extremely low. We know that we have
// received one BD.
// PCIe address space, yet. We know that we have received at least one BD.
} else {
c.interrupts = irqs[mm2sInterrupt].irqController->waitForInterrupt(
irqs[mm2sInterrupt].num);
@ -451,7 +451,7 @@ Dma::Completion Dma::writeCompleteScatterGather() {
if ((ret & XAXIDMA_BD_STS_ALL_ERR_MASK) ||
(!(ret & XAXIDMA_BD_STS_COMPLETE_MASK))) {
hwLock.unlock();
throw RuntimeError("Bd Status register shows error: {}", ret);
throw RuntimeError("Write: Bd Status register shows error: {:#x}", ret);
}
c.bytes += XAxiDma_BdGetLength(bd, txRing->MaxTransferLen);
@ -468,6 +468,25 @@ Dma::Completion Dma::writeCompleteScatterGather() {
return c;
}
size_t Dma::pollReadScatterGather(bool lock) {
if (lock) {
hwLock.lock();
}
auto rxRing = XAxiDma_GetRxRing(&xDma);
XAxiDma_Bd *CurBdPtr = rxRing->HwHead;
volatile uint32_t BdSts;
// Poll BD status to avoid accessing PCIe address space
do {
BdSts = XAxiDma_ReadReg((UINTPTR)CurBdPtr, XAXIDMA_BD_STS_OFFSET);
} while (!(BdSts & XAXIDMA_BD_STS_COMPLETE_MASK));
// At this point, we know that the transmission is complete, but we haven't accessed the
// PCIe address space, yet. We know that we have received at least one BD.
if (lock) {
hwLock.unlock();
}
return XAxiDma_BdGetActualLength(CurBdPtr, XAXIDMA_MCHAN_MAX_TRANSFER_LEN);
}
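A minimal sketch of how a caller could use the new public pollReadScatterGather() to react to incoming data before any descriptor bookkeeping is done (hypothetical usage, not part of this commit; dma is a pointer to a Dma instance and process() is a placeholder for the latency-critical work):

  if (dma->hasScatterGather()) {
    // Spin on the completed bit of the BD only, so no further accesses to the
    // PCIe address space happen until data has actually arrived.
    size_t bytes = dma->pollReadScatterGather(true);
    process(bytes);                // placeholder: handle the fresh data first
    auto c = dma->readComplete();  // BD ring management afterwards
  }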
Dma::Completion Dma::readCompleteScatterGather() {
Completion c;
XAxiDma_Bd *bd = nullptr, *curBd;
@ -479,16 +498,7 @@ Dma::Completion Dma::readCompleteScatterGather() {
uint32_t irqStatus = 0;
if (polling) {
hwLock.lock();
XAxiDma_Bd *CurBdPtr = rxRing->HwHead;
volatile uint32_t BdSts;
// Poll BD status to avoid accessing PCIe address space
do {
BdSts = XAxiDma_ReadReg((UINTPTR)CurBdPtr, XAXIDMA_BD_STS_OFFSET);
} while (!(BdSts & XAXIDMA_BD_STS_COMPLETE_MASK));
// At this point, we know that the transmission is complete, but we haven't accessed the
// PCIe address space, yet. The subsequent DMA Controller management can be done in a
// separate thread to keep latencies in this thread extremely low. We know that we have
// received one BD.
pollReadScatterGather(false);
intrs = 1;
} else {
intrs = irqs[s2mmInterrupt].irqController->waitForInterrupt(
@ -507,7 +517,6 @@ Dma::Completion Dma::readCompleteScatterGather() {
c.interrupts = 0;
return c;
} else {
hwLock.unlock();
c.interrupts = intrs;
}
if (!polling) {
@ -550,7 +559,7 @@ Dma::Completion Dma::readCompleteScatterGather() {
if ((ret & XAXIDMA_BD_STS_ALL_ERR_MASK) ||
(!(ret & XAXIDMA_BD_STS_COMPLETE_MASK))) {
hwLock.unlock();
throw RuntimeError("Bd Status register shows error: {}", ret);
throw RuntimeError("Read: Bd Status register shows error: {}", ret);
}
c.bytes += XAxiDma_BdGetActualLength(bd, rxRing->MaxTransferLen);

View file

@ -9,6 +9,7 @@
#pragma once
#include <thread>
#include <villas/format.hpp>
#include <villas/node.hpp>
#include <villas/node/config.hpp>
@ -31,17 +32,40 @@ protected:
std::string cardName;
std::list<std::string> connectStrings;
// This setting decouples DMA management from data processing.
// With this setting set to true, the DMA management for both read and
// write transactions is performed after the write command has been sent
// to the DMA controller.
// This allows us to achieve very low latencies for an application that
// waits for data from the FPGA, processes it, and finishes a time step
// by issuing a write to the FPGA.
bool lowLatencyMode;
// This setting performs the synchronization with the DMA controller in a
// separate thread. It requires lowLatencyMode to be set to true.
// This may improve latency, because DMA management is completely decoupled
// from the data path, or it may increase latency because of the additional
// thread synchronization overhead. Only use it after verifying that it
// improves latency.
bool asyncDmaManagement;
// State
std::shared_ptr<fpga::Card> card;
std::shared_ptr<villas::fpga::ip::Dma> dma;
std::shared_ptr<villas::MemoryBlock> blockRx[2];
std::shared_ptr<villas::MemoryBlock> blockRx;
std::shared_ptr<villas::MemoryBlock> blockTx;
// Non-public methods
virtual int asyncRead(Sample *smps[], unsigned cnt);
virtual int slowRead(Sample *smps[], unsigned cnt);
virtual int _read(Sample *smps[], unsigned cnt) override;
virtual int _write(Sample *smps[], unsigned cnt) override;
// only used if asyncDmaManagement is true
volatile std::atomic_bool readActive;
volatile std::atomic_bool writeActive;
volatile std::atomic_bool stopThreads;
std::shared_ptr<std::thread> dmaThread;
virtual int dmaMgmtThread();
public:
FpgaNode(const uuid_t &id = {}, const std::string &name = "");
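With lowLatencyMode, a single time step is intended to look roughly like the following (a sketch of the intended ordering only, not code from this diff; dma, blockRx and blockTx are the members declared above, len is a placeholder):

  auto c = dma->readComplete();      // 1. wait for the result from the FPGA
  // 2. process the received sample (the latency-critical part)
  dma->write(*blockTx, len);         // 3. send the new data to the FPGA
  // 4. only now follow dma->writeComplete() and dma->read(*blockRx, ...) to
  //    re-arm the next transfer; with asyncDmaManagement they run in
  //    dmaMgmtThread instead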
@ -55,6 +79,8 @@ public:
virtual int start() override;
virtual int stop() override;
virtual std::vector<int> getPollFDs() override;
virtual const std::string &getDetails() override;

View file

@ -7,6 +7,7 @@
#include <memory>
#include <string>
#include <unistd.h>
#include <vector>
#include <jansson.h>
@ -32,8 +33,9 @@ static std::list<std::shared_ptr<fpga::Card>> cards;
static std::shared_ptr<kernel::vfio::Container> vfioContainer;
FpgaNode::FpgaNode(const uuid_t &id, const std::string &name)
: Node(id, name), cardName(""), card(nullptr), dma(), blockRx(), blockTx() {
}
: Node(id, name), cardName(""), connectStrings(), card(nullptr), dma(),
blockRx(), blockTx(), readActive(false), writeActive(false),
stopThreads(false), dmaThread() {}
FpgaNode::~FpgaNode() {}
@ -75,14 +77,12 @@ int FpgaNode::prepare() {
auto &alloc = HostRam::getAllocator();
blockRx[0] = alloc.allocateBlock(0x200 * sizeof(float));
blockRx[1] = alloc.allocateBlock(0x200 * sizeof(float));
blockRx = alloc.allocateBlock(0x200 * sizeof(float));
blockTx = alloc.allocateBlock(0x200 * sizeof(float));
villas::MemoryAccessor<float> memRx[] = {*(blockRx[0]), *(blockRx[1])};
villas::MemoryAccessor<float> memRx = *blockRx;
villas::MemoryAccessor<float> memTx = *blockTx;
dma->makeAccesibleFromVA(blockRx[0]);
dma->makeAccesibleFromVA(blockRx[1]);
dma->makeAccesibleFromVA(blockRx);
dma->makeAccesibleFromVA(blockTx);
MemoryManager::get().printGraph();
@ -90,6 +90,16 @@ int FpgaNode::prepare() {
return Node::prepare();
}
int FpgaNode::stop() {
if (asyncDmaManagement) {
stopThreads = true;
if (dmaThread) {
dmaThread->join();
}
}
return Node::stop();
}
int FpgaNode::parse(json_t *json) {
int ret = Node::parse(json);
if (ret) {
@ -105,8 +115,9 @@ int FpgaNode::parse(json_t *json) {
vfioContainer = std::make_shared<kernel::vfio::Container>();
}
ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o}", "card", &jsonCard,
"connect", &jsonConnectStrings);
ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o, s?: b}", "card",
&jsonCard, "connect", &jsonConnectStrings,
"asyncDmaManagement", &asyncDmaManagement);
if (ret) {
throw ConfigError(json, err, "node-config-fpga",
"Failed to parse configuration of node {}",
@ -167,20 +178,49 @@ const std::string &FpgaNode::getDetails() {
int FpgaNode::check() { return 0; }
int FpgaNode::start() {
// enqueue first read
// dma->read(*(blockRx[0]), blockRx[0]->getSize());
if (asyncDmaManagement) {
dmaThread = std::make_shared<std::thread>(&FpgaNode::dmaMgmtThread, this);
}
dma->read(*blockRx, blockRx->getSize());
return Node::start();
}
int FpgaNode::dmaMgmtThread() {
while (readActive) {
usleep(1);
}
while (!stopThreads) {
// readActive must be true, writeActive must be false
dma->read(*blockRx, blockRx->getSize());
readActive = true;
while (readActive && !stopThreads) {
}
while (!writeActive && !stopThreads) {
}
// readActive must be false, writeActive must be true
dma->writeComplete();
writeActive = false;
}
return 0;
}
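In summary, the flag handshake between dmaMgmtThread(), _read() and _write() looks roughly like this per sample (simplified; in the code the waits run concurrently):

  dmaMgmtThread():  dma->read(...),          then readActive  = true
  _read():          waits for readActive,    then dma->readComplete(),  readActive  = false
  _write():         waits for !writeActive,  then dma->write(...),      writeActive = true
  dmaMgmtThread():  waits for writeActive,   then dma->writeComplete(), writeActive = false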
int FpgaNode::_read(Sample *smps[], unsigned cnt) {
static size_t cur = 0, next = 0;
unsigned read;
Sample *smp = smps[0];
assert(cnt == 1);
dma->read(*(blockRx[next]), blockRx[next]->getSize()); // TODO: calc size
if (asyncDmaManagement) {
while (!readActive.load(std::memory_order_relaxed) && !stopThreads)
;
} else {
// dma->read(*blockRx, blockRx->getSize());
}
auto c = dma->readComplete();
if (asyncDmaManagement) {
readActive.store(false, std::memory_order_relaxed);
}
read = c.bytes / sizeof(float);
@ -188,7 +228,7 @@ int FpgaNode::_read(Sample *smps[], unsigned cnt) {
logger->warn("Missed {} interrupts", c.interrupts - 1);
}
auto mem = MemoryAccessor<float>(*(blockRx[cur]));
auto mem = MemoryAccessor<float>(*blockRx);
smp->length = 0;
for (unsigned i = 0; i < MIN(read, smp->capacity); i++) {
@ -198,18 +238,20 @@ int FpgaNode::_read(Sample *smps[], unsigned cnt) {
smp->flags = (int)SampleFlags::HAS_DATA;
smp->signals = in.signals;
//cur = next;
//next = (next + 1) % (sizeof(blockRx) / sizeof(blockRx[0]));
return 1;
}
int FpgaNode::_write(Sample *smps[], unsigned cnt) {
unsigned int written;
// unsigned int written;
Sample *smp = smps[0];
assert(cnt == 1 && smps != nullptr && smps[0] != nullptr);
if (asyncDmaManagement) {
while (writeActive.load(std::memory_order_relaxed) && !stopThreads)
;
}
auto mem = MemoryAccessor<uint32_t>(*blockTx);
float scaled;
@ -224,15 +266,20 @@ int FpgaNode::_write(Sample *smps[], unsigned cnt) {
}
bool state = dma->write(*blockTx, smp->length * sizeof(float));
if (!state)
if (!state) {
return -1;
}
if (asyncDmaManagement) {
writeActive.store(true, std::memory_order_relaxed);
} else {
auto written = dma->writeComplete().bytes /
sizeof(float); // The number of samples written
written = dma->writeComplete().bytes /
sizeof(float); // The number of samples written
if (written != smp->length) {
logger->warn("Wrote {} samples, but {} were expected", written,
smp->length);
if (written != smp->length) {
logger->warn("Wrote {} samples, but {} were expected", written,
smp->length);
}
dma->read(*blockRx, blockRx->getSize());
}
return 1;