1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'develop' of git.rwth-aachen.de:acs/public/villas/VILLASnode into develop

This commit is contained in:
Steffen Vogel 2018-06-04 22:24:50 +02:00
commit afd7561966
6 changed files with 110 additions and 181 deletions

View file

@ -32,6 +32,7 @@ struct node;
struct signal {
char *name; /**< The name of the signal. */
char *unit;
int enabled;
enum {
SIGNAL_FORMAT_INTEGER,
SIGNAL_FORMAT_REAL

View file

@ -31,6 +31,8 @@
#include <villas/io.h>
#include <villas/formats/json.h>
#define JSON_RESERVE_INTEGER_TARGET 1
static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *smp)
{
json_error_t err;
@ -39,7 +41,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
struct signal *sig;
if (smp->flags & SAMPLE_HAS_ORIGIN)
json_created = json_real(time_to_double(&smp->ts.origin));
json_created = json_integer(time_to_double(&smp->ts.origin) * 1e3);
if (smp->flags & SAMPLE_HAS_SEQUENCE)
json_sequence = json_integer(smp->sequence);
@ -53,6 +55,9 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
sig = NULL;
if (sig) {
if (!sig->enabled)
continue;
json_name = json_string(sig->name);
json_unit = json_string(sig->unit);
}
@ -92,10 +97,26 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm
if (*j == NULL)
return -1;
#ifdef JSON_RESERVE_INTEGER_TARGET
if (io->output.node) {
char *endptr;
char *id_str = strrchr(io->output.node->name, '_');
if (!id_str)
return -1;
int id = strtoul(id_str+1, &endptr, 10);
if (endptr[0] != 0)
return -1;
json_object_set_new(*j, "target", json_integer(id));
}
#else
if (io->output.node)
json_object_set_new(*j, "target", json_string(io->output.node->name));
//if (smp->source)
// json_object_set_new(*j, "origin", json_string(smp->source->name));
if (smp->source)
json_object_set_new(*j, "origin", json_string(smp->source->name));
#endif
return 0;
}
@ -106,23 +127,45 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
double created = -1;
json_error_t err;
json_t *json_value, *json_data = NULL;
json_t *json_origin = NULL, *json_target = NULL;
size_t i;
const char *origin = NULL, *target = NULL;
ret = json_unpack_ex(json_smp, &err, 0, "{ s?: s, s?: s, s?: o, s?: o }",
"origin", &origin,
"target", &target,
ret = json_unpack_ex(json_smp, &err, 0, "{ s?: o, s?: o, s?: o, s?: o }",
"origin", &json_origin,
"target", &json_target,
"measurements", &json_data,
"setpoints", &json_data
);
if (ret)
return -1;
if (target && io->input.node) {
if (strcmp(target, io->input.node->name))
#ifdef JSON_RESERVE_INTEGER_TARGET
if (json_target && io->input.node) {
if (!json_is_integer(json_target))
return -1;
char *endptr;
char *id_str = strrchr(io->input.node->name, '_');
if (!id_str)
return -1;
int id = strtoul(id_str+1, &endptr, 10);
if (endptr[0] != 0)
return -1;
if (id != json_integer_value(json_target))
return 0;
}
#else
if (json_target && io->input.node) {
const char *target = json_string_value(json_target);
if (!target)
return -1;
if (strcmp(target, io->input.node->name))
return 0;
}
#endif
if (!json_data || !json_is_array(json_data))
return -1;
@ -134,7 +177,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
const char *name, *unit = NULL;
double value;
ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: f, s?: f }",
ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: F, s?: F }",
"name", &name,
"unit", &unit,
"value", &value,
@ -143,8 +186,16 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
if (ret)
return -1;
idx = list_lookup_index(io->input.signals, name);
if (idx < 0) {
struct signal *sig;
sig = (struct signal *) list_lookup(io->input.signals, name);
if (sig) {
if (!sig->enabled)
continue;
idx = list_index(io->input.signals, sig);
}
else {
ret = sscanf(name, "signal_%d", &idx);
if (ret != 1)
continue;
@ -162,7 +213,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa
smp->flags |= SAMPLE_HAS_VALUES;
if (created > 0) {
smp->ts.origin = time_from_double(created);
smp->ts.origin = time_from_double(created * 1e-3);
smp->flags |= SAMPLE_HAS_ORIGIN;
}

View file

@ -137,8 +137,6 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no
first_str = strtok(NULL, "-]");
if (first_str) {
info("Mapping: %s", first_str);
if (me->node)
first = list_lookup_index(&me->node->in.signals, first_str);

View file

@ -28,110 +28,6 @@
#include <villas/utils.h>
#include <villas/format_type.h>
//#define MQTT_THREAD 1
#ifdef MQTT_THREAD
#include <pthread.h>
static struct {
pthread_t thread;
pthread_mutex_t mutex;
int length;
struct pollfd *fds;
struct mosquitto **clients;
} polling = {
.length = 0,
.fds = NULL,
.clients = NULL
};
static pthread_t thread;
static int mqtt_register_client(struct mosquitto *mosq)
{
pthread_mutex_lock(&polling.mutex);
/* Add this client to the pollfd list */
int i = polling.length++;
polling.fds = realloc(polling.fds, polling.length * sizeof(struct pollfd));
polling.clients = realloc(polling.clients, polling.length * sizeof(struct mosquitto *));
if (!polling.fds || !polling.clients) {
pthread_mutex_unlock(&polling.mutex);
return -1;
}
polling.clients[i] = mosq;
polling.fds[i].events = POLLIN;
polling.fds[i].fd = mosquitto_socket(mosq);
pthread_mutex_unlock(&polling.mutex);
info("MQTT: registered new client");
return 0;
}
static int mqtt_unregister_client(struct mosquitto *mosq)
{
pthread_mutex_lock(&polling.mutex);
/* Find client */
int i;
for (i = 0; i < polling.length; i++) {
if (polling.clients[i] == mosq)
break;
}
if (i >= polling.length) {
pthread_mutex_unlock(&polling.mutex);
return -1; /* Otherwise something wrong happened! */
}
/* Remove this client to the pollfd list */
memmove(polling.fds + i, polling.fds + i + 1, (polling.length - i - 1) * sizeof(struct pollfd));
memmove(polling.clients + i, polling.clients + i + 1, (polling.length - i - 1) * sizeof(struct mosquitto *));
polling.length--;
pthread_mutex_unlock(&polling.mutex);
info("MQTT: deregistered client");
return 0;
}
static void * mqtt_thread(void *ctx)
{
int ret;
debug(5, "MQTT: started thread");
while (1) {
// pthread_mutex_lock(&polling.mutex);
debug(5, "MQTT: Polling on %d clients", polling.length);
ret = poll(polling.fds, polling.length, -1);
if (ret < 0)
serror("Failed to poll");
for (int i = 0; i < polling.length; i++) {
if (polling.fds[i].revents & POLLIN) {
ret = mosquitto_loop(polling.clients[i], -1, 1);
if (ret)
warn("MQTT: Loop failed for clients %p!", polling.clients[i]);
}
}
// pthread_mutex_unlock(&polling.mutex);
}
return NULL;
}
#endif
static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
switch (level) {
@ -160,11 +56,13 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result)
struct node *n = (struct node *) userdata;
struct mqtt *m = (struct mqtt *) n->_vd;
debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host);
int ret;
#ifdef MQTT_THREAD
mqtt_register_client(mosq);
#endif
info("MQTT: Node %s connected to broker %s", node_name(n), m->host);
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
if (ret)
warn("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n));
}
static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result)
@ -172,11 +70,7 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul
struct node *n = (struct node *) userdata;
struct mqtt *m = (struct mqtt *) n->_vd;
debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host);
#ifdef MQTT_THREAD
mqtt_unregister_client(mosq);
#endif
info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host);
}
static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg)
@ -195,8 +89,16 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
}
ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1);
if (ret != 1)
if (ret < 0) {
warn("MQTT: Node %s received an invalid message", node_name(n));
warn(" Payload: %s", (char *) msg->payload);
return;
}
if (ret != 1) {
debug(4, "MQTT: skip empty message for node %s", node_name(n));
sample_put(smp);
return;
}
queue_signalled_push(&m->queue, (void *) smp);
}
@ -293,6 +195,15 @@ int mqtt_parse(struct node *n, json_t *cfg)
if (!m->format)
error("Invalid format '%s' for node %s", format, node_name(n));
// Some checks
ret = mosquitto_sub_topic_check(m->subscribe);
if (ret != MOSQ_ERR_SUCCESS)
error("Invalid subscribe topic: '%s' for node %s", m->subscribe, node_name(n));
ret = mosquitto_pub_topic_check(m->publish);
if (ret != MOSQ_ERR_SUCCESS)
error("Invalid publish topic: '%s' for node %s", m->publish, node_name(n));
return 0;
}
@ -302,14 +213,17 @@ char * mqtt_print(struct node *n)
char *buf = NULL;
strcatf(&buf, "format=%s, host=%s, port=%d, username=%s, keepalive=%s, ssl=%s", plugin_name(m->format),
strcatf(&buf, "format=%s, host=%s, port=%d, keepalive=%s, ssl=%s", plugin_name(m->format),
m->host,
m->port,
m->username,
m->keepalive ? "yes" : "no",
m->ssl.enabled ? "yes" : "no"
);
/* Only show if not default */
if (m->username)
strcatf(&buf, ", username=%s", m->username);
if (m->publish)
strcatf(&buf, ", publish=%s", m->publish);
@ -361,12 +275,6 @@ int mqtt_start(struct node *n)
if (!m->client)
return -1;
#ifdef MQTT_THREAD
ret = mosquitto_threaded_set(m->client, 1);
if (ret)
return ret;
#endif
if (m->username && m->password) {
ret = mosquitto_username_pw_set(m->client, m->username, m->password);
if (ret)
@ -401,19 +309,11 @@ int mqtt_start(struct node *n)
if (ret)
return ret;
#ifdef MQTT_THREAD
mqtt_register_client(m->client);
#else
ret = mosquitto_loop_start(m->client);
if (ret)
return ret;
#endif
ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive);
if (ret)
return ret;
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
ret = mosquitto_loop_start(m->client);
if (ret)
return ret;
@ -429,13 +329,9 @@ int mqtt_stop(struct node *n)
if (ret)
return ret;
#ifdef MQTT_THREAD
mqtt_unregister_client(m->client);
#else
ret = mosquitto_loop_stop(m->client, 0);
if (ret)
return ret;
#endif
return 0;
}
@ -448,16 +344,6 @@ int mqtt_init()
if (ret)
return ret;
#ifdef MQTT_THREAD
ret = pthread_mutex_init(&polling.mutex, NULL);
if (ret)
return ret;
ret = pthread_create(&thread, NULL, mqtt_thread, NULL);
if (ret)
return ret;
#endif
return 0;
}
@ -465,20 +351,6 @@ int mqtt_deinit()
{
int ret;
#ifdef MQTT_THREAD
ret = pthread_cancel(thread);
if (ret)
return ret;
ret = pthread_join(thread, NULL);
if (ret)
return ret;
ret = pthread_mutex_destroy(&polling.mutex);
if (ret)
return ret;
#endif
ret = mosquitto_lib_cleanup();
if (ret)
return ret;

View file

@ -42,9 +42,13 @@ int signal_parse(struct signal *s, json_t *cfg)
const char *name;
const char *unit = NULL;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s }",
/* Default values */
s->enabled = true;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: b }",
"name", &name,
"unit", &unit
"unit", &unit,
"enabled", &s->enabled
);
if (ret)
return -1;
@ -60,6 +64,7 @@ int signal_parse(struct signal *s, json_t *cfg)
int signal_parse_list(struct list *list, json_t *cfg)
{
int ret;
struct signal *s;
if (!json_is_array(cfg))
return -1;
@ -67,7 +72,9 @@ int signal_parse_list(struct list *list, json_t *cfg)
size_t index;
json_t *json_signal;
json_array_foreach(cfg, index, json_signal) {
struct signal *s = alloc(sizeof(struct signal));
s = alloc(sizeof(struct signal));
if (!s)
return -1;
ret = signal_parse(s, json_signal);
if (ret)

View file

@ -160,10 +160,10 @@ static void * send_loop(void *ctx)
}
sent = node_write(node, smps, scanned);
if (sent < 0) {
if (sent < 0)
warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent);
continue;
}
else if (sent < scanned)
warn("Failed to sent %d out of %d samples to node %s", scanned-sent, scanned, node_name(node));
sample_put_many(smps, ready);