AMQP: use separate send and recv channels

This commit is contained in:
Sonja Happ 2021-01-28 15:49:59 +01:00
parent ae4436be6d
commit 3ffc835a8a

View file

@ -37,8 +37,8 @@ const VILLAS_EXCHANGE = "villas"
type AMQPclient struct { type AMQPclient struct {
connection *amqp.Connection connection *amqp.Connection
channel *amqp.Channel sendCh *amqp.Channel
replies <-chan amqp.Delivery recvCh *amqp.Channel
} }
type Action struct { type Action struct {
@ -94,13 +94,13 @@ func ConnectAMQP(uri string) error {
return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err) return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
} }
// create channel // create sendCh
client.channel, err = client.connection.Channel() client.sendCh, err = client.connection.Channel()
if err != nil { if err != nil {
return fmt.Errorf("AMQP: failed to open a channel, error: %v", err) return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err)
} }
// declare exchange // declare exchange
err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE, err = client.sendCh.ExchangeDeclare(VILLAS_EXCHANGE,
"headers", "headers",
true, true,
false, false,
@ -112,7 +112,7 @@ func ConnectAMQP(uri string) error {
} }
// add a queue for the ICs // add a queue for the ICs
ICQueue, err := client.channel.QueueDeclare("infrastructure_components", ICQueue, err := client.sendCh.QueueDeclare("infrastructure_components",
true, true,
false, false,
false, false,
@ -122,13 +122,19 @@ func ConnectAMQP(uri string) error {
return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
} }
err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) err = client.sendCh.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
if err != nil { if err != nil {
return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
} }
// consume deliveries // create receive channel
client.replies, err = client.channel.Consume(ICQueue.Name, client.recvCh, err = client.connection.Channel()
if err != nil {
return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err)
}
// start deliveries
messages, err := client.recvCh.Consume(ICQueue.Name,
"", "",
true, true,
false, false,
@ -136,19 +142,18 @@ func ConnectAMQP(uri string) error {
false, false,
nil) nil)
if err != nil { if err != nil {
return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err) return fmt.Errorf("AMQP: failed to start deliveries: %v", err)
} }
// consuming queue // consume deliveries
go func() { go func() {
for { for {
for message := range client.replies { for message := range messages {
err = processMessage(message) err = processMessage(message)
if err != nil { if err != nil {
log.Println(err.Error()) log.Println("AMQP: Error processing message: ", message, err.Error())
} }
} }
time.Sleep(2) // sleep for 2 sek
} }
}() }()
@ -193,7 +198,7 @@ func sendActionAMQP(action Action) error {
} }
//log.Println("AMQP: Sending message", string(msg.Body)) //log.Println("AMQP: Sending message", string(msg.Body))
err = client.channel.Publish(VILLAS_EXCHANGE, err = client.sendCh.Publish(VILLAS_EXCHANGE,
"", "",
false, false,
false, false,