From 32b42a1be0bb34487242630923e64696fca160cb Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 9 Oct 2020 11:02:42 +0200 Subject: [PATCH] AMQP: use channel for receiving function, improve error handling --- amqp/amqpclient.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/amqp/amqpclient.go b/amqp/amqpclient.go index 125e947..61fddd6 100644 --- a/amqp/amqpclient.go +++ b/amqp/amqpclient.go @@ -70,13 +70,15 @@ func ConnectAMQP(uri string) error { // connect to broker client.connection, err = amqp.Dial(uri) if err != nil { - return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v", uri) + return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err) } + defer client.connection.Close() + // create channel client.channel, err = client.connection.Channel() if err != nil { - return fmt.Errorf("AMQP: failed to open a channel") + return fmt.Errorf("AMQP: failed to open a channel, error: %v", err) } // declare exchange err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE, @@ -87,7 +89,7 @@ func ConnectAMQP(uri string) error { false, nil) if err != nil { - return fmt.Errorf("AMQP: failed to declare the exchange") + return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err) } // add a queue for the ICs @@ -98,26 +100,28 @@ func ConnectAMQP(uri string) error { false, nil) if err != nil { - return fmt.Errorf("AMQP: failed to declare the queue") + return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) } err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) if err != nil { - return fmt.Errorf("AMQP: failed to bind the queue") + return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) } // consume deliveries client.replies, err = client.channel.Consume(ICQueue.Name, "", - false, + true, false, false, false, nil) if err != nil { - return fmt.Errorf("AMQP: failed to consume deliveries") + return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err) } + forever := make(chan bool) + // consuming queue go func() { for message := range client.replies { @@ -233,6 +237,10 @@ func ConnectAMQP(uri string) error { } }() + log.Printf(" AMQP: Waiting for messages... ") + + <-forever + return nil }