1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00
This commit is contained in:
Steffen Vogel 2019-04-10 21:49:56 +02:00
commit 78fdadfbba
13 changed files with 245 additions and 157 deletions

2
common

@ -1 +1 @@
Subproject commit 44eb1b40601937738a20b42db209dc71bb50cecd
Subproject commit 117dda74728679b890e20fb8bb1fc4b5792fc46d

View file

@ -3,8 +3,8 @@ nodes = {
type = "test_rtt", # sending rates, number of values and generates statistics.
cooldown = 2, # The cooldown time between each test case in seconds
prefix = "test_rtt", # An optional prefix in the filename
output = "/tmp/results/testA", # The output directory for all results
prefix = "test_rtt_%y-%m-%d_%H-%M-%S", # An optional prefix in the filename
output = "./results", # The output directory for all results
# The results of each test case will be written to a seperate file.
format = "villas.human", # The output format of the result files.
@ -14,7 +14,7 @@ nodes = {
# possible combinations
{
rates = 55.0, # The sending rate in Hz
values = 5, # The number of values which should be send in each sample
values = [ 5, 10, 20], # The number of values which should be send in each sample
limit = 100 # The number of samples which should be send during this test case
},
{
@ -25,3 +25,18 @@ nodes = {
)
}
}
paths = (
{
# Simple loopback path to test the node
in = "rtt_node"
out = "rtt_node"
# hooks = (
# {
# type = "print"
# }
# )
}
)

View file

@ -46,14 +46,6 @@ extern "C" {
/** The offset to the first data value in a message. */
#define MSG_DATA_OFFSET(msg) ((char *) (msg) + offsetof(struct msg, data))
/** Initialize a message with default values */
#define MSG_INIT(len, seq) (struct msg) {\
.type = MSG_TYPE_DATA, \
.version = MSG_VERSION, \
.length = (uint16_t) (len), \
.sequence = (uint32_t) (seq), \
}
/** The timestamp of a message in struct timespec format */
#define MSG_TS(msg) (struct timespec) { \
.tv_sec = (msg)->ts.sec, \

View file

@ -197,35 +197,50 @@ void csv_header(struct io *io, const struct sample *smp)
fprintf(f, "%c", io->delimiter);
}
static struct plugin p1 = {
.name = "tsv",
.description = "Tabulator-separated values",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.header = csv_header,
.sprint = csv_sprint,
.sscan = csv_sscan,
.size = 0,
.flags = IO_NEWLINES |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA,
.separator = '\t'
}
};
static struct plugin p1;
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
static struct plugin p2 = {
.name = "csv",
.description = "Comma-separated values",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.header = csv_header,
.sprint = csv_sprint,
.sscan = csv_sscan,
.size = 0,
.flags = IO_NEWLINES |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA,
.separator = ','
}
};
p1.name = "tsv";
p1.description = "Tabulator-separated values";
p1.type = PLUGIN_TYPE_FORMAT;
p1.format.header = csv_header;
p1.format.sprint = csv_sprint;
p1.format.sscan = csv_sscan;
p1.format.size = 0;
p1.format.flags = IO_NEWLINES |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
p1.format.separator = '\t';
REGISTER_PLUGIN(&p1);
REGISTER_PLUGIN(&p2);
vlist_push(&plugins, &p1);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p1);
}
static struct plugin p2;
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
p2.name = "csv";
p2.description = "Comma-separated values";
p2.type = PLUGIN_TYPE_FORMAT;
p1.format.header = csv_header;
p1.format.sprint = csv_sprint;
p1.format.sscan = csv_sscan;
p1.format.size = 0;
p1.format.flags = IO_NEWLINES |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
p1.format.separator = ',';
vlist_push(&plugins, &p2);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p2);
}

View file

@ -67,8 +67,8 @@ static int json_unpack_timestamps(json_t *json_ts, struct sample *smp)
{
int ret;
json_error_t err;
json_t *json_ts_origin = nullptr;
json_t *json_ts_received = nullptr;
json_t *json_ts_origin = NULL;
json_t *json_ts_received = NULL;
json_unpack_ex(json_ts, &err, 0, "{ s?: o, s?: o }",
"origin", &json_ts_origin,
@ -173,7 +173,7 @@ static int json_unpack_sample(struct io *io, json_t *json_smp, struct sample *sm
{
int ret;
json_error_t err;
json_t *json_data, *json_value, *json_ts = nullptr;
json_t *json_data, *json_value, *json_ts = NULL;
size_t i;
int64_t sequence = -1;
@ -347,19 +347,27 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err);
return i;
}
static struct plugin p = {
.name = "json",
.description = "Javascript Object Notation",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.print = json_print,
.scan = json_scan,
.sprint = json_sprint,
.sscan = json_sscan,
.size = 0,
.flags = SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA,
.delimiter = '\n'
},
};
static struct plugin p;
REGISTER_PLUGIN(&p);
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
p.name = "json";
p.description = "Javascript Object Notation";
p.type = PLUGIN_TYPE_FORMAT;
p.format.print = json_print;
p.format.scan = json_scan;
p.format.sprint = json_sprint;
p.format.sscan = json_sscan;
p.format.size = 0;
p.format.flags = SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
p.format.delimiter = '\n';
vlist_push(&plugins, &p);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p);
}

View file

@ -37,8 +37,8 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
{
json_error_t err;
json_t *json_data, *json_name, *json_unit, *json_value;
json_t *json_created = nullptr;
json_t *json_sequence = nullptr;
json_t *json_created = NULL;
json_t *json_sequence = NULL;
if (smp->flags & SAMPLE_HAS_TS_ORIGIN)
json_created = json_integer(time_to_double(&smp->ts.origin) * 1e3);
@ -67,7 +67,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
if (sig->unit)
json_unit = json_string(sig->unit);
else
json_unit = nullptr;
json_unit = NULL;
json_value = json_pack_ex(&err, 0, "{ s: o, s: f }",
"name", json_name,
@ -94,7 +94,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
*j = json_pack_ex(&err, 0, "{ s: o }",
"measurements", json_data
);
if (*j == nullptr)
if (*j == NULL)
return -1;
#if 0
#ifdef JSON_RESERVE_INTEGER_TARGET
@ -124,8 +124,8 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
int ret, idx;
double created = -1;
json_error_t err;
json_t *json_value, *json_data = nullptr;
json_t *json_origin = nullptr, *json_target = nullptr;
json_t *json_value, *json_data = NULL;
json_t *json_origin = NULL, *json_target = NULL;
size_t i;
ret = json_unpack_ex(json_smp, &err, 0, "{ s?: o, s?: o, s?: o, s?: o }",
@ -173,7 +173,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
smp->length = 0;
json_array_foreach(json_data, i, json_value) {
const char *name, *unit = nullptr;
const char *name, *unit = NULL;
double value;
ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: F, s?: F }",
@ -323,17 +323,25 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err);
return i;
}
static struct plugin p = {
.name = "json.reserve",
.description = "RESERVE JSON format",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.print = json_reserve_print,
.scan = json_reserve_scan,
.sprint = json_reserve_sprint,
.sscan = json_reserve_sscan,
.size = 0
},
};
static struct plugin p;
REGISTER_PLUGIN(&p);
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
p.name = "json.reserve";
p.description = "RESERVE JSON format";
p.type = PLUGIN_TYPE_FORMAT;
p.format.print = json_reserve_print;
p.format.scan = json_reserve_scan;
p.format.sprint = json_reserve_sprint;
p.format.sscan = json_reserve_sscan;
p.format.size = 0;
vlist_push(&plugins, &p);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p);
}

View file

@ -106,23 +106,25 @@ int msg_to_sample(struct msg *msg, struct sample *smp, struct vlist *signals)
return 0;
}
int msg_from_sample(struct msg *msg, struct sample *smp, struct vlist *signals)
int msg_from_sample(struct msg *msg_in, struct sample *smp, struct vlist *signals)
{
*msg = MSG_INIT(smp->length, smp->sequence);
msg->ts.sec = smp->ts.origin.tv_sec;
msg->ts.nsec = smp->ts.origin.tv_nsec;
msg_in->type = MSG_TYPE_DATA;
msg_in->version = MSG_VERSION;
msg_in->length = (uint16_t) smp->length;
msg_in->sequence = (uint32_t) smp->sequence;
msg_in->ts.sec = smp->ts.origin.tv_sec;
msg_in->ts.nsec = smp->ts.origin.tv_nsec;
for (unsigned i = 0; i < smp->length; i++) {
struct signal *sig = (struct signal *) vlist_at(signals, i);
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
msg->data[i].f = smp->data[i].f;
msg_in->data[i].f = smp->data[i].f;
break;
case SIGNAL_TYPE_INTEGER:
msg->data[i].i = smp->data[i].i;
msg_in->data[i].i = smp->data[i].i;
break;
default:

View file

@ -128,14 +128,14 @@ int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct
goto out;
villas__node__message__pack(pb_msg, (uint8_t *) buf);
villas__node__message__free_unpacked(pb_msg, nullptr);
villas__node__message__free_unpacked(pb_msg, NULL);
*wbytes = psz;
return cnt;
out:
villas__node__message__free_unpacked(pb_msg, nullptr);
villas__node__message__free_unpacked(pb_msg, NULL);
return -1;
}
@ -145,7 +145,7 @@ int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, s
unsigned i, j;
Villas__Node__Message *pb_msg;
pb_msg = villas__node__message__unpack(nullptr, len, (uint8_t *) buf);
pb_msg = villas__node__message__unpack(NULL, len, (uint8_t *) buf);
if (!pb_msg)
return -1;
@ -216,20 +216,30 @@ int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, s
if (rbytes)
*rbytes = villas__node__message__get_packed_size(pb_msg);
villas__node__message__free_unpacked(pb_msg, nullptr);
villas__node__message__free_unpacked(pb_msg, NULL);
return i;
}
static struct plugin p = {
.name = "protobuf",
.description = "Google Protobuf",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.sprint = protobuf_sprint,
.sscan = protobuf_sscan,
.flags = IO_HAS_BINARY_PAYLOAD |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
}
};
REGISTER_PLUGIN(&p);
static struct plugin p;
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
p.name = "protobuf";
p.description = "Google Protobuf";
p.type = PLUGIN_TYPE_FORMAT;
p.format.sprint = protobuf_sprint;
p.format.sscan = protobuf_sscan;
p.format.flags = IO_HAS_BINARY_PAYLOAD |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
vlist_push(&plugins, &p);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p);
}

View file

@ -408,20 +408,27 @@ int raw_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct
return 1;
}
#define REGISTER_FORMAT_RAW(i, n, d, f) \
static struct plugin i = { \
.name = n, \
.description = d, \
.type = PLUGIN_TYPE_FORMAT, \
.format = { \
.sprint = raw_sprint, \
.sscan = raw_sscan, \
.flags = f | IO_HAS_BINARY_PAYLOAD |\
SAMPLE_HAS_DATA \
} \
}; \
REGISTER_PLUGIN(& i);
#define REGISTER_FORMAT_RAW(i, n, d, f) \
static struct plugin i; \
__attribute__((constructor(110))) static void UNIQUE(__ctor)() { \
if (plugins.state == STATE_DESTROYED) \
vlist_init(&plugins); \
\
i.name = n; \
i.description = d; \
i.type = PLUGIN_TYPE_FORMAT; \
i.format.sprint = raw_sprint; \
i.format.sscan = raw_sscan; \
i.format.flags = f | IO_HAS_BINARY_PAYLOAD | \
SAMPLE_HAS_DATA; \
\
vlist_push(&plugins, &i); \
} \
\
__attribute__((destructor(110))) static void UNIQUE(__dtor)() { \
if (plugins.state != STATE_DESTROYED) \
vlist_remove_all(&plugins, &i); \
}
/* Feel free to add additional format identifiers here to suit your needs */
REGISTER_FORMAT_RAW(p_8, "raw.8", "Raw 8 bit", RAW_BITS_8)
REGISTER_FORMAT_RAW(p_16be, "raw.16.be", "Raw 16 bit, big endian byte-order", RAW_BITS_16 | RAW_BIG_ENDIAN)

View file

@ -118,32 +118,51 @@ int villas_binary_sscan(struct io *io, const char *buf, size_t len, size_t *rbyt
return i;
}
static struct plugin p1 = {
.name = "villas.binary",
.description = "VILLAS binary network format",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.sprint = villas_binary_sprint,
.sscan = villas_binary_sscan,
.size = 0,
.flags = IO_HAS_BINARY_PAYLOAD |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
},
};
static struct plugin p1;
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
p1.name = "villas.binary";
p1.description = "VILLAS binary network format";
p1.type = PLUGIN_TYPE_FORMAT;
p1.format.sprint = villas_binary_sprint;
p1.format.sscan = villas_binary_sscan;
p1.format.size = 0;
p1.format.flags = IO_HAS_BINARY_PAYLOAD |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
vlist_push(&plugins, &p1);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p1);
}
/** The WebSocket node-type usually uses little endian byte order intead of network byte order */
static struct plugin p2 = {
.name = "villas.web",
.description = "VILLAS binary network format for WebSockets",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.sprint = villas_binary_sprint,
.sscan = villas_binary_sscan,
.size = 0,
.flags = IO_HAS_BINARY_PAYLOAD | VILLAS_BINARY_WEB |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA
},
};
static struct plugin p2;
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
p2.name = "villas.web";
p2.description = "VILLAS binary network format for WebSockets";
p2.type = PLUGIN_TYPE_FORMAT;
p2.format.sprint = villas_binary_sprint;
p2.format.sscan = villas_binary_sscan;
p2.format.size = 0;
p2.format.flags = IO_HAS_BINARY_PAYLOAD | VILLAS_BINARY_WEB |
SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
vlist_push(&plugins, &p2);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p2);
}
REGISTER_PLUGIN(&p1);
REGISTER_PLUGIN(&p2);

View file

@ -224,19 +224,26 @@ void villas_human_header(struct io *io, const struct sample *smp)
fprintf(f, "%c", io->delimiter);
}
static struct plugin p = {
.name = "villas.human",
.description = "VILLAS human readable format",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.header = villas_human_header,
.sprint = villas_human_sprint,
.sscan = villas_human_sscan,
.size = 0,
.flags = IO_NEWLINES | SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA,
.delimiter = '\n',
.separator = '\t'
}
};
static struct plugin p;
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
REGISTER_PLUGIN(&p);
p.name = "villas.human";
p.description = "VILLAS human readable format";
p.type = PLUGIN_TYPE_FORMAT;
p.format.header = villas_human_header;
p.format.sprint = villas_human_sprint;
p.format.sscan = villas_human_sscan;
p.format.size = 0;
p.format.flags = IO_NEWLINES | SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA;
p.format.delimiter = '\n';
p.format.separator = '\t';
vlist_push(&plugins, &p);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p);
}

View file

@ -91,8 +91,8 @@ int memory_lock(size_t lock)
debug(LOG_MEM | 2, "Increased ressource limit of locked memory to %zd bytes", lock);
}
#endif /* __arm__ */
out:
#endif /* __arm__ */
#ifdef _POSIX_MEMLOCK
/* Lock all current and future memory allocations */
ret = mlockall(MCL_CURRENT | MCL_FUTURE);

View file

@ -57,9 +57,14 @@ int queue_init(struct queue *q, size_t size, struct memory_type *m)
for (size_t i = 0; i != size; i += 1)
std::atomic_store_explicit(&buffer[i].sequence, i, std::memory_order_relaxed);
#ifndef __arm__
std::atomic_store_explicit(&q->tail, 0ul, std::memory_order_relaxed);
std::atomic_store_explicit(&q->head, 0ul, std::memory_order_relaxed);
#else
std::atomic_store_explicit(&q->tail, 0u, std::memory_order_relaxed);
std::atomic_store_explicit(&q->head, 0u, std::memory_order_relaxed);
#endif
q->state = STATE_INITIALIZED;
return 0;