1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge pull request #680 from VILLASframework/fix-leaks

Fix memory and synchronization issues
This commit is contained in:
Steffen Vogel 2023-06-23 19:16:13 +02:00 committed by GitHub
commit da0a73c39b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 11 deletions

View file

@ -44,7 +44,7 @@ PathSource::~PathSource()
int PathSource::read(int i)
{
int ret, recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
int ret, recv, tomux, allocated, cnt, toenqueue, enqueued = 0, muxed_initialized = 0;
cnt = node->in.vectorize;
@ -61,19 +61,19 @@ int PathSource::read(int i)
recv = node->read(read_smps, allocated);
if (recv == 0) {
enqueued = 0;
goto out2;
goto read_decref_read_smps;
}
else if (recv < 0) {
if (node->getState() == State::STOPPING) {
path->state = State::STOPPING;
enqueued = -1;
goto out2;
goto read_decref_read_smps;
}
else {
path->logger->error("Failed to read samples from node {}", node->getName());
enqueued = 0;
goto out2;
goto read_decref_read_smps;
}
}
else if (recv < allocated)
@ -97,7 +97,9 @@ int PathSource::read(int i)
: sample_clone(muxed_smps[i-1]);
if (!muxed_smps[i]) {
path->logger->error("Pool underrun in path {}", path->toString());
return -1;
muxed_initialized = i == 0 ? 0 : i-1;
enqueued = -1;
goto read_decref_muxed_smps;
}
if (path->original_sequence_no) {
@ -119,12 +121,16 @@ int PathSource::read(int i)
muxed_smps[i]->flags |= tomux_smps[i]->flags & (int) SampleFlags::HAS_TS;
ret = mappings.remap(muxed_smps[i], tomux_smps[i]);
if (ret)
return ret;
if (ret < 0) {
enqueued = ret;
muxed_initialized = i;
goto read_decref_muxed_smps;
}
if (muxed_smps[i]->length > 0)
muxed_smps[i]->flags |= (int) SampleFlags::HAS_DATA;
}
muxed_initialized = tomux;
sample_copy(path->last_sample, muxed_smps[tomux-1]);
@ -169,8 +175,10 @@ int PathSource::read(int i)
else
enqueued = 0;
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, recv);
read_decref_muxed_smps:
sample_decref_many(muxed_smps, muxed_initialized);
read_decref_read_smps:
sample_decref_many(read_smps, allocated);
return enqueued;
}

View file

@ -117,7 +117,7 @@ int villas::node::queue_signalled_push(struct CQueueSignalled *qs, void *ptr)
int pushed;
pushed = queue_push(&qs->queue, ptr);
if (pushed < 0)
if (pushed <= 0)
return pushed;
if (qs->mode == QueueSignalledMode::PTHREAD) {
@ -148,7 +148,7 @@ int villas::node::queue_signalled_push_many(struct CQueueSignalled *qs, void *pt
int pushed;
pushed = queue_push_many(&qs->queue, ptr, cnt);
if (pushed < 0)
if (pushed <= 0)
return pushed;
if (qs->mode == QueueSignalledMode::PTHREAD) {