1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

mqtt: a bunch of fixes and updates for the MQTT node-type

This commit is contained in:
Steffen Vogel 2020-09-30 11:22:59 +02:00
parent 048561417c
commit ffa57f5e7a

View file

@ -28,6 +28,7 @@
#include <villas/utils.hpp>
#include <villas/format_type.h>
using namespace villas;
using namespace villas::utils;
// Each process has a list of clients for which a thread invokes the mosquitto loop
@ -170,6 +171,41 @@ int mqtt_reverse(struct vnode *n)
return 0;
}
int mqtt_init(struct vnode *n)
{
int ret;
struct mqtt *m = (struct mqtt *) n->_vd;
m->client = mosquitto_new(n->name, 0, (void *) n);
if (!m->client)
return -1;
ret = mosquitto_threaded_set(m->client, true);
if (ret)
goto mosquitto_error;
mosquitto_log_callback_set(m->client, mqtt_log_cb);
mosquitto_connect_callback_set(m->client, mqtt_connect_cb);
mosquitto_disconnect_callback_set(m->client, mqtt_disconnect_cb);
mosquitto_message_callback_set(m->client, mqtt_message_cb);
mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb);
/* Default values */
m->port = 1883;
m->qos = 0;
m->retain = 0;
m->keepalive = 1; /* 1 second */
m->ssl.enabled = 0;
m->ssl.insecure = 0;
return 0;
mosquitto_error:
warning("MQTT: %s", mosquitto_strerror(ret));
return ret;
}
int mqtt_parse(struct vnode *n, json_t *cfg)
{
int ret;
@ -182,14 +218,6 @@ int mqtt_parse(struct vnode *n, json_t *cfg)
const char *username = nullptr;
const char *password = nullptr;
/* Default values */
m->port = 1883;
m->qos = 0;
m->retain = 0;
m->keepalive = 1; /* 1 second */
m->ssl.enabled = 0;
m->ssl.insecure = 0;
json_error_t err;
json_t *json_ssl = nullptr;
@ -209,7 +237,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg)
"ssl", &json_ssl
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
throw ConfigError(cfg, err, "node-config-node-mqtt", "Failed to parse configuration of node {}", node_name(n));
m->host = strdup(host);
m->publish = publish ? strdup(publish) : nullptr;
@ -218,7 +246,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg)
m->password = password ? strdup(password) : nullptr;
if (!m->publish && !m->subscribe)
error("At least one topic has to be specified for node %s", node_name(n));
throw ConfigError(cfg, "node-config-node-mqtt", "At least one topic has to be specified for node {}", node_name(n));
if (json_ssl) {
const char *cafile = nullptr;
@ -226,7 +254,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg)
const char *certfile = nullptr;
const char *keyfile = nullptr;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: b, s?: b, s?: s, s?: s, s?: s, s?: s }",
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: b, s?: b, s?: s, s?: s, s?: s, s?: s }",
"enabled", &m->ssl.enabled,
"insecure", &m->ssl.insecure,
"cafile", &cafile,
@ -235,10 +263,10 @@ int mqtt_parse(struct vnode *n, json_t *cfg)
"keyfile", &keyfile
);
if (ret)
jerror(&err, "Failed to parse SSL configuration of node %s", node_name(n));
throw ConfigError(json_ssl, err, "node-config-node-mqtt-ssl", "Failed to parse SSL configuration of node {}", node_name(n));
if (m->ssl.enabled && !cafile && !capath)
error("Either 'ssl.cafile' or 'ssl.capath' settings must be set for node %s.", node_name(n));
throw ConfigError(json_ssl, "node-config-node-mqtt-ssl", "Either 'ssl.cafile' or 'ssl.capath' settings must be set for node {}.", node_name(n));
m->ssl.cafile = cafile ? strdup(cafile) : nullptr;
m->ssl.capath = capath ? strdup(capath) : nullptr;
@ -248,7 +276,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg)
m->format = format_type_lookup(format);
if (!m->format)
error("Invalid format '%s' for node %s", format, node_name(n));
throw ConfigError(json_ssl, "node-config-node-mqtt-format", "Invalid format '{}' for node {}", format, node_name(n));
return 0;
}
@ -258,6 +286,10 @@ int mqtt_check(struct vnode *n)
int ret;
struct mqtt *m = (struct mqtt *) n->_vd;
ret = io_check(&m->io);
if (ret)
return ret;
ret = mosquitto_sub_topic_check(m->subscribe);
if (ret != MOSQ_ERR_SUCCESS)
error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret));
@ -275,10 +307,10 @@ char * mqtt_print(struct vnode *n)
char *buf = nullptr;
strcatf(&buf, "format=%s, host=%s, port=%d, keepalive=%s, ssl=%s", format_type_name(m->format),
strcatf(&buf, "format=%s, host=%s, port=%d, keepalive=%d, ssl=%s", format_type_name(m->format),
m->host,
m->port,
m->keepalive ? "yes" : "no",
m->keepalive,
m->ssl.enabled ? "yes" : "no"
);
@ -302,6 +334,10 @@ int mqtt_destroy(struct vnode *n)
mosquitto_destroy(m->client);
ret = io_destroy(&m->io);
if (ret)
return ret;
ret = pool_destroy(&m->pool);
if (ret)
return ret;
@ -329,40 +365,26 @@ int mqtt_start(struct vnode *n)
int ret;
struct mqtt *m = (struct mqtt *) n->_vd;
m->client = mosquitto_new(n->name, 0, (void *) n);
if (!m->client)
return -1;
if (m->username && m->password) {
ret = mosquitto_username_pw_set(m->client, m->username, m->password);
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
}
if (m->ssl.enabled) {
ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr);
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure);
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
}
mosquitto_log_callback_set(m->client, mqtt_log_cb);
mosquitto_connect_callback_set(m->client, mqtt_connect_cb);
mosquitto_disconnect_callback_set(m->client, mqtt_disconnect_cb);
mosquitto_message_callback_set(m->client, mqtt_message_cb);
mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb);
ret = io_init(&m->io, m->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET);
if (ret)
return ret;
ret = io_check(&m->io);
if (ret)
return ret;
ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)));
if (ret)
return ret;
@ -372,7 +394,7 @@ int mqtt_start(struct vnode *n)
return ret;
ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive);
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
// Add client to global list of MQTT clients
@ -398,13 +420,9 @@ int mqtt_stop(struct vnode *n)
vlist_remove(&clients, vlist_index(&clients, n));
ret = mosquitto_disconnect(m->client);
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
ret = io_destroy(&m->io);
if (ret)
return ret;
return 0;
mosquitto_error:
@ -422,7 +440,7 @@ int mqtt_type_start(villas::node::SuperNode *sn)
return ret;
ret = mosquitto_lib_init();
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
// Start thread here to run mosquitto loop for registered clients
@ -453,13 +471,13 @@ int mqtt_type_stop()
return ret;
ret = mosquitto_lib_cleanup();
if (ret)
if (ret != MOSQ_ERR_SUCCESS)
goto mosquitto_error;
// When this is called the list of clients should be empty
if (vlist_length(&clients) > 0)
if (vlist_length(&clients) > 0)
error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!");
ret = vlist_destroy(&clients, nullptr, false);
if (ret)
return ret;
@ -537,6 +555,8 @@ static void register_plugin() {
p.node.parse = mqtt_parse;
p.node.check = mqtt_check;
p.node.print = mqtt_print;
p.node.init = mqtt_init;
p.node.destroy = mqtt_destroy;
p.node.start = mqtt_start;
p.node.stop = mqtt_stop;
p.node.read = mqtt_read;