diff --git a/server/include/msg.h b/server/include/msg.h index 949cf52d3..0bb043c81 100644 --- a/server/include/msg.h +++ b/server/include/msg.h @@ -16,6 +16,16 @@ struct node; +/** These flags define the format which is used by msg_fscan() and msg_fprint(). */ +enum msg_flags { + MSG_PRINT_NANOSECONDS = 1, + MSG_PRINT_OFFSET = 2, + MSG_PRINT_SEQUENCE = 4, + MSG_PRINT_VALUES = 8, + + MSG_PRINT_ALL = 0xFF +}; + /** Swap a message to host byte order. * * Message can either be transmitted in little or big endian @@ -41,21 +51,25 @@ int msg_verify(struct msg *m); /** Print a raw message in human readable form to a file stream. * - * @param f The file stream - * @param m A pointer to the message + * @param f The file handle from fopen() or stdout, stderr. + * @param m A pointer to the message. + * @param flags See msg_flags. + * @param offset A optional offset in seconds. Only used if flags contains MSG_PRINT_OFFSET. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int msg_fprint(FILE *f, struct msg *m); +int msg_fprint(FILE *f, struct msg *m, int flags, double offset); /** Read a message from a file stream. * - * @param f The file stream - * @param m A pointer to the message + * @param f The file handle from fopen() or stdin. + * @param m A pointer to the message. + * @param flags et MSG_PRINT_* flags for each component found in sample if not NULL. See msg_flags. + * @param offset Write offset to this pointer if not NULL. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int msg_fscan(FILE *f, struct msg *m); +int msg_fscan(FILE *f, struct msg *m, int *flags, double *offset); /** Change the values of an existing message in a random fashion. * diff --git a/server/src/file.c b/server/src/file.c index d25028c97..2aab7cf13 100644 --- a/server/src/file.c +++ b/server/src/file.c @@ -185,13 +185,13 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt serror("Failed to wait for timer"); } - msg_fscan(f->in, cur); + msg_fscan(f->in, cur, NULL, NULL); } else { struct timespec until; /* Get message and timestamp */ - msg_fscan(f->in, cur); + msg_fscan(f->in, cur, NULL, NULL); /* Wait for next message / sampe */ until = time_add(&MSG_TS(cur), &f->offset); @@ -215,8 +215,10 @@ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cn struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); - for (i = 0; i < cnt; i++) - msg_fprint(f->out, &pool[(first+i) % poolsize]); + for (i = 0; i < cnt; i++) { + struct msg *m = &pool[(first+i) % poolsize]; + msg_fprint(f->out, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0); + } } else error("Can not write to node '%s", n->name); diff --git a/server/src/hooks.c b/server/src/hooks.c index 9811b9bd1..fa679d418 100644 --- a/server/src/hooks.c +++ b/server/src/hooks.c @@ -59,8 +59,7 @@ int hook_print(struct path *p) struct msg *m = p->current; struct timespec ts = MSG_TS(m); - fprintf(stdout, "%.3e+", time_delta(&ts, &p->ts_recv)); /* Print delay */ - msg_fprint(stdout, m); + msg_fprint(stdout, m, MSG_PRINT_ALL, time_delta(&ts, &p->ts_recv)); return 0; } diff --git a/server/src/msg.c b/server/src/msg.c index b92374e1a..20cf4639e 100644 --- a/server/src/msg.c +++ b/server/src/msg.c @@ -44,15 +44,23 @@ int msg_verify(struct msg *m) return 0; } -int msg_fprint(FILE *f, struct msg *m) +int msg_fprint(FILE *f, struct msg *m, int flags, double offset) { - if (m->endian != MSG_ENDIAN_HOST) - msg_swap(m); + fprintf(f, "%u", m->ts.sec); + + if (flags & MSG_PRINT_NANOSECONDS) + fprintf(f, ".%09u", m->ts.nsec); + + if (flags & MSG_PRINT_OFFSET) + fprintf(f, "%+g", offset); + + if (flags & MSG_PRINT_SEQUENCE) + fprintf(f, "(%u)", m->sequence); - fprintf(f, "%10u.%09u\t%hu", m->ts.sec, m->ts.nsec, m->sequence); - - for (int i = 0; i < m->length; i++) - fprintf(f, "\t%.6f", m->data[i].f); + if (flags & MSG_PRINT_VALUES) { + for (int i = 0; i < m->length; i++) + fprintf(f, "\t%.6f", m->data[i].f); + } fprintf(f, "\n"); @@ -60,33 +68,93 @@ int msg_fprint(FILE *f, struct msg *m) } /** @todo Currently only floating point values are supported */ -int msg_fscan(FILE *f, struct msg *m) +int msg_fscan(FILE *f, struct msg *m, int *fl, double *off) { char line[MSG_VALUES * 16]; - char *next, *ptr = line; - - if (fgets(line, sizeof(line), f) == NULL) - return -1; /* An error occured */ - - m->ts.sec = (uint32_t) strtoul(ptr, &ptr, 10); ptr++; - m->ts.nsec = (uint32_t) strtoul(ptr, &ptr, 10); - m->sequence = (uint16_t) strtoul(ptr, &ptr, 10); - + char *end, *ptr = line; + int flags = 0; + double offset; + m->version = MSG_VERSION; m->endian = MSG_ENDIAN_HOST; - m->length = 0; - m->rsvd1 = 0; - m->rsvd2 = 0; + m->rsvd1 = m->rsvd2 = 0; + + /* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ... + * RegEx: (\d+(?:\.\d+)?)([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))? + * + * Please note that only the seconds and at least one value are mandatory + */ - while (m->length < MSG_VALUES) { - m->data[m->length].f = strtod(ptr, &next); +skip: if (fgets(line, sizeof(line), f) == NULL) + return -1; /* An error occured */ - if (next == ptr) - break; + if (line[0] == '#') + goto skip; - ptr = next; - m->length++; + /* Mandatory: seconds */ + m->ts.sec = (uint32_t) strtoul(ptr, &end, 10); + if (ptr == end) + return -2; + + /* Optional: nano seconds */ + if (*end == '.') { + ptr = end + 1; + + m->ts.nsec = (uint32_t) strtoul(ptr, &end, 10); + if (ptr != end) + flags |= MSG_PRINT_NANOSECONDS; + else + return -3; } + else + m->ts.nsec = 0; + + /* Optional: offset / delay */ + if (*end == '+' || *end == '-') { + ptr = end + 1; + + offset = strtof(ptr, &end); /* offset is ignored for now */ + if (ptr != end) + flags |= MSG_PRINT_OFFSET; + else + return -4; + } + + /* Optional: sequence number */ + if (*end == '(') { + ptr = end + 1; + + m->sequence = (uint16_t) strtoul(ptr, &end, 10); + if (ptr != end && *end == ')') + flags |= MSG_PRINT_SEQUENCE; + else { + info(end); + return -5; + } + + end = end + 1; + } + else + m->sequence = 0; + + for ( m->length = 0, ptr = end; + m->length < MSG_VALUES; + m->length++, ptr = end) { + + /** @todo We only support floating point values at the moment */ + m->data[m->length].f = strtod(ptr, &end); + + if (end == ptr) /* there are no valid FP values anymore */ + break; + } + + if (m->length > 0) + flags |= MSG_PRINT_VALUES; + + if (fl) + *fl = flags; + if (off && (flags & MSG_PRINT_OFFSET)) + *off = offset; return m->length; } diff --git a/server/src/random.c b/server/src/random.c index 305d88dc0..c253592bc 100644 --- a/server/src/random.c +++ b/server/src/random.c @@ -53,7 +53,7 @@ int main(int argc, char *argv[]) serror("Failed to start timer"); /* Print header */ - fprintf(stderr, "# %-20s\t%s\t%s\n", "timestamp", "seqno", "data[]"); + fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]"); /* Block until 1/p->rate seconds elapsed */ while (limit-- > 0 || argc < 4) { @@ -66,7 +66,7 @@ int main(int argc, char *argv[]) m.ts.nsec = ts.tv_nsec; msg_random(&m); - msg_fprint(stdout, &m); + msg_fprint(stdout, &m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0); fflush(stdout); } diff --git a/server/src/receive.c b/server/src/receive.c index 66021f52a..2d612d08a 100644 --- a/server/src/receive.c +++ b/server/src/receive.c @@ -110,16 +110,23 @@ int main(int argc, char *argv[]) pool = alloc(sizeof(struct msg) * node->combine); /* Print header */ - fprintf(stderr, "# %-20s\t%s\t%s\n", "timestamp", "seqno", "data[]"); + fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec+offset(seq)", "data[]"); for (;;) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + int recv = node_read(node, pool, node->combine, 0, node->combine); for (int i = 0; i < recv; i++) { - int ret = msg_verify(&pool[i]); + struct msg *m = &pool[i]; + + int ret = msg_verify(m); if (ret) warn("Failed to verify message: %d", ret); + + /** @todo should we drop reordered / delayed packets here? */ - msg_fprint(stdout, &pool[i]); + msg_fprint(stdout, &pool[i], MSG_PRINT_ALL, time_delta(&MSG_TS(m), &ts)); } } diff --git a/server/src/send.c b/server/src/send.c index 1a658d83e..7c5e57b85 100644 --- a/server/src/send.c +++ b/server/src/send.c @@ -113,15 +113,24 @@ int main(int argc, char *argv[]) pool = alloc(sizeof(struct msg) * node->combine); /* Print header */ - fprintf(stderr, "# %-20s\t%s\t%s\n", "timestamp", "seqno", "data[]"); + fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec+offset(seq)", "data[]"); for (;;) { - int i = 0; - while (i < node->combine) { - if (msg_fscan(stdin, &pool[i]) > 0) - msg_fprint(stdout, &pool[i++]); - else if (feof(stdin)) - goto out; + for (int i = 0; i < node->combine; i++) { + struct msg *m = &pool[i]; + int reason; + +retry: reason = msg_fscan(stdin, m, NULL, NULL); + if (reason < 0) { + if (feof(stdin)) + goto out; + else { + warn("Skipped invalid message message: reason=%d", reason); + goto retry; + } + } + else + msg_fprint(stdout, m, MSG_PRINT_ALL, 0); } node_write(node, pool, node->combine, 0, node->combine);