diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index 3fe07ba1a..bad31cf7b 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -133,7 +133,7 @@ int amqp_parse(struct node *n, json_t *json) amqp_default_ssl_info(&a->ssl_info); amqp_default_connection_info(&a->connection_info); - ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: s, s?: s, s?: i, s: s, s: s, s?: s, s?: o }", + ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: s, s?: s, s?: s, s?: i, s: s, s: s, s?: s, s?: o }", "uri", &uri, "host", &host, "vhost", &vhost, @@ -199,7 +199,8 @@ char * amqp_print(struct node *n) char *buf = NULL; - strcatf(&buf, "format=%s, uri=%s://%s:%s@%s:%d%s, exchange=%s, routing_key=%s", format_type_name(a->format), + strcatf(&buf, "format=%s, uri=%s://%s:%s@%s:%d%s, exchange=%s, routing_key=%s", + format_type_name(a->format), a->connection_info.ssl ? "amqps" : "amqp", a->connection_info.user, a->connection_info.password, @@ -238,7 +239,11 @@ int amqp_start(struct node *n) amqp_rpc_reply_t rep; amqp_queue_declare_ok_t *r; - ret = io_init(&a->io, a->format, n, SAMPLE_HAS_ALL); + ret = io_init(&a->io, a->format, &n->signals, SAMPLE_HAS_ALL); + if (ret) + return ret; + + ret = io_check(&a->io); if (ret) return ret; @@ -298,6 +303,10 @@ int amqp_stop(struct node *n) if (ret) return ret; + ret = io_destroy(&a->io); + if (ret) + return ret; + return 0; } @@ -358,13 +367,8 @@ int amqp_fd(struct node *n) int amqp_destroy(struct node *n) { - int ret; struct amqp *a = n->_vd; - ret = io_destroy(&a->io); - if (ret) - return ret; - if (a->uri) free(a->uri);