mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
fpga: add lowLatencyMode setting
This setting improves latency by remove various checks. Use with caution! Requires read cache in FPGA design! The common use case in VILLASfpga is that we have exactly one write for every read and the number of exchanged signals do not change. If this is the case, we can reuse the buffer descriptors during reads and write, thus avoidng freeing, reallocating and setting them up. We set up the descriptors in start, and in write or read, we only reset the complete bit in the buffer descriptor and write to the tdesc register to start the DMA transfer. Improves read/write latency by approx. 40%. Signed-off-by: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
This commit is contained in:
parent
d03b19613f
commit
87a1628c4a
2 changed files with 107 additions and 78 deletions
|
@ -32,20 +32,18 @@ protected:
|
|||
std::string cardName;
|
||||
std::list<std::string> connectStrings;
|
||||
|
||||
// This setting decouples DMA management from Data processing.
|
||||
// With this setting set to true, the DMA management for both read and
|
||||
// write transactions is performed after the write command has been send
|
||||
// the DMA controller.
|
||||
// This allows us to achieve very low latencies for an application that
|
||||
// waits for data from the FPGA processes it, and finished a time step
|
||||
// by issuing a write to the FPGA.
|
||||
// This setting improves latency by remove various checks.
|
||||
// Use with caution! Requires read cache in FPGA design!
|
||||
// The common use case in VILLASfpga is that we have exactly
|
||||
// one write for every read and the number of exchanged signals
|
||||
// do not change. If this is the case, we can reuse the buffer
|
||||
// descriptors during reads and write, thus avoidng freeing,
|
||||
// reallocating and setting them up.
|
||||
// We set up the descriptors in start, and in write or read,
|
||||
// we only reset the complete bit in the buffer descriptor and
|
||||
// write to the tdesc register to start the DMA transfer.
|
||||
// Improves read/write latency by approx. 40%.
|
||||
bool lowLatencyMode;
|
||||
// This setting performs synchronization with DMA controller in separate
|
||||
// threads. It requires lowLatencyMode to be set to true.
|
||||
// This may improve latency, because DMA management is completely decoupled
|
||||
// from the data path, or may increase latency because of additional thread
|
||||
// synchronization overhead. Only use after verifying that it improves latency.
|
||||
bool asyncDmaManagement;
|
||||
|
||||
// State
|
||||
std::shared_ptr<fpga::Card> card;
|
||||
|
@ -54,18 +52,13 @@ protected:
|
|||
std::shared_ptr<villas::MemoryBlock> blockTx;
|
||||
|
||||
// Non-public methods
|
||||
virtual int asyncRead(Sample *smps[], unsigned cnt);
|
||||
virtual int fastRead(Sample *smps[], unsigned cnt);
|
||||
virtual int slowRead(Sample *smps[], unsigned cnt);
|
||||
virtual int _read(Sample *smps[], unsigned cnt) override;
|
||||
virtual int fastWrite(Sample *smps[], unsigned cnt);
|
||||
virtual int slowWrite(Sample *smps[], unsigned cnt);
|
||||
virtual int _write(Sample *smps[], unsigned cnt) override;
|
||||
|
||||
// only used if asyncDmaManagement is true
|
||||
volatile std::atomic_bool readActive;
|
||||
volatile std::atomic_bool writeActive;
|
||||
volatile std::atomic_bool stopThreads;
|
||||
std::shared_ptr<std::thread> dmaThread;
|
||||
virtual int dmaMgmtThread();
|
||||
|
||||
public:
|
||||
FpgaNode(const uuid_t &id = {}, const std::string &name = "");
|
||||
|
||||
|
|
|
@ -33,9 +33,8 @@ static std::list<std::shared_ptr<fpga::Card>> cards;
|
|||
static std::shared_ptr<kernel::vfio::Container> vfioContainer;
|
||||
|
||||
FpgaNode::FpgaNode(const uuid_t &id, const std::string &name)
|
||||
: Node(id, name), cardName(""), connectStrings(), card(nullptr), dma(),
|
||||
blockRx(), blockTx(), readActive(false), writeActive(false),
|
||||
stopThreads(false), dmaThread() {}
|
||||
: Node(id, name), cardName(""), connectStrings(), lowLatencyMode(false),
|
||||
card(nullptr), dma(), blockRx(), blockTx() {}
|
||||
|
||||
FpgaNode::~FpgaNode() {}
|
||||
|
||||
|
@ -90,15 +89,7 @@ int FpgaNode::prepare() {
|
|||
return Node::prepare();
|
||||
}
|
||||
|
||||
int FpgaNode::stop() {
|
||||
if (asyncDmaManagement) {
|
||||
stopThreads = true;
|
||||
if (dmaThread) {
|
||||
dmaThread->join();
|
||||
}
|
||||
}
|
||||
return Node::stop();
|
||||
}
|
||||
int FpgaNode::stop() { return Node::stop(); }
|
||||
|
||||
int FpgaNode::parse(json_t *json) {
|
||||
int ret = Node::parse(json);
|
||||
|
@ -115,9 +106,9 @@ int FpgaNode::parse(json_t *json) {
|
|||
vfioContainer = std::make_shared<kernel::vfio::Container>();
|
||||
}
|
||||
|
||||
ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o, s?: b}", "card",
|
||||
ret = json_unpack_ex(json, &err, 0, "{ s: o, s?: o, s?: b, s?: b}", "card",
|
||||
&jsonCard, "connect", &jsonConnectStrings,
|
||||
"asyncDmaManagement", &asyncDmaManagement);
|
||||
"lowLatencyMode", &lowLatencyMode);
|
||||
if (ret) {
|
||||
throw ConfigError(json, err, "node-config-fpga",
|
||||
"Failed to parse configuration of node {}",
|
||||
|
@ -169,7 +160,8 @@ const std::string &FpgaNode::getDetails() {
|
|||
std::copy(connectStrings.begin(), connectStrings.end(),
|
||||
std::ostream_iterator<std::string>(imploded, delim));
|
||||
|
||||
details = fmt::format("fpga={}, connect={}", name, imploded.str());
|
||||
details = fmt::format("fpga={}, connect={}, lowLatencyMode={}", name,
|
||||
imploded.str(), lowLatencyMode);
|
||||
}
|
||||
|
||||
return details;
|
||||
|
@ -178,49 +170,94 @@ const std::string &FpgaNode::getDetails() {
|
|||
int FpgaNode::check() { return 0; }
|
||||
|
||||
int FpgaNode::start() {
|
||||
if (asyncDmaManagement) {
|
||||
dmaThread = std::make_shared<std::thread>(&FpgaNode::dmaMgmtThread, this);
|
||||
if (getInputSignalsMaxCount() * sizeof(float) > blockRx->getSize()) {
|
||||
logger->error("Input signals exceed block size.");
|
||||
throw villas ::RuntimeError("Input signals exceed block size.");
|
||||
}
|
||||
dma->read(*blockRx, blockRx->getSize());
|
||||
if (lowLatencyMode) {
|
||||
dma->readScatterGatherPrepare(*blockRx, blockRx->getSize());
|
||||
if (getInputSignalsMaxCount() != 0) {
|
||||
dma->writeScatterGatherPrepare(*blockTx,
|
||||
getInputSignalsMaxCount() * sizeof(float));
|
||||
} else {
|
||||
logger->warn("No input signals defined. Not preparing write buffer - "
|
||||
"writes will not work.");
|
||||
}
|
||||
}
|
||||
|
||||
return Node::start();
|
||||
}
|
||||
|
||||
int FpgaNode::dmaMgmtThread() {
|
||||
while (readActive) {
|
||||
usleep(1);
|
||||
}
|
||||
while (!stopThreads) {
|
||||
// readActive must be true, writeActive must be false
|
||||
dma->read(*blockRx, blockRx->getSize());
|
||||
readActive = true;
|
||||
while (readActive && !stopThreads) {
|
||||
}
|
||||
while (!writeActive && !stopThreads) {
|
||||
}
|
||||
// readActive must be false, writeActive must be true
|
||||
dma->writeComplete();
|
||||
writeActive = false;
|
||||
int FpgaNode::fastWrite(Sample *smps[], unsigned cnt) {
|
||||
Sample *smp = smps[0];
|
||||
|
||||
assert(cnt == 1 && smps != nullptr && smps[0] != nullptr);
|
||||
|
||||
auto mem = MemoryAccessor<uint32_t>(*blockTx);
|
||||
float scaled;
|
||||
|
||||
for (unsigned i = 0; i < smp->length; i++) {
|
||||
if (smp->signals->getByIndex(i)->type == SignalType::FLOAT) {
|
||||
scaled = smp->data[i].f;
|
||||
if (scaled > 10.) {
|
||||
scaled = 10.;
|
||||
} else if (scaled < -10.) {
|
||||
scaled = -10.;
|
||||
}
|
||||
mem[i] = (scaled + 10.) * ((float)0xFFFF / 20.);
|
||||
} else {
|
||||
mem[i] = smp->data[i].i;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
dma->writeScatterGatherFast();
|
||||
auto written = dma->writeScatterGatherPoll() /
|
||||
sizeof(float); // The number of samples written
|
||||
|
||||
if (written != smp->length) {
|
||||
logger->warn("Wrote {} samples, but {} were expected", written,
|
||||
smp->length);
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int FpgaNode::fastRead(Sample *smps[], unsigned cnt) {
|
||||
Sample *smp = smps[0];
|
||||
auto mem = MemoryAccessor<float>(*blockRx);
|
||||
|
||||
smp->flags = (int)SampleFlags::HAS_DATA;
|
||||
smp->signals = in.signals;
|
||||
|
||||
dma->readScatterGatherFast();
|
||||
auto read = dma->readScatterGatherPoll(true);
|
||||
// We assume a lot without checking at this point. All for the latency!
|
||||
|
||||
smp->length = 0;
|
||||
for (unsigned i = 0; i < MIN(read / sizeof(float), smp->capacity); i++) {
|
||||
smp->data[i].f = static_cast<double>(mem[i]);
|
||||
smp->length++;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int FpgaNode::_read(Sample *smps[], unsigned cnt) {
|
||||
if (lowLatencyMode) {
|
||||
return fastRead(smps, cnt);
|
||||
} else {
|
||||
return slowRead(smps, cnt);
|
||||
}
|
||||
}
|
||||
|
||||
int FpgaNode::slowRead(Sample *smps[], unsigned cnt) {
|
||||
unsigned read;
|
||||
Sample *smp = smps[0];
|
||||
|
||||
assert(cnt == 1);
|
||||
|
||||
if (asyncDmaManagement) {
|
||||
while (!readActive.load(std::memory_order_relaxed) && !stopThreads)
|
||||
;
|
||||
} else {
|
||||
// dma->read(*blockRx, blockRx->getSize());
|
||||
}
|
||||
dma->read(*blockRx, blockRx->getSize());
|
||||
auto c = dma->readComplete();
|
||||
if (asyncDmaManagement) {
|
||||
readActive.store(false, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
read = c.bytes / sizeof(float);
|
||||
|
||||
|
@ -243,15 +280,19 @@ int FpgaNode::_read(Sample *smps[], unsigned cnt) {
|
|||
}
|
||||
|
||||
int FpgaNode::_write(Sample *smps[], unsigned cnt) {
|
||||
if (lowLatencyMode) {
|
||||
return fastWrite(smps, cnt);
|
||||
} else {
|
||||
return slowWrite(smps, cnt);
|
||||
}
|
||||
}
|
||||
|
||||
int FpgaNode::slowWrite(Sample *smps[], unsigned cnt) {
|
||||
// unsigned int written;
|
||||
Sample *smp = smps[0];
|
||||
|
||||
assert(cnt == 1 && smps != nullptr && smps[0] != nullptr);
|
||||
|
||||
if (asyncDmaManagement) {
|
||||
while (writeActive.load(std::memory_order_relaxed) && !stopThreads)
|
||||
;
|
||||
}
|
||||
auto mem = MemoryAccessor<uint32_t>(*blockTx);
|
||||
float scaled;
|
||||
|
||||
|
@ -269,17 +310,12 @@ int FpgaNode::_write(Sample *smps[], unsigned cnt) {
|
|||
if (!state) {
|
||||
return -1;
|
||||
}
|
||||
if (asyncDmaManagement) {
|
||||
writeActive.store(true, std::memory_order_relaxed);
|
||||
} else {
|
||||
auto written = dma->writeComplete().bytes /
|
||||
sizeof(float); // The number of samples written
|
||||
auto written = dma->writeComplete().bytes /
|
||||
sizeof(float); // The number of samples written
|
||||
|
||||
if (written != smp->length) {
|
||||
logger->warn("Wrote {} samples, but {} were expected", written,
|
||||
smp->length);
|
||||
}
|
||||
dma->read(*blockRx, blockRx->getSize());
|
||||
if (written != smp->length) {
|
||||
logger->warn("Wrote {} samples, but {} were expected", written,
|
||||
smp->length);
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
|
Loading…
Add table
Reference in a new issue