1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

mqtt: adapt to new signal code and separate node-type configuration into in/out sections

This commit is contained in:
Steffen Vogel 2018-08-20 18:26:53 +02:00
parent df94f7fb91
commit 1aef3e7d35

View file

@ -60,15 +60,13 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result)
info("MQTT: Node %s connected to broker %s", node_name(n), m->host);
if(m->subscribe){
if (m->subscribe) {
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
if (ret)
warn("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n));
}
else{
warn("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n));
}
else
warn("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n));
}
static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result)
@ -149,9 +147,11 @@ int mqtt_parse(struct node *n, json_t *cfg)
json_error_t err;
json_t *json_ssl = NULL;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s: s, s?: i, s?: i, s?: i, s?: b, s?: s, s?: s, s?: o }",
"publish", &publish,
"subscribe", &subscribe,
ret = json_unpack_ex(cfg, &err, 0, "{ s?: { s?: s }, s?: { s?: s }, s?: s, s: s, s?: i, s?: i, s?: i, s?: b, s?: s, s?: s, s?: o }",
"out",
"publish", &publish,
"in",
"subscribe", &subscribe,
"format", &format,
"host", &host,
"port", &m->port,
@ -231,10 +231,10 @@ char * mqtt_print(struct node *n)
strcatf(&buf, ", username=%s", m->username);
if (m->publish)
strcatf(&buf, ", publish=%s", m->publish);
strcatf(&buf, ", out.publish=%s", m->publish);
if (m->subscribe)
strcatf(&buf, ", subscribe=%s", m->subscribe);
strcatf(&buf, ", in.subscribe=%s", m->subscribe);
return buf;
}
@ -246,10 +246,6 @@ int mqtt_destroy(struct node *n)
mosquitto_destroy(m->client);
ret = io_destroy(&m->io);
if (ret)
return ret;
ret = pool_destroy(&m->pool);
if (ret)
return ret;
@ -303,11 +299,15 @@ int mqtt_start(struct node *n)
mosquitto_message_callback_set(m->client, mqtt_message_cb);
mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb);
ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL);
ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL);
if (ret)
return ret;
ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(n->samplelen), &memory_hugepage);
ret = io_check(&m->io);
if (ret)
return ret;
ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(list_length(&n->signals)), &memory_hugepage);
if (ret)
return ret;
@ -339,6 +339,10 @@ int mqtt_stop(struct node *n)
if (ret)
return ret;
ret = io_destroy(&m->io);
if (ret)
return ret;
return 0;
}