mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Changed format of msg_{fscan,print} functions
Added flags and options to the mentioned functions
This commit is contained in:
parent
84425199c9
commit
7682ffb080
7 changed files with 149 additions and 50 deletions
|
@ -16,6 +16,16 @@
|
|||
|
||||
struct node;
|
||||
|
||||
/** These flags define the format which is used by msg_fscan() and msg_fprint(). */
|
||||
enum msg_flags {
|
||||
MSG_PRINT_NANOSECONDS = 1,
|
||||
MSG_PRINT_OFFSET = 2,
|
||||
MSG_PRINT_SEQUENCE = 4,
|
||||
MSG_PRINT_VALUES = 8,
|
||||
|
||||
MSG_PRINT_ALL = 0xFF
|
||||
};
|
||||
|
||||
/** Swap a message to host byte order.
|
||||
*
|
||||
* Message can either be transmitted in little or big endian
|
||||
|
@ -41,21 +51,25 @@ int msg_verify(struct msg *m);
|
|||
|
||||
/** Print a raw message in human readable form to a file stream.
|
||||
*
|
||||
* @param f The file stream
|
||||
* @param m A pointer to the message
|
||||
* @param f The file handle from fopen() or stdout, stderr.
|
||||
* @param m A pointer to the message.
|
||||
* @param flags See msg_flags.
|
||||
* @param offset A optional offset in seconds. Only used if flags contains MSG_PRINT_OFFSET.
|
||||
* @retval 0 Success. Everything went well.
|
||||
* @retval <0 Error. Something went wrong.
|
||||
*/
|
||||
int msg_fprint(FILE *f, struct msg *m);
|
||||
int msg_fprint(FILE *f, struct msg *m, int flags, double offset);
|
||||
|
||||
/** Read a message from a file stream.
|
||||
*
|
||||
* @param f The file stream
|
||||
* @param m A pointer to the message
|
||||
* @param f The file handle from fopen() or stdin.
|
||||
* @param m A pointer to the message.
|
||||
* @param flags et MSG_PRINT_* flags for each component found in sample if not NULL. See msg_flags.
|
||||
* @param offset Write offset to this pointer if not NULL.
|
||||
* @retval 0 Success. Everything went well.
|
||||
* @retval <0 Error. Something went wrong.
|
||||
*/
|
||||
int msg_fscan(FILE *f, struct msg *m);
|
||||
int msg_fscan(FILE *f, struct msg *m, int *flags, double *offset);
|
||||
|
||||
/** Change the values of an existing message in a random fashion.
|
||||
*
|
||||
|
|
|
@ -185,13 +185,13 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
|
|||
serror("Failed to wait for timer");
|
||||
}
|
||||
|
||||
msg_fscan(f->in, cur);
|
||||
msg_fscan(f->in, cur, NULL, NULL);
|
||||
}
|
||||
else {
|
||||
struct timespec until;
|
||||
|
||||
/* Get message and timestamp */
|
||||
msg_fscan(f->in, cur);
|
||||
msg_fscan(f->in, cur, NULL, NULL);
|
||||
|
||||
/* Wait for next message / sampe */
|
||||
until = time_add(&MSG_TS(cur), &f->offset);
|
||||
|
@ -215,8 +215,10 @@ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cn
|
|||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
|
||||
for (i = 0; i < cnt; i++)
|
||||
msg_fprint(f->out, &pool[(first+i) % poolsize]);
|
||||
for (i = 0; i < cnt; i++) {
|
||||
struct msg *m = &pool[(first+i) % poolsize];
|
||||
msg_fprint(f->out, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
|
||||
}
|
||||
}
|
||||
else
|
||||
error("Can not write to node '%s", n->name);
|
||||
|
|
|
@ -59,8 +59,7 @@ int hook_print(struct path *p)
|
|||
struct msg *m = p->current;
|
||||
struct timespec ts = MSG_TS(m);
|
||||
|
||||
fprintf(stdout, "%.3e+", time_delta(&ts, &p->ts_recv)); /* Print delay */
|
||||
msg_fprint(stdout, m);
|
||||
msg_fprint(stdout, m, MSG_PRINT_ALL, time_delta(&ts, &p->ts_recv));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
120
server/src/msg.c
120
server/src/msg.c
|
@ -44,15 +44,23 @@ int msg_verify(struct msg *m)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int msg_fprint(FILE *f, struct msg *m)
|
||||
int msg_fprint(FILE *f, struct msg *m, int flags, double offset)
|
||||
{
|
||||
if (m->endian != MSG_ENDIAN_HOST)
|
||||
msg_swap(m);
|
||||
fprintf(f, "%u", m->ts.sec);
|
||||
|
||||
if (flags & MSG_PRINT_NANOSECONDS)
|
||||
fprintf(f, ".%09u", m->ts.nsec);
|
||||
|
||||
if (flags & MSG_PRINT_OFFSET)
|
||||
fprintf(f, "%+g", offset);
|
||||
|
||||
if (flags & MSG_PRINT_SEQUENCE)
|
||||
fprintf(f, "(%u)", m->sequence);
|
||||
|
||||
fprintf(f, "%10u.%09u\t%hu", m->ts.sec, m->ts.nsec, m->sequence);
|
||||
|
||||
for (int i = 0; i < m->length; i++)
|
||||
fprintf(f, "\t%.6f", m->data[i].f);
|
||||
if (flags & MSG_PRINT_VALUES) {
|
||||
for (int i = 0; i < m->length; i++)
|
||||
fprintf(f, "\t%.6f", m->data[i].f);
|
||||
}
|
||||
|
||||
fprintf(f, "\n");
|
||||
|
||||
|
@ -60,33 +68,93 @@ int msg_fprint(FILE *f, struct msg *m)
|
|||
}
|
||||
|
||||
/** @todo Currently only floating point values are supported */
|
||||
int msg_fscan(FILE *f, struct msg *m)
|
||||
int msg_fscan(FILE *f, struct msg *m, int *fl, double *off)
|
||||
{
|
||||
char line[MSG_VALUES * 16];
|
||||
char *next, *ptr = line;
|
||||
|
||||
if (fgets(line, sizeof(line), f) == NULL)
|
||||
return -1; /* An error occured */
|
||||
|
||||
m->ts.sec = (uint32_t) strtoul(ptr, &ptr, 10); ptr++;
|
||||
m->ts.nsec = (uint32_t) strtoul(ptr, &ptr, 10);
|
||||
m->sequence = (uint16_t) strtoul(ptr, &ptr, 10);
|
||||
|
||||
char *end, *ptr = line;
|
||||
int flags = 0;
|
||||
double offset;
|
||||
|
||||
m->version = MSG_VERSION;
|
||||
m->endian = MSG_ENDIAN_HOST;
|
||||
m->length = 0;
|
||||
m->rsvd1 = 0;
|
||||
m->rsvd2 = 0;
|
||||
m->rsvd1 = m->rsvd2 = 0;
|
||||
|
||||
/* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ...
|
||||
* RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))?
|
||||
*
|
||||
* Please note that only the seconds and at least one value are mandatory
|
||||
*/
|
||||
|
||||
while (m->length < MSG_VALUES) {
|
||||
m->data[m->length].f = strtod(ptr, &next);
|
||||
skip: if (fgets(line, sizeof(line), f) == NULL)
|
||||
return -1; /* An error occured */
|
||||
|
||||
if (next == ptr)
|
||||
break;
|
||||
if (line[0] == '#')
|
||||
goto skip;
|
||||
|
||||
ptr = next;
|
||||
m->length++;
|
||||
/* Mandatory: seconds */
|
||||
m->ts.sec = (uint32_t) strtoul(ptr, &end, 10);
|
||||
if (ptr == end)
|
||||
return -2;
|
||||
|
||||
/* Optional: nano seconds */
|
||||
if (*end == '.') {
|
||||
ptr = end + 1;
|
||||
|
||||
m->ts.nsec = (uint32_t) strtoul(ptr, &end, 10);
|
||||
if (ptr != end)
|
||||
flags |= MSG_PRINT_NANOSECONDS;
|
||||
else
|
||||
return -3;
|
||||
}
|
||||
else
|
||||
m->ts.nsec = 0;
|
||||
|
||||
/* Optional: offset / delay */
|
||||
if (*end == '+' || *end == '-') {
|
||||
ptr = end + 1;
|
||||
|
||||
offset = strtof(ptr, &end); /* offset is ignored for now */
|
||||
if (ptr != end)
|
||||
flags |= MSG_PRINT_OFFSET;
|
||||
else
|
||||
return -4;
|
||||
}
|
||||
|
||||
/* Optional: sequence number */
|
||||
if (*end == '(') {
|
||||
ptr = end + 1;
|
||||
|
||||
m->sequence = (uint16_t) strtoul(ptr, &end, 10);
|
||||
if (ptr != end && *end == ')')
|
||||
flags |= MSG_PRINT_SEQUENCE;
|
||||
else {
|
||||
info(end);
|
||||
return -5;
|
||||
}
|
||||
|
||||
end = end + 1;
|
||||
}
|
||||
else
|
||||
m->sequence = 0;
|
||||
|
||||
for ( m->length = 0, ptr = end;
|
||||
m->length < MSG_VALUES;
|
||||
m->length++, ptr = end) {
|
||||
|
||||
/** @todo We only support floating point values at the moment */
|
||||
m->data[m->length].f = strtod(ptr, &end);
|
||||
|
||||
if (end == ptr) /* there are no valid FP values anymore */
|
||||
break;
|
||||
}
|
||||
|
||||
if (m->length > 0)
|
||||
flags |= MSG_PRINT_VALUES;
|
||||
|
||||
if (fl)
|
||||
*fl = flags;
|
||||
if (off && (flags & MSG_PRINT_OFFSET))
|
||||
*off = offset;
|
||||
|
||||
return m->length;
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ int main(int argc, char *argv[])
|
|||
serror("Failed to start timer");
|
||||
|
||||
/* Print header */
|
||||
fprintf(stderr, "# %-20s\t%s\t%s\n", "timestamp", "seqno", "data[]");
|
||||
fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]");
|
||||
|
||||
/* Block until 1/p->rate seconds elapsed */
|
||||
while (limit-- > 0 || argc < 4) {
|
||||
|
@ -66,7 +66,7 @@ int main(int argc, char *argv[])
|
|||
m.ts.nsec = ts.tv_nsec;
|
||||
|
||||
msg_random(&m);
|
||||
msg_fprint(stdout, &m);
|
||||
msg_fprint(stdout, &m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
|
||||
|
||||
fflush(stdout);
|
||||
}
|
||||
|
|
|
@ -110,16 +110,23 @@ int main(int argc, char *argv[])
|
|||
pool = alloc(sizeof(struct msg) * node->combine);
|
||||
|
||||
/* Print header */
|
||||
fprintf(stderr, "# %-20s\t%s\t%s\n", "timestamp", "seqno", "data[]");
|
||||
fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec+offset(seq)", "data[]");
|
||||
|
||||
for (;;) {
|
||||
struct timespec ts;
|
||||
clock_gettime(CLOCK_REALTIME, &ts);
|
||||
|
||||
int recv = node_read(node, pool, node->combine, 0, node->combine);
|
||||
for (int i = 0; i < recv; i++) {
|
||||
int ret = msg_verify(&pool[i]);
|
||||
struct msg *m = &pool[i];
|
||||
|
||||
int ret = msg_verify(m);
|
||||
if (ret)
|
||||
warn("Failed to verify message: %d", ret);
|
||||
|
||||
/** @todo should we drop reordered / delayed packets here? */
|
||||
|
||||
msg_fprint(stdout, &pool[i]);
|
||||
msg_fprint(stdout, &pool[i], MSG_PRINT_ALL, time_delta(&MSG_TS(m), &ts));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -113,15 +113,24 @@ int main(int argc, char *argv[])
|
|||
pool = alloc(sizeof(struct msg) * node->combine);
|
||||
|
||||
/* Print header */
|
||||
fprintf(stderr, "# %-20s\t%s\t%s\n", "timestamp", "seqno", "data[]");
|
||||
fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec+offset(seq)", "data[]");
|
||||
|
||||
for (;;) {
|
||||
int i = 0;
|
||||
while (i < node->combine) {
|
||||
if (msg_fscan(stdin, &pool[i]) > 0)
|
||||
msg_fprint(stdout, &pool[i++]);
|
||||
else if (feof(stdin))
|
||||
goto out;
|
||||
for (int i = 0; i < node->combine; i++) {
|
||||
struct msg *m = &pool[i];
|
||||
int reason;
|
||||
|
||||
retry: reason = msg_fscan(stdin, m, NULL, NULL);
|
||||
if (reason < 0) {
|
||||
if (feof(stdin))
|
||||
goto out;
|
||||
else {
|
||||
warn("Skipped invalid message message: reason=%d", reason);
|
||||
goto retry;
|
||||
}
|
||||
}
|
||||
else
|
||||
msg_fprint(stdout, m, MSG_PRINT_ALL, 0);
|
||||
}
|
||||
|
||||
node_write(node, pool, node->combine, 0, node->combine);
|
||||
|
|
Loading…
Add table
Reference in a new issue