1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

added new message format

git-svn-id: https://zerberus.eonerc.rwth-aachen.de:8443/svn/s2ss/trunk@135 8ec27952-4edc-4aab-86aa-e87bb2611832
This commit is contained in:
Steffen Vogel 2014-07-04 15:58:11 +00:00
parent bbe2a6a02e
commit e03f5eece0
8 changed files with 88 additions and 99 deletions

View file

@ -14,61 +14,34 @@
#include "config.h"
#include "node.h"
#if PROTOCOL == 0
/** The format of a message (OPAL-RT example format).
*
* This struct defines the format of a message (protocol version 0).
* Its declared as "packed" because it represents the "on wire" data.
*/
struct msg
{
/** Sender device ID */
uint16_t device;
/** Message ID */
uint32_t sequence;
/** Message length (data only) */
uint16_t length;
/** Message data */
double data[MAX_VALUES];
} __attribute__((packed));
#elif PROTOCOL == 1
/** Next generation message format for RTDS integration.
*
* This struct defines the format of a message (protocol version 1).
* Its declared as "packed" because it represents the "on wire" data.
*/
struct msg
{
struct
{
/** Protocol version */
unsigned version : 4;
/** Header length */
unsigned hdr_len : 4;
/** Message flags */
uint8_t flags;
/** Sender device ID */
uint16_t dev_id;
/** Message ID */
uint32_t sequence;
/** Message length (data only) */
uint16_t data_len;
/** Digital signature for authentication */
uint32_t signature;
/** Timestamp in uS since unix epoch */
uint64_t timestamp
} header;
union
{
uint32_t integer;
float data float_single;
char * data_str;
} data[MAX_VALUES];
#define MSG_VERSION 0
/** @todo Implement more message types */
#define MSG_TYPE_DATA 0
#define MSG_TYPE_START 1
#define MSG_TYPE_STOP 2
struct node;
/** This message format is used by all clients
*
* @diafile msg_format.dia
**/
struct msg
{
/** The version specifies the format of the remaining message */
unsigned version : 4;
/** Data or control message */
unsigned type : 2;
/** These bits are reserved for future extensions */
unsigned __padding : 2;
/** Length in dwords of the whole message */
uint8_t length;
/** The sequence number gets incremented by one for consecutive messages */
uint16_t sequence;
/** The message payload */
float data[MAX_VALUES];
} __attribute__((packed));
#else
#error "Unknown protocol version!"
#endif
/** Print a raw UDP message in human readable form.
*

View file

@ -50,8 +50,10 @@ struct path
unsigned long delayed;
/** Counter for messages which arrived multiple times */
unsigned long duplicated;
/** Counter for received messages with invalid device id or data */
/** Counter for received messages with invalid version or type */
unsigned long invalid;
/** Counter for skipped or filtered messages by hook */
unsigned long skipped;
/** The thread id for this path */
pthread_t tid;

View file

@ -14,51 +14,44 @@
int msg_fprint(FILE *f, struct msg *msg)
{
fprintf(f, "%-8u %-8u", msg->device, msg->sequence);
fprintf(f, "%-8hu", msg->sequence);
for (int i = 0; i < msg->length / sizeof(double); i++) {
for (int i = 0; i < msg->length; i++)
fprintf(f, "%-12.6f ", msg->data[i]);
}
fprintf(f, "\n");
return 0;
}
int msg_fscan(FILE *f, struct msg *msg)
int msg_fscan(FILE *f, struct msg *m)
{
fscanf(f, "%8hu %8u ", &msg->device, &msg->sequence);
fscanf(f, "%8hu ", &m->sequence);
for (int i = 0; i < msg->length / sizeof(double); i++) {
fscanf(f, "%12lf ", &msg->data[i]);
}
for (int i = 0; i < m->length; i++)
fscanf(f, "%12f ", &m->data[i]);
fscanf(f, "\n");
return 0;
}
void msg_random(struct msg *m)
{
int values = m->length / sizeof(double);
for (int i = 0; i < values; i++) {
m->data[i] += (double) random() / RAND_MAX - .5;
}
for (int i = 0; i < m->length; i++)
m->data[i] += (float) random() / RAND_MAX - .5;
m->sequence++;
}
int msg_send(struct msg *m, struct node *n)
{
if (sendto(n->sd, m, m->length + 8, 0,
if (sendto(n->sd, m, (m->length+1) * 4, 0,
(struct sockaddr *) &n->remote,
sizeof(struct sockaddr_in)) < 0)
perror("Failed sendto");
debug(10, "Message sent to node %s (%s:%u)", n->name,
inet_ntoa(n->remote.sin_addr),
ntohs(n->remote.sin_port));
debug(10, "Message sent to node '%s'", n->name);
if (V >= 10) msg_fprint(stdout, m);
return 0;
}
@ -68,7 +61,8 @@ int msg_recv(struct msg *m, struct node *n)
if (recv(n->sd, m, sizeof(struct msg), 0) < 0)
perror("Failed recv");
debug(10, "Message received from node %s", n->name);
debug(10, "Message received from node '%s'", n->name);
if (V >= 10) msg_fprint(stdout, m);
return 0;
}

View file

@ -70,36 +70,51 @@ static void * path_run(void *arg)
/* Main thread loop */
while (1) {
msg_recv(&m, p->in); /* Receive message */
p->received++;
if (m.sequence == 0 && p->sequence > 0) {
path_stats(p);
info("Simulation restarted");
/** Check header fields */
if (m.version != MSG_VERSION) {
p->invalid++;
continue;
}
p->sequence = 0;
p->received = 0;
p->sent = 0;
p->delayed = 0;
p->duplicated = 0;
p->invalid = 0;
if (m.type != MSG_TYPE_DATA) {
p->invalid++;
continue;
}
/* Check sequence number */
if (m.sequence <= 1) {
path_stats(p);
info("Simulation started");
p->sequence = 0;
p->received = 1;
p->sent = 0;
p->skipped = 0;
p->delayed = 0;
p->duplicated = 0;
p->invalid = 0;
}
else if (m.sequence < p->sequence) {
p->delayed++;
continue;
}
else if (m.sequence == p->sequence) {
p->duplicated++;
continue;
}
/* Call hook */
if (p->hook && p->hook(&m))
if (p->hook && p->hook(&m)) {
p->skipped++;
continue;
}
/* At fixed rate mode, messages are send by another thread */
if (p->rate)
p->last = &m;
else
msg_send(p->last, p->out);
msg_send(&m, p->out);
p->sequence = m.sequence;
p->sent++;

View file

@ -31,9 +31,8 @@ void tick(int sig, siginfo_t *si, void *ptr)
int main(int argc, char *argv[])
{
if (argc != 4) {
printf("Usage: %s DEVID VALUES RATE\n", argv[0]);
printf(" DEVID is the device id of this node\n");
if (argc != 3) {
printf("Usage: %s VALUES RATE\n", argv[0]);
printf(" VALUES is the number of values a message contains\n");
printf(" RATE how many messages per second\n\n");
printf("s2ss Simulator2Simulator Server v%s\n", VERSION);
@ -41,10 +40,9 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
int rate = atoi(argv[3]);
int rate = atoi(argv[2]);
struct msg msg = {
.device = atoi(argv[1]),
.length = atoi(argv[2]) * sizeof(double),
.length = atoi(argv[1]),
.sequence = 0,
.data = { 0 }
};
@ -76,7 +74,7 @@ int main(int argc, char *argv[])
};
/* Print header */
fprintf(stderr, "# %-6s %-8s %-12s\n", "dev_id", "seq_no", "data");
fprintf(stderr, "# %-6s%-12s\n", "seq", "data");
timer_create(CLOCKID, &sev, &t);
timer_settime(t, 0, &its, NULL);

View file

@ -30,11 +30,11 @@ void quit(int sig, siginfo_t *si, void *ptr)
int main(int argc, char *argv[])
{
struct node n;
struct node n = {
.name ="node"
};
struct msg m;
memset(&n, 0, sizeof(struct node));
if (argc != 2) {
printf("Usage: %s LOCAL\n", argv[0]);
printf(" LOCAL is a IP:PORT combination of the local host\n\n");
@ -65,6 +65,11 @@ int main(int argc, char *argv[])
while (1) {
msg_recv(&m, &n);
if (m.version != MSG_VERSION)
continue;
if (m.type != MSG_TYPE_DATA)
continue;
#if 1
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);

View file

@ -44,7 +44,9 @@ int main(int argc, char *argv[])
.name = "node"
};
struct msg m = {
.length = atoi(argv[1]) * sizeof(double)
.version = MSG_VERSION,
.type = MSG_TYPE_DATA,
.length = atoi(argv[1])
};
/* Setup signals */
@ -69,7 +71,8 @@ int main(int argc, char *argv[])
n.local.sin_port = 0;
}
node_connect(&n);
if (node_connect(&n))
error("Failed to connect node");
while (!feof(stdin)) {
msg_fscan(stdin, &m);

View file

@ -64,7 +64,6 @@ int main(int argc, char *argv[])
if (!strcmp(argv[1], "latency")) {
struct msg m = {
.device = 99,
.sequence = 0
};
struct timespec *ts1 = (struct timespec *) &m.data;