1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

added storage for previous message

(this will may be extended to a variable number of past messages)
This commit is contained in:
Steffen Vogel 2015-03-21 18:04:52 +01:00
parent 9cfdf4c874
commit 8587184bf6

View file

@ -56,13 +56,10 @@ static void * path_send(void *arg)
while (1) {
sigwait(&set, &sig); /* blocking wait for next timer tick */
if (p->received) {
FOREACH(&p->destinations, it) {
node_write(it->node, p->last);
}
p->sent++;
}
FOREACH(&p->destinations, it)
node_write(it->node, p->current);
p->sent++;
}
return NULL;
@ -71,41 +68,48 @@ static void * path_send(void *arg)
/** Receive messages */
static void * path_run(void *arg)
{
char buf[33];
struct path *p = arg;
struct msg *m = alloc(sizeof(struct msg));
p->previous = alloc(sizeof(struct msg));
p->current = alloc(sizeof(struct msg));
char buf[33];
/* Open deferred TCP connection */
node_start_defer(p->in);
// FIXME: node_start_defer(p->out);
/* Main thread loop */
while (1) {
node_read(p->in, m); /* Receive message */
node_read(p->in, p->current); /* Receive message */
p->received++;
/* Check header fields */
if (m->version != MSG_VERSION ||
m->type != MSG_TYPE_DATA) {
if (p->current->version != MSG_VERSION ||
p->current->type != MSG_TYPE_DATA) {
p->invalid++;
continue;
}
/* Update histogram */
int dist = (UINT16_MAX + m->sequence - p->sequence) % UINT16_MAX;
int dist = (UINT16_MAX + p->current->sequence - p->previous->sequence) % UINT16_MAX;
if (dist > UINT16_MAX / 2)
dist -= UINT16_MAX;
hist_put(&p->histogram, dist);
/* Handle simulation restart */
if (m->sequence == 0 && abs(dist) >= 1) {
path_print(p, buf, sizeof(buf));
path_stats(p);
warn("Simulation for path %s restarted (p->seq=%u, m->seq=%u, dist=%d)",
buf, p->sequence, m->sequence, dist);
if (p->current->sequence == 0 && abs(dist) >= 1) {
if (p->received) {
path_print_stats(p);
hist_print(&p->histogram);
}
path_print(p, buf, sizeof(buf));
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u, dist=%d)",
buf, p->previous->sequence, p->current->sequence, dist);
/* Reset counters */
p->sent = 0;
@ -114,7 +118,6 @@ static void * path_run(void *arg)
p->skipped = 0;
p->dropped = 0;
hist_print(&p->histogram);
hist_reset(&p->histogram);
}
else if (dist <= 0 && p->received > 1) {
@ -124,27 +127,22 @@ static void * path_run(void *arg)
/* Call hook callbacks */
FOREACH(&p->hooks, it) {
if (it->hook(m, p)) {
if (it->hook(p->current, p)) {
p->skipped++;
continue;
}
}
/* Update last known sequence number */
p->sequence = m->sequence;
p->last = m;
/* At fixed rate mode, messages are send by another thread */
if (!p->rate) {
FOREACH(&p->destinations, it) {
node_write(it->node, m);
}
FOREACH(&p->destinations, it)
node_write(it->node, p->current);
p->sent++;
}
}
free(m);
SWAP(p->previous, p->current);
}
return NULL;
}
@ -180,10 +178,8 @@ int path_stop(struct path *p)
timer_delete(p->timer);
}
if (p->sent || p->received) {
path_print_stats(p);
if (p->received)
hist_print(&p->histogram);
}
return 0;
}