diff --git a/server/include/msg.h b/server/include/msg.h index e6d6a9215..8fbac2686 100644 --- a/server/include/msg.h +++ b/server/include/msg.h @@ -14,61 +14,34 @@ #include "config.h" #include "node.h" -#if PROTOCOL == 0 -/** The format of a message (OPAL-RT example format). - * - * This struct defines the format of a message (protocol version 0). - * Its declared as "packed" because it represents the "on wire" data. - */ -struct msg -{ - /** Sender device ID */ - uint16_t device; - /** Message ID */ - uint32_t sequence; - /** Message length (data only) */ - uint16_t length; - /** Message data */ - double data[MAX_VALUES]; -} __attribute__((packed)); -#elif PROTOCOL == 1 -/** Next generation message format for RTDS integration. - * - * This struct defines the format of a message (protocol version 1). - * Its declared as "packed" because it represents the "on wire" data. - */ -struct msg -{ - struct - { - /** Protocol version */ - unsigned version : 4; - /** Header length */ - unsigned hdr_len : 4; - /** Message flags */ - uint8_t flags; - /** Sender device ID */ - uint16_t dev_id; - /** Message ID */ - uint32_t sequence; - /** Message length (data only) */ - uint16_t data_len; - /** Digital signature for authentication */ - uint32_t signature; - /** Timestamp in uS since unix epoch */ - uint64_t timestamp - } header; - union - { - uint32_t integer; - float data float_single; - char * data_str; - } data[MAX_VALUES]; +#define MSG_VERSION 0 +/** @todo Implement more message types */ +#define MSG_TYPE_DATA 0 +#define MSG_TYPE_START 1 +#define MSG_TYPE_STOP 2 + +struct node; + +/** This message format is used by all clients + * + * @diafile msg_format.dia + **/ +struct msg +{ + /** The version specifies the format of the remaining message */ + unsigned version : 4; + /** Data or control message */ + unsigned type : 2; + /** These bits are reserved for future extensions */ + unsigned __padding : 2; + /** Length in dwords of the whole message */ + uint8_t length; + /** The sequence number gets incremented by one for consecutive messages */ + uint16_t sequence; + /** The message payload */ + float data[MAX_VALUES]; } __attribute__((packed)); -#else - #error "Unknown protocol version!" -#endif /** Print a raw UDP message in human readable form. * diff --git a/server/include/path.h b/server/include/path.h index 1d332b399..75a843c35 100644 --- a/server/include/path.h +++ b/server/include/path.h @@ -50,8 +50,10 @@ struct path unsigned long delayed; /** Counter for messages which arrived multiple times */ unsigned long duplicated; - /** Counter for received messages with invalid device id or data */ + /** Counter for received messages with invalid version or type */ unsigned long invalid; + /** Counter for skipped or filtered messages by hook */ + unsigned long skipped; /** The thread id for this path */ pthread_t tid; diff --git a/server/src/msg.c b/server/src/msg.c index 3f03c5e35..72ba15f03 100644 --- a/server/src/msg.c +++ b/server/src/msg.c @@ -14,51 +14,44 @@ int msg_fprint(FILE *f, struct msg *msg) { - fprintf(f, "%-8u %-8u", msg->device, msg->sequence); + fprintf(f, "%-8hu", msg->sequence); - for (int i = 0; i < msg->length / sizeof(double); i++) { + for (int i = 0; i < msg->length; i++) fprintf(f, "%-12.6f ", msg->data[i]); - } fprintf(f, "\n"); return 0; } -int msg_fscan(FILE *f, struct msg *msg) +int msg_fscan(FILE *f, struct msg *m) { - fscanf(f, "%8hu %8u ", &msg->device, &msg->sequence); + fscanf(f, "%8hu ", &m->sequence); - for (int i = 0; i < msg->length / sizeof(double); i++) { - fscanf(f, "%12lf ", &msg->data[i]); - } + for (int i = 0; i < m->length; i++) + fscanf(f, "%12f ", &m->data[i]); fscanf(f, "\n"); - return 0; } void msg_random(struct msg *m) { - int values = m->length / sizeof(double); - - for (int i = 0; i < values; i++) { - m->data[i] += (double) random() / RAND_MAX - .5; - } + for (int i = 0; i < m->length; i++) + m->data[i] += (float) random() / RAND_MAX - .5; m->sequence++; } int msg_send(struct msg *m, struct node *n) { - if (sendto(n->sd, m, m->length + 8, 0, + if (sendto(n->sd, m, (m->length+1) * 4, 0, (struct sockaddr *) &n->remote, sizeof(struct sockaddr_in)) < 0) perror("Failed sendto"); - debug(10, "Message sent to node %s (%s:%u)", n->name, - inet_ntoa(n->remote.sin_addr), - ntohs(n->remote.sin_port)); + debug(10, "Message sent to node '%s'", n->name); + if (V >= 10) msg_fprint(stdout, m); return 0; } @@ -68,7 +61,8 @@ int msg_recv(struct msg *m, struct node *n) if (recv(n->sd, m, sizeof(struct msg), 0) < 0) perror("Failed recv"); - debug(10, "Message received from node %s", n->name); + debug(10, "Message received from node '%s'", n->name); + if (V >= 10) msg_fprint(stdout, m); return 0; } diff --git a/server/src/path.c b/server/src/path.c index 3350d7813..0ce30c25d 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -70,36 +70,51 @@ static void * path_run(void *arg) /* Main thread loop */ while (1) { msg_recv(&m, p->in); /* Receive message */ - p->received++; - if (m.sequence == 0 && p->sequence > 0) { - path_stats(p); - info("Simulation restarted"); + /** Check header fields */ + if (m.version != MSG_VERSION) { + p->invalid++; + continue; + } - p->sequence = 0; - p->received = 0; - p->sent = 0; - p->delayed = 0; - p->duplicated = 0; - p->invalid = 0; + if (m.type != MSG_TYPE_DATA) { + p->invalid++; + continue; + } + + /* Check sequence number */ + if (m.sequence <= 1) { + path_stats(p); + info("Simulation started"); + + p->sequence = 0; + p->received = 1; + p->sent = 0; + p->skipped = 0; + p->delayed = 0; + p->duplicated = 0; + p->invalid = 0; } else if (m.sequence < p->sequence) { p->delayed++; + continue; } else if (m.sequence == p->sequence) { p->duplicated++; + continue; } - /* Call hook */ - if (p->hook && p->hook(&m)) + if (p->hook && p->hook(&m)) { + p->skipped++; continue; + } /* At fixed rate mode, messages are send by another thread */ if (p->rate) p->last = &m; else - msg_send(p->last, p->out); + msg_send(&m, p->out); p->sequence = m.sequence; p->sent++; diff --git a/server/src/random.c b/server/src/random.c index f8ee7fb35..bfa8ef013 100644 --- a/server/src/random.c +++ b/server/src/random.c @@ -31,9 +31,8 @@ void tick(int sig, siginfo_t *si, void *ptr) int main(int argc, char *argv[]) { - if (argc != 4) { - printf("Usage: %s DEVID VALUES RATE\n", argv[0]); - printf(" DEVID is the device id of this node\n"); + if (argc != 3) { + printf("Usage: %s VALUES RATE\n", argv[0]); printf(" VALUES is the number of values a message contains\n"); printf(" RATE how many messages per second\n\n"); printf("s2ss Simulator2Simulator Server v%s\n", VERSION); @@ -41,10 +40,9 @@ int main(int argc, char *argv[]) exit(EXIT_FAILURE); } - int rate = atoi(argv[3]); + int rate = atoi(argv[2]); struct msg msg = { - .device = atoi(argv[1]), - .length = atoi(argv[2]) * sizeof(double), + .length = atoi(argv[1]), .sequence = 0, .data = { 0 } }; @@ -76,7 +74,7 @@ int main(int argc, char *argv[]) }; /* Print header */ - fprintf(stderr, "# %-6s %-8s %-12s\n", "dev_id", "seq_no", "data"); + fprintf(stderr, "# %-6s%-12s\n", "seq", "data"); timer_create(CLOCKID, &sev, &t); timer_settime(t, 0, &its, NULL); diff --git a/server/src/receive.c b/server/src/receive.c index 402778129..acf9d79e7 100644 --- a/server/src/receive.c +++ b/server/src/receive.c @@ -30,11 +30,11 @@ void quit(int sig, siginfo_t *si, void *ptr) int main(int argc, char *argv[]) { - struct node n; + struct node n = { + .name ="node" + }; struct msg m; - memset(&n, 0, sizeof(struct node)); - if (argc != 2) { printf("Usage: %s LOCAL\n", argv[0]); printf(" LOCAL is a IP:PORT combination of the local host\n\n"); @@ -65,6 +65,11 @@ int main(int argc, char *argv[]) while (1) { msg_recv(&m, &n); + if (m.version != MSG_VERSION) + continue; + if (m.type != MSG_TYPE_DATA) + continue; + #if 1 struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); diff --git a/server/src/send.c b/server/src/send.c index 714fe8172..e7adb4408 100644 --- a/server/src/send.c +++ b/server/src/send.c @@ -44,7 +44,9 @@ int main(int argc, char *argv[]) .name = "node" }; struct msg m = { - .length = atoi(argv[1]) * sizeof(double) + .version = MSG_VERSION, + .type = MSG_TYPE_DATA, + .length = atoi(argv[1]) }; /* Setup signals */ @@ -69,7 +71,8 @@ int main(int argc, char *argv[]) n.local.sin_port = 0; } - node_connect(&n); + if (node_connect(&n)) + error("Failed to connect node"); while (!feof(stdin)) { msg_fscan(stdin, &m); diff --git a/server/src/test.c b/server/src/test.c index b1bd3518c..0ba5eaee0 100644 --- a/server/src/test.c +++ b/server/src/test.c @@ -64,7 +64,6 @@ int main(int argc, char *argv[]) if (!strcmp(argv[1], "latency")) { struct msg m = { - .device = 99, .sequence = 0 }; struct timespec *ts1 = (struct timespec *) &m.data;