mirror of
https://git.rwth-aachen.de/acs/public/villas/web-backend-go/
synced 2025-03-30 00:00:12 +01:00
AMQP: use channel for receiving function, improve error handling
This commit is contained in:
parent
e76d8b872e
commit
32b42a1be0
1 changed files with 15 additions and 7 deletions
|
@ -70,13 +70,15 @@ func ConnectAMQP(uri string) error {
|
||||||
// connect to broker
|
// connect to broker
|
||||||
client.connection, err = amqp.Dial(uri)
|
client.connection, err = amqp.Dial(uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v", uri)
|
return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer client.connection.Close()
|
||||||
|
|
||||||
// create channel
|
// create channel
|
||||||
client.channel, err = client.connection.Channel()
|
client.channel, err = client.connection.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: failed to open a channel")
|
return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
|
||||||
}
|
}
|
||||||
// declare exchange
|
// declare exchange
|
||||||
err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
|
err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
|
||||||
|
@ -87,7 +89,7 @@ func ConnectAMQP(uri string) error {
|
||||||
false,
|
false,
|
||||||
nil)
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: failed to declare the exchange")
|
return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// add a queue for the ICs
|
// add a queue for the ICs
|
||||||
|
@ -98,26 +100,28 @@ func ConnectAMQP(uri string) error {
|
||||||
false,
|
false,
|
||||||
nil)
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: failed to declare the queue")
|
return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
|
err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: failed to bind the queue")
|
return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// consume deliveries
|
// consume deliveries
|
||||||
client.replies, err = client.channel.Consume(ICQueue.Name,
|
client.replies, err = client.channel.Consume(ICQueue.Name,
|
||||||
"",
|
"",
|
||||||
false,
|
true,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
nil)
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: failed to consume deliveries")
|
return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
forever := make(chan bool)
|
||||||
|
|
||||||
// consuming queue
|
// consuming queue
|
||||||
go func() {
|
go func() {
|
||||||
for message := range client.replies {
|
for message := range client.replies {
|
||||||
|
@ -233,6 +237,10 @@ func ConnectAMQP(uri string) error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
log.Printf(" AMQP: Waiting for messages... ")
|
||||||
|
|
||||||
|
<-forever
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue