1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add check for missed interrupts when handling reads

introduce new struct Completion that is returned by Dma::readCompletion
and Dma::writeCompletion

Signed-off-by: Niklas Eiling <niklas.eiling@eonerc.rwth-aachen.de>
This commit is contained in:
Niklas Eiling 2023-01-11 15:08:55 +01:00
parent ab39f57405
commit 590cef10d0
5 changed files with 74 additions and 62 deletions

View file

@ -38,12 +38,19 @@ public:
// Stream to memory-mapped (S2MM)
bool read(const MemoryBlock &mem, size_t len);
size_t writeComplete()
struct Completion {
Completion() : bytes(0), bds(0), interrupts(0) { }
size_t bytes; // Number of bytes transferred
size_t bds; // Number of buffer descriptors used (only for scatter-gather)
size_t interrupts; // Number of interrupts received since last call (only if interrupts enabled)
};
Completion writeComplete()
{
return hasScatterGather() ? writeCompleteScatterGather() : writeCompleteSimple();
}
size_t readComplete()
Completion readComplete()
{
return hasScatterGather() ? readCompleteScatterGather() : readCompleteSimple();
}
@ -69,22 +76,6 @@ public:
return getMasterPort(mm2sPort);
}
private:
bool writeScatterGather(const void* buf, size_t len);
bool readScatterGather(void* buf, size_t len);
size_t writeCompleteScatterGather();
size_t readCompleteScatterGather();
bool writeSimple(const void* buf, size_t len);
bool readSimple(void* buf, size_t len);
size_t writeCompleteSimple();
size_t readCompleteSimple();
void setupScatterGather();
void setupScatterGatherRingRx();
void setupScatterGatherRingTx();
public:
static constexpr const char* s2mmPort = "S2MM";
static constexpr const char* mm2sPort = "MM2S";
@ -92,8 +83,21 @@ public:
virtual
void dump() override;
private:
bool writeScatterGather(const void* buf, size_t len);
bool readScatterGather(void* buf, size_t len);
Completion writeCompleteScatterGather();
Completion readCompleteScatterGather();
bool writeSimple(const void* buf, size_t len);
bool readSimple(void* buf, size_t len);
Completion writeCompleteSimple();
Completion readCompleteSimple();
void setupScatterGather();
void setupScatterGatherRingRx();
void setupScatterGatherRingTx();
static constexpr char registerMemory[] = "Reg";
static constexpr char mm2sInterrupt[] = "mm2s_introut";

View file

@ -107,7 +107,7 @@ public:
if (std::snprintf(nextBufPos(), formatStringSize+1, formatString, sampleCnt, value) > (int)formatStringSize) {
throw RuntimeError("Output buffer too small");
}
sampleCnt = (sampleCnt+1)%100000;
sampleCnt = (sampleCnt+1) % 100000;
}
protected:
@ -116,7 +116,10 @@ protected:
size_t sampleCnt;
};
std::unique_ptr<BufferedSampleFormatter> getBufferedSampleFormatter(const std::string &format, size_t bufSizeInSamples)
std::unique_ptr<BufferedSampleFormatter> getBufferedSampleFormatter(
const std::string &format,
size_t bufSizeInSamples)
{
if (format == "long") {
return std::make_unique<BufferedSampleFormatterLong>(bufSizeInSamples);

View file

@ -210,10 +210,10 @@ bool Dma::memcpy(const MemoryBlock &src, const MemoryBlock &dst, size_t len)
if (this->write(src, len) == 0)
return false;
if (not this->writeComplete())
if (not this->writeComplete().bds)
return false;
if (not this->readComplete())
if (not this->readComplete().bds)
return false;
return true;
@ -344,20 +344,18 @@ bool Dma::readScatterGather(void *buf, size_t len)
return true;
}
size_t
Dma::writeCompleteScatterGather()
Dma::Completion Dma::writeCompleteScatterGather()
{
Completion c;
XAxiDma_Bd *bd = nullptr, *curBd;
size_t processedBds = 0;
auto txRing = XAxiDma_GetTxRing(&xDma);
int ret = XST_FAILURE;
size_t bytesWritten = 0;
if ((processedBds = XAxiDma_BdRingFromHw(txRing, 1, &bd)) == 0)
if ((c.bds = XAxiDma_BdRingFromHw(txRing, 1, &bd)) == 0)
{
/*auto intrNum = */irqs[mm2sInterrupt].irqController->waitForInterrupt(irqs[mm2sInterrupt].num);
c.interrupts = irqs[mm2sInterrupt].irqController->waitForInterrupt(irqs[mm2sInterrupt].num);
//logger->info("Got {} interrupts (id: {}) from write channel", intrNum, irqs[mm2sInterrupt].num);
processedBds = XAxiDma_BdRingFromHw(txRing, 1, &bd);
c.bds = XAxiDma_BdRingFromHw(txRing, 1, &bd);
}
// Acknowledge the interrupt
@ -368,41 +366,38 @@ Dma::writeCompleteScatterGather()
throw RuntimeError("Bd was null.");
curBd = bd;
for (size_t i = 0; i < processedBds; i++) {
for (size_t i = 0; i < c.bds; i++) {
ret = XAxiDma_BdGetSts(curBd);
if ((ret & XAXIDMA_BD_STS_ALL_ERR_MASK) || (!(ret & XAXIDMA_BD_STS_COMPLETE_MASK))) {
throw RuntimeError("Bd Status register shows error: {}", ret);
break;
}
bytesWritten += XAxiDma_BdGetLength(bd, txRing->MaxTransferLen);
c.bytes += XAxiDma_BdGetLength(bd, txRing->MaxTransferLen);
curBd = (XAxiDma_Bd *) XAxiDma_BdRingNext(txRing, curBd);
}
ret = XAxiDma_BdRingFree(txRing, processedBds, bd);
ret = XAxiDma_BdRingFree(txRing, c.bds, bd);
if (ret != XST_SUCCESS)
throw RuntimeError("Failed to free {} TX BDs {}", processedBds, ret);
throw RuntimeError("Failed to free {} TX BDs {}", c.bds, ret);
return bytesWritten;
return c;
}
size_t
Dma::readCompleteScatterGather()
Dma::Completion Dma::readCompleteScatterGather()
{
Completion c;
XAxiDma_Bd *bd = nullptr, *curBd;
size_t processedBds = 0;
auto rxRing = XAxiDma_GetRxRing(&xDma);
int ret = XST_FAILURE;
size_t bytesRead = 0;
static size_t errcnt = 32;
//auto intrNum =
irqs[s2mmInterrupt].irqController->waitForInterrupt(irqs[s2mmInterrupt].num);
c.interrupts = irqs[s2mmInterrupt].irqController->waitForInterrupt(irqs[s2mmInterrupt].num);
// Wait until the data has been received by the RX channel.
if ((processedBds = XAxiDma_BdRingFromHw(rxRing, readCoalesce, &bd)) < readCoalesce)
if ((c.bds = XAxiDma_BdRingFromHw(rxRing, readCoalesce, &bd)) < readCoalesce)
{
logger->warn("Got partial batch of {}/{} BDs.", processedBds, readCoalesce);
logger->warn("Got partial batch of {}/{} BDs.", c.bds, readCoalesce);
if(errcnt-- == 0) {
throw RuntimeError("too many partial batches");
}
@ -412,30 +407,32 @@ Dma::readCompleteScatterGather()
auto irqStatus = XAxiDma_BdRingGetIrq(rxRing);
XAxiDma_BdRingAckIrq(rxRing, irqStatus);
if (processedBds == 0)
return 0;
if (c.bds == 0) {
c.bytes = 0;
return c;
}
if (bd == nullptr)
throw RuntimeError("Bd was null.");
curBd = bd;
for (size_t i = 0; i < processedBds; i++) {
for (size_t i = 0; i < c.bds; i++) {
ret = XAxiDma_BdGetSts(curBd);
if ((ret & XAXIDMA_BD_STS_ALL_ERR_MASK) || (!(ret & XAXIDMA_BD_STS_COMPLETE_MASK))) {
throw RuntimeError("Bd Status register shows error: {}", ret);
break;
}
bytesRead += XAxiDma_BdGetActualLength(bd, rxRing->MaxTransferLen);
c.bytes += XAxiDma_BdGetActualLength(bd, rxRing->MaxTransferLen);
curBd = (XAxiDma_Bd *) XAxiDma_BdRingNext(rxRing, curBd);
}
// Free all processed RX BDs for future transmission.
ret = XAxiDma_BdRingFree(rxRing, processedBds, bd);
ret = XAxiDma_BdRingFree(rxRing, c.bds, bd);
if (ret != XST_SUCCESS)
throw RuntimeError("Failed to free {} TX BDs {}.", processedBds, ret);
throw RuntimeError("Failed to free {} TX BDs {}.", c.bds, ret);
return bytesRead;
return c;
}
bool Dma::writeSimple(const void *buf, size_t len)
@ -518,32 +515,34 @@ bool Dma::readSimple(void *buf, size_t len)
return true;
}
size_t
Dma::writeCompleteSimple()
Dma::Completion Dma::writeCompleteSimple()
{
Completion c;
while (!(XAxiDma_IntrGetIrq(&xDma, XAXIDMA_DMA_TO_DEVICE) & XAXIDMA_IRQ_IOC_MASK))
irqs[mm2sInterrupt].irqController->waitForInterrupt(irqs[mm2sInterrupt]);
c.interrupts = irqs[mm2sInterrupt].irqController->waitForInterrupt(irqs[mm2sInterrupt]);
XAxiDma_IntrAckIrq(&xDma, XAXIDMA_IRQ_IOC_MASK, XAXIDMA_DMA_TO_DEVICE);
const XAxiDma_BdRing *ring = XAxiDma_GetTxRing(&xDma);
const size_t bytesWritten = XAxiDma_ReadReg(ring->ChanBase, XAXIDMA_BUFFLEN_OFFSET);
return bytesWritten;
c.bytes = bytesWritten;
return c;
}
size_t
Dma::readCompleteSimple()
Dma::Completion Dma::readCompleteSimple()
{
Completion c;
while (!(XAxiDma_IntrGetIrq(&xDma, XAXIDMA_DEVICE_TO_DMA) & XAXIDMA_IRQ_IOC_MASK))
irqs[s2mmInterrupt].irqController->waitForInterrupt(irqs[s2mmInterrupt]);
c.interrupts = irqs[s2mmInterrupt].irqController->waitForInterrupt(irqs[s2mmInterrupt]);
XAxiDma_IntrAckIrq(&xDma, XAXIDMA_IRQ_IOC_MASK, XAXIDMA_DEVICE_TO_DMA);
const XAxiDma_BdRing *ring = XAxiDma_GetRxRing(&xDma);
const size_t bytesRead = XAxiDma_ReadReg(ring->ChanBase, XAXIDMA_BUFFLEN_OFFSET);
return bytesRead;
c.bytes = bytesRead;
return c;
}
void Dma::makeAccesibleFromVA(std::shared_ptr<MemoryBlock> mem)

View file

@ -54,7 +54,6 @@ void readFromDmaToStdOut(std::shared_ptr<villas::fpga::ip::Dma> dma,
size_t cur = 0, next = 1;
std::ios::sync_with_stdio(false);
size_t bytesRead;
// Setup read transfer
dma->read(*block[0], block[0]->getSize());
@ -63,9 +62,16 @@ void readFromDmaToStdOut(std::shared_ptr<villas::fpga::ip::Dma> dma,
logger->trace("Read from stream and write to address {}:{:p}", block[next]->getAddrSpaceId(), block[next]->getOffset());
// We could use the number of interrupts to determine if we missed a chunk of data
dma->read(*block[next], block[next]->getSize());
bytesRead = dma->readComplete();
auto c = dma->readComplete();
for (size_t i = 0; i*4 < bytesRead; i++) {
if (c.interrupts > 1) {
logger->warn("Missed {} interrupts", c.interrupts - 1);
}
logger->trace("bytes: {}, intrs: {}, bds: {}",
c.bytes, c.interrupts, c.bds);
for (size_t i = 0; i*4 < c.bytes; i++) {
int32_t ival = mem[cur][i];
float fval = *((float*)(&ival)); // cppcheck-suppress invalidPointerCast
formatter->format(fval);

View file

@ -81,7 +81,7 @@ Test(fpga, rtds, .description = "RTDS")
"Failed to initiate DMA read");
// logger->info("Wait read");
const size_t bytesRead = dma->readComplete();
const size_t bytesRead = dma->readComplete().bytes;
cr_assert(bytesRead > 0,
"Failed to complete DMA read");