diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 125d6fc2d..9b27133b5 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -28,6 +28,7 @@ #include #include +using namespace villas; using namespace villas::utils; // Each process has a list of clients for which a thread invokes the mosquitto loop @@ -170,6 +171,41 @@ int mqtt_reverse(struct vnode *n) return 0; } +int mqtt_init(struct vnode *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + m->client = mosquitto_new(n->name, 0, (void *) n); + if (!m->client) + return -1; + + ret = mosquitto_threaded_set(m->client, true); + if (ret) + goto mosquitto_error; + + mosquitto_log_callback_set(m->client, mqtt_log_cb); + mosquitto_connect_callback_set(m->client, mqtt_connect_cb); + mosquitto_disconnect_callback_set(m->client, mqtt_disconnect_cb); + mosquitto_message_callback_set(m->client, mqtt_message_cb); + mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb); + + /* Default values */ + m->port = 1883; + m->qos = 0; + m->retain = 0; + m->keepalive = 1; /* 1 second */ + m->ssl.enabled = 0; + m->ssl.insecure = 0; + + return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; +} + int mqtt_parse(struct vnode *n, json_t *cfg) { int ret; @@ -182,14 +218,6 @@ int mqtt_parse(struct vnode *n, json_t *cfg) const char *username = nullptr; const char *password = nullptr; - /* Default values */ - m->port = 1883; - m->qos = 0; - m->retain = 0; - m->keepalive = 1; /* 1 second */ - m->ssl.enabled = 0; - m->ssl.insecure = 0; - json_error_t err; json_t *json_ssl = nullptr; @@ -209,7 +237,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg) "ssl", &json_ssl ); if (ret) - jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + throw ConfigError(cfg, err, "node-config-node-mqtt", "Failed to parse configuration of node {}", node_name(n)); m->host = strdup(host); m->publish = publish ? strdup(publish) : nullptr; @@ -218,7 +246,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg) m->password = password ? strdup(password) : nullptr; if (!m->publish && !m->subscribe) - error("At least one topic has to be specified for node %s", node_name(n)); + throw ConfigError(cfg, "node-config-node-mqtt", "At least one topic has to be specified for node {}", node_name(n)); if (json_ssl) { const char *cafile = nullptr; @@ -226,7 +254,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg) const char *certfile = nullptr; const char *keyfile = nullptr; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: b, s?: b, s?: s, s?: s, s?: s, s?: s }", + ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: b, s?: b, s?: s, s?: s, s?: s, s?: s }", "enabled", &m->ssl.enabled, "insecure", &m->ssl.insecure, "cafile", &cafile, @@ -235,10 +263,10 @@ int mqtt_parse(struct vnode *n, json_t *cfg) "keyfile", &keyfile ); if (ret) - jerror(&err, "Failed to parse SSL configuration of node %s", node_name(n)); + throw ConfigError(json_ssl, err, "node-config-node-mqtt-ssl", "Failed to parse SSL configuration of node {}", node_name(n)); if (m->ssl.enabled && !cafile && !capath) - error("Either 'ssl.cafile' or 'ssl.capath' settings must be set for node %s.", node_name(n)); + throw ConfigError(json_ssl, "node-config-node-mqtt-ssl", "Either 'ssl.cafile' or 'ssl.capath' settings must be set for node {}.", node_name(n)); m->ssl.cafile = cafile ? strdup(cafile) : nullptr; m->ssl.capath = capath ? strdup(capath) : nullptr; @@ -248,7 +276,7 @@ int mqtt_parse(struct vnode *n, json_t *cfg) m->format = format_type_lookup(format); if (!m->format) - error("Invalid format '%s' for node %s", format, node_name(n)); + throw ConfigError(json_ssl, "node-config-node-mqtt-format", "Invalid format '{}' for node {}", format, node_name(n)); return 0; } @@ -258,6 +286,10 @@ int mqtt_check(struct vnode *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; + ret = io_check(&m->io); + if (ret) + return ret; + ret = mosquitto_sub_topic_check(m->subscribe); if (ret != MOSQ_ERR_SUCCESS) error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); @@ -275,10 +307,10 @@ char * mqtt_print(struct vnode *n) char *buf = nullptr; - strcatf(&buf, "format=%s, host=%s, port=%d, keepalive=%s, ssl=%s", format_type_name(m->format), + strcatf(&buf, "format=%s, host=%s, port=%d, keepalive=%d, ssl=%s", format_type_name(m->format), m->host, m->port, - m->keepalive ? "yes" : "no", + m->keepalive, m->ssl.enabled ? "yes" : "no" ); @@ -302,6 +334,10 @@ int mqtt_destroy(struct vnode *n) mosquitto_destroy(m->client); + ret = io_destroy(&m->io); + if (ret) + return ret; + ret = pool_destroy(&m->pool); if (ret) return ret; @@ -329,40 +365,26 @@ int mqtt_start(struct vnode *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - m->client = mosquitto_new(n->name, 0, (void *) n); - if (!m->client) - return -1; - if (m->username && m->password) { ret = mosquitto_username_pw_set(m->client, m->username, m->password); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; } if (m->ssl.enabled) { ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; } - mosquitto_log_callback_set(m->client, mqtt_log_cb); - mosquitto_connect_callback_set(m->client, mqtt_connect_cb); - mosquitto_disconnect_callback_set(m->client, mqtt_disconnect_cb); - mosquitto_message_callback_set(m->client, mqtt_message_cb); - mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb); - ret = io_init(&m->io, m->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); if (ret) return ret; - ret = io_check(&m->io); - if (ret) - return ret; - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals))); if (ret) return ret; @@ -372,7 +394,7 @@ int mqtt_start(struct vnode *n) return ret; ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; // Add client to global list of MQTT clients @@ -398,13 +420,9 @@ int mqtt_stop(struct vnode *n) vlist_remove(&clients, vlist_index(&clients, n)); ret = mosquitto_disconnect(m->client); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; - ret = io_destroy(&m->io); - if (ret) - return ret; - return 0; mosquitto_error: @@ -422,7 +440,7 @@ int mqtt_type_start(villas::node::SuperNode *sn) return ret; ret = mosquitto_lib_init(); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; // Start thread here to run mosquitto loop for registered clients @@ -453,13 +471,13 @@ int mqtt_type_stop() return ret; ret = mosquitto_lib_cleanup(); - if (ret) + if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; // When this is called the list of clients should be empty - if (vlist_length(&clients) > 0) + if (vlist_length(&clients) > 0) error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); - + ret = vlist_destroy(&clients, nullptr, false); if (ret) return ret; @@ -537,6 +555,8 @@ static void register_plugin() { p.node.parse = mqtt_parse; p.node.check = mqtt_check; p.node.print = mqtt_print; + p.node.init = mqtt_init; + p.node.destroy = mqtt_destroy; p.node.start = mqtt_start; p.node.stop = mqtt_stop; p.node.read = mqtt_read;