diff --git a/include/villas/nodes/amqp.h b/include/villas/nodes/amqp.h index 3a34dee61..5e36a5d77 100644 --- a/include/villas/nodes/amqp.h +++ b/include/villas/nodes/amqp.h @@ -39,23 +39,26 @@ /* Forward declarations */ struct io_format; +struct amqp_ssl_info { + int verify_peer; + int verify_hostname; + char *ca_cert; + char *client_cert; + char *client_key; +}; + struct amqp { + char *uri; + struct amqp_connection_info connection_info; + struct amqp_ssl_info ssl_info; - struct { - amqp_socket_t *socket; - amqp_connection_state_t connection; + amqp_bytes_t routing_key; + amqp_bytes_t exchange; - amqp_bytes_t routing_key; - amqp_bytes_t exchange; - } producer; - - struct { - amqp_socket_t *socket; - amqp_connection_state_t connection; - - amqp_bytes_t queue; - } consumer; + /* We need to create two connection because rabbitmq-c is not thread-safe! */ + amqp_connection_state_t producer; + amqp_connection_state_t consumer; struct io_format *format; }; @@ -64,7 +67,7 @@ struct amqp { char * amqp_print(struct node *n); /** @see node_type::parse */ -int amqp_parse(struct node *n, json_t *cfg); +int amqp_parse(struct node *n, json_t *json); /** @see node_type::open */ int amqp_start(struct node *n); diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 58be17a9a..8bf5a3d4b 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -22,106 +22,162 @@ #include +#include +#include + #include "plugin.h" #include "nodes/amqp.h" #include "utils.h" #include "io_format.h" -int amqp_reverse(struct node *n) +static void amqp_default_ssl_info(struct amqp_ssl_info *s) { - struct amqp *a = n->_vd; - - if (list_length(&m->publisher.endpoints) != 1 || - list_length(&m->subscriber.endpoints) != 1) - return -1; - - char *subscriber = list_first(&m->subscriber.endpoints); - char *publisher = list_first(&m->publisher.endpoints); - - list_set(&m->subscriber.endpoints, 0, publisher); - list_set(&m->publisher.endpoints, 0, subscriber); - - return 0; + s->verify_peer = 1; + s->verify_hostname = 1; + s->client_key = NULL; + s->client_cert = NULL; + s->ca_cert = NULL; } -static int amqp_parse_endpoints(struct list *l, json_t *cfg) +static amqp_bytes_t amqp_bytes_strdup(const char *str) { - const char *ep; + size_t len = strlen(str) + 1; + amqp_bytes_t buf = amqp_bytes_malloc(len); - size_t index; - json_t *json_val; + memcpy(buf.bytes, str, len); - switch (json_typeof(cfg)) { - case JSON_ARRAY: - json_array_foreach(cfg, index, json_val) { - ep = json_string_value(json_val); - if (!ep) - return -1; + return buf; +} - list_push(l, strdup(ep)); - } - break; +static amqp_connection_state_t amqp_connect(struct amqp_connection_info *ci, struct amqp_ssl_info *ssl) +{ + int ret; + amqp_rpc_reply_t rep; + amqp_connection_state_t conn; + amqp_socket_t *sock; - case JSON_STRING: - ep = json_string_value(cfg); + conn = amqp_new_connection(); + if (!conn) + return NULL; - list_push(l, strdup(ep)); - break; + if (ci->ssl) { + sock = amqp_ssl_socket_new(conn); + if (!sock) + return NULL; - default: - return -1; + amqp_ssl_socket_set_verify_peer(sock, ssl->verify_peer); + amqp_ssl_socket_set_verify_hostname(sock, ssl->verify_hostname); + + if (ssl->ca_cert) + amqp_ssl_socket_set_cacert(sock, ssl->ca_cert); + + if (ssl->client_key && ssl->client_cert) + amqp_ssl_socket_set_key(sock, ssl->client_cert, ssl->client_key); + } + else { + sock = amqp_tcp_socket_new(conn); + if (!sock) + return NULL; } + ret = amqp_socket_open(sock, ci->host, ci->port); + if (ret != AMQP_STATUS_OK) + return NULL; + + rep = amqp_login(conn, ci->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, ci->user, ci->password); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return NULL; + + amqp_channel_open(conn, 1); + rep = amqp_get_rpc_reply(conn); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return NULL; + + return conn; +} + +static int amqp_close(amqp_connection_state_t conn) +{ + amqp_rpc_reply_t rep; + + rep = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return -1; + + rep = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return -1; + return 0; } -int amqp_parse(struct node *n, json_t *cfg) +int amqp_parse(struct node *n, json_t *json) { int ret; struct amqp *a = n->_vd; const char *format = "json"; const char *uri = NULL; - const char *exchange, *routing_key, *queue; + const char *exchange, *routing_key; json_error_t err; - json_t *json_producer = NULL; - json_t *json_consumer = NULL; + json_t *json_ssl = NULL; /* Default values */ + amqp_default_ssl_info(&a->ssl_info); amqp_default_connection_info(&a->connection_info); - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s: s, s?: s }", + ret = json_unpack_ex(json, &err, 0, "{ s?: s, s: s, s: s, s?: s, s?: o }", "uri", &uri, "exchange", &exchange, "routing_key", &routing_key, - "queue", &queue, - "format", &format + "format", &format, + "ssl", &json_ssl ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + a->exchange = amqp_bytes_strdup(exchange); + a->routing_key = amqp_bytes_strdup(routing_key); + if (uri) { - ret = amqp_parse_url(uri, &a->connection_info); + a->uri = strdup(uri); + ret = amqp_parse_url(a->uri, &a->connection_info); + if (ret != AMQP_STATUS_OK) error("Failed to parse URI of node %s", node_name(n)); } + if (json_ssl) { + const char *ca_cert = NULL; + const char *client_cert = NULL; + const char *client_key = NULL; + ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: b, s?: b, s?: s, s?: s, s?: s }", + "verify_peer", &a->ssl_info.verify_peer, + "verify_hostname", &a->ssl_info.verify_hostname, + "ca_cert", &ca_cert, + "client_key", &client_key, + "client_cert", &client_cert + ); + if (ret) + jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - m->format = io_format_lookup(format); - if (!m->format) + if (ca_cert) + a->ssl_info.ca_cert = strdup(ca_cert); + + if (client_cert) + a->ssl_info.client_cert = strdup(client_cert); + + if (client_key) + a->ssl_info.client_key = strdup(client_key); + } + + a->format = io_format_lookup(format); + if (!a->format) error("Invalid format '%s' for node %s", format, node_name(n)); - if (json_producer) { - - } - - if (json_consumer) { - - } - return 0; } @@ -131,54 +187,83 @@ char * amqp_print(struct node *n) char *buf = NULL; - strcatf(&buf, "format=%s, uri=amqp://%s:%s@%s:%d%s, exchange=%s, routing_key=%s, queue=%s", plugin_name(m->format) + strcatf(&buf, "format=%s, uri=%s://%s:%s@%s:%d%s, exchange=%s, routing_key=%s", plugin_name(a->format), + a->connection_info.ssl ? "amqps" : "amqp", a->connection_info.user, a->connection_info.password, a->connection_info.host, a->connection_info.port, a->connection_info.vhost, - (char *) a->producer.exchange.bytes, - (char *) a->producer.routing_key.bytes, - (char *) a->consumer.queue.bytes, + (char *) a->exchange.bytes, + (char *) a->routing_key.bytes ); + if (a->connection_info.ssl) { + strcatf(&buf, ", ssl_info.verify_peer=%s, ssl_info.verify_hostname=%s", + a->ssl_info.verify_peer ? "true" : "false", + a->ssl_info.verify_hostname ? "true" : "false" + ); + + if (a->ssl_info.ca_cert) + strcatf(&buf, ", ssl_info.ca_cert=%s", a->ssl_info.ca_cert); + + if (a->ssl_info.client_cert) + strcatf(&buf, ", ssl_info.client_cert=%s", a->ssl_info.client_cert); + + if (a->ssl_info.client_key) + strcatf(&buf, ", ssl_info.client_key=%s", a->ssl_info.client_key); + } + return buf; } -int amqp_connect(amqp_connection_t *conn, amqp_socket_t *sock, struct amqp_connection_info *ci) -{ - int ret; - - *connection = amqp_new_connection(); - *socket = amqp_tcp_socket_new(conn); - - ret = amqp_socket_open(socket, ci->hostname, ci->port); - if (ret != AMQP_STATUS_OK) - return -1; - - ret = amqp_login(&connection, ci->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, ci->user, ci->password); - if (ret != AMQP_STATUS_OK) - return -2; - - amqp_channel_open(a->producer.connection, 1); - - return 0; -} - int amqp_start(struct node *n) { - int ret; struct amqp *a = n->_vd; - /* Start producer */ - ret = amqp_connect(&a->producer.connection, &a->producer.socket, &a->connection_info); - if (ret) + amqp_bytes_t queue; + amqp_rpc_reply_t rep; + amqp_queue_declare_ok_t *r; + + /* Connect producer */ + a->producer = amqp_connect(&a->connection_info, &a->ssl_info); + if (!a->producer) + return -1; + + /* Connect consumer */ + a->consumer = amqp_connect(&a->connection_info, &a->ssl_info); + if (!a->consumer) + return -1; + + /* Declare exchange */ + amqp_exchange_declare(a->producer, 1, a->exchange, amqp_cstring_bytes("direct"), 0, 0, 0, 0, amqp_empty_table); + rep = amqp_get_rpc_reply(a->consumer); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return -1; + + /* Declare private queue */ + r = amqp_queue_declare(a->consumer, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); + rep = amqp_get_rpc_reply(a->consumer); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return -1; + + queue = amqp_bytes_malloc_dup(r->queue); + if (queue.bytes == NULL) + return -1; + + /* Bind queue to exchange */ + amqp_queue_bind(a->consumer, 1, queue, a->exchange, a->routing_key, amqp_empty_table); + rep = amqp_get_rpc_reply(a->consumer); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) return -1; /* Start consumer */ - ret = amqp_connect(&a->consumer.connection, &a->consumer.socket, &a->connection_info); - if (ret) - return -2; + amqp_basic_consume(a->consumer, 1, queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + rep = amqp_get_rpc_reply(a->consumer); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return -1; + + amqp_bytes_free(queue); return 0; } @@ -188,50 +273,56 @@ int amqp_stop(struct node *n) int ret; struct amqp *a = n->_vd; - /* Stop producer */ - amqp_channel_close(a->procuder.connection, 1, AMQP_REPLY_SUCCESS); - amqp_connection_close(a->procuder.connection, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(a->procuder.connection); + ret = amqp_close(a->consumer); + if (ret) + return ret; - /* Stop consumer */ - amqp_channel_close(a->consumer.connection, 1, AMQP_REPLY_SUCCESS); - amqp_connection_close(a->consumer.connection, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(a->consumer.connection); + ret = amqp_close(a->producer); + if (ret) + return ret; return 0; } int amqp_read(struct node *n, struct sample *smps[], unsigned cnt) { + int ret; struct amqp *a = n->_vd; - int bytes; - char data[amqp_MAX_PACKET_LEN]; + amqp_envelope_t env; + amqp_rpc_reply_t rep; - /* Receive message */ - ret = amqp_basic_consume(a->consumer.connection, 1, a->consumer.queue, message, 0, 1, 0, amqp_empty_table); + rep = amqp_consume_message(a->consumer, &env, NULL, 0); + if (rep.reply_type != AMQP_RESPONSE_NORMAL) + return -1; + ret = io_format_sscan(a->format, env.message.body.bytes, env.message.body.len, NULL, smps, cnt, 0); - return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, 0); + amqp_destroy_envelope(&env); + + return ret; } int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) { int ret; struct amqp *a = n->_vd; - + char data[1500]; size_t wbytes; - char data[amqp_MAX_PACKET_LEN]; - - ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); + ret = io_format_sprint(a->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); if (ret <= 0) return -1; + amqp_bytes_t message = { + .len = wbytes, + .bytes = data + }; + /* Send message */ - ret = amqp_basic_publish(a->producer.connection, 1, - a->producer.exchange, - a->producer.routing_key, - 0, 0, NULL, message_bytes); + ret = amqp_basic_publish(a->producer, 1, + a->exchange, + a->routing_key, + 0, 0, NULL, message); if (ret != AMQP_STATUS_OK) return -1; @@ -241,10 +332,36 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt) int amqp_fd(struct node *n) { - int ret; struct amqp *a = n->_vd; - return amqp_socket_get_sockfd(a->consumer.connection); + amqp_socket_t *sock = amqp_get_socket(a->consumer); + + return amqp_socket_get_sockfd(sock); +} + +int amqp_destroy(struct node *n) +{ + struct amqp *a = n->_vd; + + if (a->uri) + free(a->uri); + + if (a->ssl_info.client_cert) + free(a->ssl_info.client_cert); + + if (a->ssl_info.client_key) + free(a->ssl_info.client_key); + + if (a->ssl_info.ca_cert) + free(a->ssl_info.ca_cert); + + if (a->producer) + amqp_destroy_connection(a->producer); + + if (a->consumer) + amqp_destroy_connection(a->consumer); + + return 0; } static struct plugin p = { @@ -253,14 +370,14 @@ static struct plugin p = { .type = PLUGIN_TYPE_NODE, .node = { .vectorize = 0, - .size = sizeof(struct nanomsg), - .reverse = amqp_reverse, + .size = sizeof(struct amqp), .parse = amqp_parse, .print = amqp_print, .start = amqp_start, .stop = amqp_stop, .read = amqp_read, .write = amqp_write, + .destroy = amqp_destroy, .fd = amqp_fd } };