mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'mqtt_unthreaded' into 'master'
Unthreaded MQTT Closes #248 See merge request acs/public/villas/VILLASnode!49
This commit is contained in:
commit
b74bf5b7ee
3 changed files with 80 additions and 9 deletions
2
common
2
common
|
@ -1 +1 @@
|
|||
Subproject commit 43fd33c9c77db22b93f65c92a1acb25c141d42a3
|
||||
Subproject commit 793427ac7f06e8a70509264e9688a024fca9a7c9
|
|
@ -54,7 +54,7 @@ int node_init(struct node *n, struct node_type *vt)
|
|||
n->_vt = vt;
|
||||
n->_vd = alloc(vt->size);
|
||||
|
||||
n->stats = nullptr;
|
||||
//n->stats = nullptr;
|
||||
n->name = nullptr;
|
||||
n->_name = nullptr;
|
||||
n->_name_long = nullptr;
|
||||
|
|
|
@ -30,6 +30,47 @@
|
|||
|
||||
using namespace villas::utils;
|
||||
|
||||
// Each process has a list of clients for which a thread invokes the mosquitto loop
|
||||
static struct vlist clients;
|
||||
static pthread_t thread;
|
||||
|
||||
static void * mosquitto_loop_thread(void *ctx)
|
||||
{
|
||||
int ret;
|
||||
|
||||
// Set the cancel type of this thread to async
|
||||
ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
|
||||
if (ret != 0) {
|
||||
error("Unable to set cancel type of MQTT communication thread to asynchronous.");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
for (unsigned i = 0; i < vlist_length(&clients); i++) {
|
||||
struct node *node = (struct node *) vlist_at(&clients, i);
|
||||
struct mqtt *m = (struct mqtt *) node->_vd;
|
||||
|
||||
// Execute mosquitto loop for this client
|
||||
ret = mosquitto_loop(m->client, 0, 1);
|
||||
if (ret) {
|
||||
warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret));
|
||||
|
||||
ret = mosquitto_reconnect(m->client);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret));
|
||||
else
|
||||
warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret));
|
||||
|
||||
ret = mosquitto_loop(m->client, -1, 1);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str)
|
||||
{
|
||||
switch (level) {
|
||||
|
@ -100,6 +141,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
|
|||
warning(" Payload: %s", (char *) msg->payload);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ret == 0) {
|
||||
debug(4, "MQTT: skip empty message for node %s", node_name(n));
|
||||
sample_decref_many(smps, n->in.vectorize);
|
||||
|
@ -331,9 +373,9 @@ int mqtt_start(struct node *n)
|
|||
if (ret)
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = mosquitto_loop_start(m->client);
|
||||
if (ret)
|
||||
goto mosquitto_error;
|
||||
// Add client to global list of MQTT clients
|
||||
// so that thread can call mosquitto loop for this client
|
||||
vlist_push(&clients, n);
|
||||
|
||||
return 0;
|
||||
|
||||
|
@ -348,11 +390,12 @@ int mqtt_stop(struct node *n)
|
|||
int ret;
|
||||
struct mqtt *m = (struct mqtt *) n->_vd;
|
||||
|
||||
ret = mosquitto_disconnect(m->client);
|
||||
if (ret)
|
||||
goto mosquitto_error;
|
||||
// Unregister client from global MQTT client list
|
||||
// so that mosquitto loop is no longer invoked for this client
|
||||
// important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect
|
||||
vlist_remove(&clients, vlist_index(&clients, n));
|
||||
|
||||
ret = mosquitto_loop_stop(m->client, 0);
|
||||
ret = mosquitto_disconnect(m->client);
|
||||
if (ret)
|
||||
goto mosquitto_error;
|
||||
|
||||
|
@ -372,10 +415,21 @@ int mqtt_type_start(villas::node::SuperNode *sn)
|
|||
{
|
||||
int ret;
|
||||
|
||||
ret = vlist_init(&clients);
|
||||
if (ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = mosquitto_lib_init();
|
||||
if (ret)
|
||||
goto mosquitto_error;
|
||||
|
||||
// Start thread here to run mosquitto loop for registered clients
|
||||
ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr);
|
||||
if (ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
|
@ -388,10 +442,27 @@ int mqtt_type_stop()
|
|||
{
|
||||
int ret;
|
||||
|
||||
// Stop thread here that executes mosquitto loop
|
||||
ret = pthread_cancel(thread);
|
||||
if (ret)
|
||||
return ret;
|
||||
debug( 3, "Called pthread_cancel() on MQTT communication management thread.");
|
||||
|
||||
ret = pthread_join(thread, nullptr);
|
||||
if (ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = mosquitto_lib_cleanup();
|
||||
if (ret)
|
||||
goto mosquitto_error;
|
||||
|
||||
// When this is called the list of clients should be empty
|
||||
if (vlist_length(&clients) > 0) {
|
||||
error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!");
|
||||
}
|
||||
vlist_destroy(&clients, nullptr, false);
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
|
|
Loading…
Add table
Reference in a new issue