diff --git a/routes/infrastructure-component/ic_amqpclient.go b/routes/infrastructure-component/ic_amqpclient.go index 22fb8f8..471b97e 100644 --- a/routes/infrastructure-component/ic_amqpclient.go +++ b/routes/infrastructure-component/ic_amqpclient.go @@ -37,8 +37,8 @@ const VILLAS_EXCHANGE = "villas" type AMQPclient struct { connection *amqp.Connection - channel *amqp.Channel - replies <-chan amqp.Delivery + sendCh *amqp.Channel + recvCh *amqp.Channel } type Action struct { @@ -94,13 +94,13 @@ func ConnectAMQP(uri string) error { return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err) } - // create channel - client.channel, err = client.connection.Channel() + // create sendCh + client.sendCh, err = client.connection.Channel() if err != nil { - return fmt.Errorf("AMQP: failed to open a channel, error: %v", err) + return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err) } // declare exchange - err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE, + err = client.sendCh.ExchangeDeclare(VILLAS_EXCHANGE, "headers", true, false, @@ -112,7 +112,7 @@ func ConnectAMQP(uri string) error { } // add a queue for the ICs - ICQueue, err := client.channel.QueueDeclare("infrastructure_components", + ICQueue, err := client.sendCh.QueueDeclare("infrastructure_components", true, false, false, @@ -122,13 +122,19 @@ func ConnectAMQP(uri string) error { return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) } - err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) + err = client.sendCh.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) if err != nil { return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) } - // consume deliveries - client.replies, err = client.channel.Consume(ICQueue.Name, + // create receive channel + client.recvCh, err = client.connection.Channel() + if err != nil { + return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err) + } + + // start deliveries + messages, err := client.recvCh.Consume(ICQueue.Name, "", true, false, @@ -136,19 +142,18 @@ func ConnectAMQP(uri string) error { false, nil) if err != nil { - return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err) + return fmt.Errorf("AMQP: failed to start deliveries: %v", err) } - // consuming queue + // consume deliveries go func() { for { - for message := range client.replies { + for message := range messages { err = processMessage(message) if err != nil { - log.Println(err.Error()) + log.Println("AMQP: Error processing message: ", message, err.Error()) } } - time.Sleep(2) // sleep for 2 sek } }() @@ -193,7 +198,7 @@ func sendActionAMQP(action Action) error { } //log.Println("AMQP: Sending message", string(msg.Body)) - err = client.channel.Publish(VILLAS_EXCHANGE, + err = client.sendCh.Publish(VILLAS_EXCHANGE, "", false, false,