From 2dada335620d14a3443864c0a993d431bddffe08 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 27 Aug 2021 17:20:51 +0200 Subject: [PATCH] started refactoring the AMPQ session, WIP #74 --- helper/amqp_session.go | 277 ++++++++++++++++++ routes/healthz/healthz_endpoint.go | 9 +- routes/healthz/healthz_test.go | 10 +- .../{ic_amqpclient.go => ic_amqpmethods.go} | 238 +++------------ .../infrastructure-component/ic_endpoints.go | 7 +- routes/infrastructure-component/ic_test.go | 160 ++-------- routes/register_test.go | 23 -- start.go | 20 +- 8 files changed, 365 insertions(+), 379 deletions(-) create mode 100644 helper/amqp_session.go rename routes/infrastructure-component/{ic_amqpclient.go => ic_amqpmethods.go} (69%) diff --git a/helper/amqp_session.go b/helper/amqp_session.go new file mode 100644 index 0000000..fc80d79 --- /dev/null +++ b/helper/amqp_session.go @@ -0,0 +1,277 @@ +package helper + +import ( + "fmt" + "github.com/streadway/amqp" + "log" + "time" +) + +type AMQPsession struct { + connection *amqp.Connection + sendCh *amqp.Channel + recvCh *amqp.Channel + notifyConnClose chan *amqp.Error + notifySendChanClose chan *amqp.Error + notifySendConfirm chan amqp.Confirmation + notifyRecvChanClose chan *amqp.Error + notifyRecvConfirm chan amqp.Confirmation + IsReady bool + done chan bool + name string + exchange string + processMessage func(delivery amqp.Delivery) +} + +const ( + // When reconnecting to the server after connection failure + reconnectDelay = 5 * time.Second + + // When setting up the channel after a channel exception + reInitDelay = 2 * time.Second + + // When resending messages the server didn't confirm + resendDelay = 5 * time.Second +) + +//var client AMQPsession + +// NewAMQPSession creates a new consumer state instance, and automatically +// attempts to connect to the server. +func NewAMQPSession(name string, AMQPurl string, exchange string, processMessage func(delivery amqp.Delivery)) *AMQPsession { + if AMQPurl != "" { + log.Println("Starting AMQP client") + + session := AMQPsession{ + name: name, + exchange: exchange, + done: make(chan bool), + processMessage: processMessage, + } + go session.handleReconnect(AMQPurl) + + return &session + } + + return nil +} + +// handleReconnect will wait for a connection error on +// notifyConnClose, and then continuously attempt to reconnect. +func (session *AMQPsession) handleReconnect(addr string) { + for { + session.IsReady = false + log.Println("Attempting to connect to AMQP broker ", addr) + + conn, err := session.connect(addr) + + if err != nil { + log.Println("Failed to connect. Retrying...") + + select { + case <-session.done: + return + case <-time.After(reconnectDelay): + } + continue + } + + if done := session.handleReInit(conn); done { + break + } + } +} + +// connect will create a new AMQP connection +func (session *AMQPsession) connect(addr string) (*amqp.Connection, error) { + conn, err := amqp.Dial(addr) + + if err != nil { + return nil, err + } + + // take a new connection to the queue, and updates the close listener to reflect this. + session.connection = conn + session.notifyConnClose = make(chan *amqp.Error) + session.connection.NotifyClose(session.notifyConnClose) + + log.Println("Connected!") + return conn, nil +} + +// handleReInit will wait for a channel error +// and then continuously attempt to re-initialize both channels +func (session *AMQPsession) handleReInit(conn *amqp.Connection) bool { + for { + session.IsReady = false + + err := session.init(conn) + + if err != nil { + log.Println("Failed to initialize channel. Retrying...") + + select { + case <-session.done: + return true + case <-time.After(reInitDelay): + } + continue + } + + select { + case <-session.done: + return true + case <-session.notifyConnClose: + log.Println("Connection closed. Reconnecting...") + return false + case <-session.notifySendChanClose: + log.Println("Send channel closed. Re-running init...") + case <-session.notifyRecvChanClose: + log.Println("Receive channel closed. Re-running init...") + } + } +} + +// init will initialize channel & declare queue +func (session *AMQPsession) init(conn *amqp.Connection) error { + ch, err := conn.Channel() + + if err != nil { + return err + } + + err = ch.Confirm(false) + + if err != nil { + return err + } + _, err = ch.QueueDeclare( + session.name, + false, // Durable + false, // Delete when unused + false, // Exclusive + false, // No-wait + nil, // Arguments + ) + + if err != nil { + return err + } + + // create sendCh + sendCh, err := conn.Channel() + if err != nil { + return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err) + } + // declare exchange + err = sendCh.ExchangeDeclare(session.exchange, + "headers", + true, + false, + false, + false, + nil) + if err != nil { + return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err) + } + + // add a queue for the ICs + ICQueue, err := sendCh.QueueDeclare("infrastructure_components", + true, + false, + false, + false, + nil) + if err != nil { + return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) + } + + err = sendCh.QueueBind(ICQueue.Name, "", session.exchange, false, nil) + if err != nil { + return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) + } + + session.sendCh = sendCh + session.notifySendChanClose = make(chan *amqp.Error) + session.notifySendConfirm = make(chan amqp.Confirmation, 1) + session.sendCh.NotifyClose(session.notifySendChanClose) + session.sendCh.NotifyPublish(session.notifySendConfirm) + + // create receive channel + recvCh, err := conn.Channel() + if err != nil { + return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err) + } + + session.recvCh = recvCh + session.notifyRecvChanClose = make(chan *amqp.Error) + session.notifyRecvConfirm = make(chan amqp.Confirmation, 1) + session.recvCh.NotifyClose(session.notifyRecvChanClose) + session.recvCh.NotifyPublish(session.notifyRecvConfirm) + + // start deliveries + messages, err := session.recvCh.Consume(ICQueue.Name, + "", + true, + false, + false, + false, + nil) + if err != nil { + return fmt.Errorf("AMQP: failed to start deliveries: %v", err) + } + + // consume deliveries + go func() { + for { + for message := range messages { + session.processMessage(message) + } + } + }() + session.IsReady = true + log.Println("AMQP channels setup! Waiting for messages...") + + return nil +} + +func (session *AMQPsession) CheckConnection() error { + + if session.connection != nil { + if session.connection.IsClosed() { + return fmt.Errorf("connection to broker is closed") + } + } else { + return fmt.Errorf("connection is nil") + } + + return nil +} + +func (session *AMQPsession) Send(payload []byte, destinationUuid string) error { + + message := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + // set message headers + message.Headers = make(map[string]interface{}) // empty map + message.Headers["uuid"] = destinationUuid // leave uuid empty if message should go to all ICs + + err := session.CheckConnection() + if err != nil { + return err + } + + err = session.sendCh.Publish(session.exchange, + "", + false, + false, + message) + return err +} diff --git a/routes/healthz/healthz_endpoint.go b/routes/healthz/healthz_endpoint.go index 6c3a546..efe13cf 100644 --- a/routes/healthz/healthz_endpoint.go +++ b/routes/healthz/healthz_endpoint.go @@ -29,15 +29,20 @@ import ( "git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" - infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" ) +var session *helper.AMQPsession + func RegisterHealthzEndpoint(r *gin.RouterGroup) { r.GET("", getHealth) } +func SetAMQPSession(s *helper.AMQPsession) { + session = s +} + // getHealth godoc // @Summary Get health status of backend // @ID getHealth @@ -69,7 +74,7 @@ func getHealth(c *gin.Context) { } if len(url) != 0 { - err = infrastructure_component.CheckConnection() + err = session.CheckConnection() if err != nil { log.Println(err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/routes/healthz/healthz_test.go b/routes/healthz/healthz_test.go index 84c6a77..e18e103 100644 --- a/routes/healthz/healthz_test.go +++ b/routes/healthz/healthz_test.go @@ -80,8 +80,14 @@ func TestHealthz(t *testing.T) { amqpURI := "amqp://" + user + ":" + pass + "@" + host log.Println("AMQP URI is", amqpURI) - err = infrastructure_component.ConnectAMQP(amqpURI) - assert.NoError(t, err) + session = helper.NewAMQPSession("villas-test-session", amqpURI, "villas", infrastructure_component.ProcessMessage) + SetAMQPSession(session) + + for { + if session.IsReady { + break + } + } // test healthz endpoint for connected DB and AMQP client code, resp, err = helper.TestEndpoint(router, "", "/api/v2/healthz", http.MethodGet, nil) diff --git a/routes/infrastructure-component/ic_amqpclient.go b/routes/infrastructure-component/ic_amqpmethods.go similarity index 69% rename from routes/infrastructure-component/ic_amqpclient.go rename to routes/infrastructure-component/ic_amqpmethods.go index 126e580..954ad84 100644 --- a/routes/infrastructure-component/ic_amqpclient.go +++ b/routes/infrastructure-component/ic_amqpmethods.go @@ -24,25 +24,14 @@ package infrastructure_component import ( "encoding/json" "fmt" - "log" - "strings" - "time" - - "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/jinzhu/gorm" "github.com/jinzhu/gorm/dialects/postgres" "github.com/streadway/amqp" + "log" + "strings" ) -const VILLAS_EXCHANGE = "villas" - -type AMQPclient struct { - connection *amqp.Connection - sendCh *amqp.Channel - recvCh *amqp.Channel -} - type Action struct { Act string `json:"action"` When int64 `json:"when"` @@ -85,205 +74,24 @@ type ICUpdate struct { Action string `json:"action"` } -var client AMQPclient - -func ConnectAMQP(uri string) error { - - var err error - - // connect to broker - client.connection, err = amqp.Dial(uri) - if err != nil { - return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err) - } - - // create sendCh - client.sendCh, err = client.connection.Channel() - if err != nil { - return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err) - } - // declare exchange - err = client.sendCh.ExchangeDeclare(VILLAS_EXCHANGE, - "headers", - true, - false, - false, - false, - nil) - if err != nil { - return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err) - } - - // add a queue for the ICs - ICQueue, err := client.sendCh.QueueDeclare("infrastructure_components", - true, - false, - false, - false, - nil) - if err != nil { - return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) - } - - err = client.sendCh.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) - if err != nil { - return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) - } - - // create receive channel - client.recvCh, err = client.connection.Channel() - if err != nil { - return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err) - } - - // start deliveries - messages, err := client.recvCh.Consume(ICQueue.Name, - "", - true, - false, - false, - false, - nil) - if err != nil { - return fmt.Errorf("AMQP: failed to start deliveries: %v", err) - } - - // consume deliveries - go func() { - for { - for message := range messages { - err = processMessage(message) - if err != nil { - log.Println("AMQP: Error processing message: ", err.Error()) - } - } - } - }() - - log.Printf(" AMQP: Waiting for messages... ") - - return nil -} - -func sendActionAMQP(action Action, destinationUUID string) error { - - payload, err := json.Marshal(action) - if err != nil { - return err - } - - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - } - - // set message headers - msg.Headers = make(map[string]interface{}) // empty map - msg.Headers["uuid"] = destinationUUID - - err = CheckConnection() - if err != nil { - return err - } - - //log.Println("AMQP: Sending message", string(msg.Body)) - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) - return err - -} - -func SendPing(uuid string) error { - var ping Action - ping.Act = "ping" - - payload, err := json.Marshal(ping) - if err != nil { - return err - } - - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - } - - // set message headers - msg.Headers = make(map[string]interface{}) // empty map - msg.Headers["uuid"] = uuid // leave uuid empty if ping should go to all ICs - - err = CheckConnection() - if err != nil { - return err - } - - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) - return err -} - -func CheckConnection() error { - - if client.connection != nil { - if client.connection.IsClosed() { - return fmt.Errorf("connection to broker is closed") - } - } else { - return fmt.Errorf("connection is nil") - } - - return nil -} - -func StartAMQP(AMQPurl string, api *gin.RouterGroup) error { - if AMQPurl != "" { - log.Println("Starting AMQP client") - - err := ConnectAMQP(AMQPurl) - if err != nil { - return err - } - - // register IC action endpoint only if AMQP client is used - RegisterAMQPEndpoint(api.Group("/ic")) - - log.Printf("Connected AMQP client to %s", AMQPurl) - } - - return nil -} - -func processMessage(message amqp.Delivery) error { +func ProcessMessage(message amqp.Delivery) { var payload ICUpdate err := json.Unmarshal(message.Body, &payload) if err != nil { - return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err) + log.Printf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err) } if payload.Action != "" { // if a message contains an action, it is not intended for the backend //log.Println("AMQP: Ignoring action message ", payload) - return nil + return } ICUUID := payload.Properties.UUID _, err = uuid.Parse(ICUUID) if err != nil { - return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) + log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) } var sToBeUpdated InfrastructureComponent @@ -299,8 +107,10 @@ func processMessage(message amqp.Delivery) error { // update record based on payload err = sToBeUpdated.updateExternalIC(payload, message.Body) } + if err != nil { + log.Printf(err.Error()) + } - return err } func createExternalIC(payload ICUpdate, ICUUID string, body []byte) error { @@ -358,7 +168,11 @@ func createExternalIC(payload ICUpdate, ICUUID string, body []byte) error { log.Println("AMQP: Created IC with UUID ", newIC.UUID) // send ping to get full status update of this IC - err = SendPing(ICUUID) + if session != nil { + err = SendPing(ICUUID) + } else { + err = fmt.Errorf("cannot sent ping to %v because AMQP session is nil", ICUUID) + } return err } @@ -433,3 +247,27 @@ func newFalse() *bool { b := false return &b } + +func SendPing(uuid string) error { + var ping Action + ping.Act = "ping" + + payload, err := json.Marshal(ping) + if err != nil { + return err + } + + err = session.Send(payload, uuid) + return err +} + +func sendActionAMQP(action Action, destinationUUID string) error { + + payload, err := json.Marshal(action) + if err != nil { + return err + } + + err = session.Send(payload, destinationUUID) + return err +} diff --git a/routes/infrastructure-component/ic_endpoints.go b/routes/infrastructure-component/ic_endpoints.go index e26198b..3471e5b 100644 --- a/routes/infrastructure-component/ic_endpoints.go +++ b/routes/infrastructure-component/ic_endpoints.go @@ -37,10 +37,13 @@ func RegisterICEndpoints(r *gin.RouterGroup) { r.GET("/:ICID", getIC) r.DELETE("/:ICID", deleteIC) r.GET("/:ICID/configs", getConfigsOfIC) + r.POST("/:ICID/action", sendActionToIC) } -func RegisterAMQPEndpoint(r *gin.RouterGroup) { - r.POST("/:ICID/action", sendActionToIC) +var session *helper.AMQPsession + +func SetAMQPSession(s *helper.AMQPsession) { + session = s } // getICs godoc diff --git a/routes/infrastructure-component/ic_test.go b/routes/infrastructure-component/ic_test.go index 64f193b..844df34 100644 --- a/routes/infrastructure-component/ic_test.go +++ b/routes/infrastructure-component/ic_test.go @@ -24,13 +24,10 @@ package infrastructure_component import ( "encoding/json" "fmt" - "log" "os" "testing" "time" - "github.com/streadway/amqp" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" component_configuration "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/component-configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario" @@ -133,12 +130,6 @@ func TestMain(m *testing.M) { // that can be associated with a new component configuration scenario.RegisterScenarioEndpoints(api.Group("/scenarios")) - // check AMQP connection - err = CheckConnection() - if err.Error() != "connection is nil" { - return - } - // connect AMQP client // Make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set host, err := configuration.GlobalConfig.String("amqp.host") @@ -148,11 +139,8 @@ func TestMain(m *testing.M) { // AMQP Connection startup is tested here // Not repeated in other tests because it is only needed once - err = StartAMQP(amqpURI, api) - if err != nil { - log.Println("unable to connect to AMQP") - return - } + session = helper.NewAMQPSession("villas-test-session", amqpURI, "villas", ProcessMessage) + SetAMQPSession(session) os.Exit(m.Run()) } @@ -306,28 +294,14 @@ func TestUpdateICAsAdmin(t *testing.T) { payload, err := json.Marshal(update) assert.NoError(t, err) - var headers map[string]interface{} - headers = make(map[string]interface{}) // empty map - headers["uuid"] = newIC2.Manager // set uuid + //var headers map[string]interface{} + //headers = make(map[string]interface{}) // empty map + //headers["uuid"] = newIC2.Manager // set uuid - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headers, - } - - err = CheckConnection() + err = session.CheckConnection() assert.NoError(t, err) - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + err = session.Send(payload, newIC2.Manager) assert.NoError(t, err) // Wait until externally managed IC is created (happens async) @@ -430,28 +404,10 @@ func TestDeleteICAsAdmin(t *testing.T) { payload, err := json.Marshal(update) assert.NoError(t, err) - var headers map[string]interface{} - headers = make(map[string]interface{}) // empty map - headers["uuid"] = newIC2.UUID // set uuid - - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headers, - } - - err = CheckConnection() + err = session.CheckConnection() assert.NoError(t, err) - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + err = session.Send(payload, newIC2.UUID) assert.NoError(t, err) // Wait until externally managed IC is created (happens async) @@ -662,28 +618,10 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) { payload, err := json.Marshal(update) assert.NoError(t, err) - var headers map[string]interface{} - headers = make(map[string]interface{}) // empty map - headers["uuid"] = newIC1.Manager // set uuid - - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headers, - } - - err = CheckConnection() + err = session.CheckConnection() assert.NoError(t, err) - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + err = session.Send(payload, newIC1.Manager) assert.NoError(t, err) time.Sleep(waitingTime * time.Second) @@ -709,25 +647,7 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) { payload, err = json.Marshal(update) assert.NoError(t, err) - var headersA map[string]interface{} - headersA = make(map[string]interface{}) // empty map - headersA["uuid"] = newIC1.Manager - - msg = amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headersA, - } - - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + err = session.Send(payload, newIC1.Manager) assert.NoError(t, err) time.Sleep(waitingTime * time.Second) @@ -743,21 +663,7 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) { payload, err = json.Marshal(update) assert.NoError(t, err) - msg = amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headersA, - } - - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + err = session.Send(payload, newIC1.Manager) assert.NoError(t, err) time.Sleep(waitingTime * time.Second) @@ -797,27 +703,10 @@ func TestDeleteICViaAMQPRecv(t *testing.T) { payload, err := json.Marshal(update) assert.NoError(t, err) - var headers map[string]interface{} - headers = make(map[string]interface{}) // empty map - headers["uuid"] = newIC1.Manager // set uuid - - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headers, - } - - err = CheckConnection() + err = session.CheckConnection() assert.NoError(t, err) - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + + err = session.Send(payload, newIC1.Manager) assert.NoError(t, err) time.Sleep(waitingTime * time.Second) @@ -874,22 +763,7 @@ func TestDeleteICViaAMQPRecv(t *testing.T) { payload, err = json.Marshal(update) assert.NoError(t, err) - msg = amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - Headers: headers, - } - - // attempt to delete IC (should not work immediately because IC is still associated with component config) - err = client.sendCh.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) + err = session.Send(payload, newIC1.Manager) assert.NoError(t, err) time.Sleep(waitingTime * time.Second) diff --git a/routes/register_test.go b/routes/register_test.go index 33f8834..402bdf5 100644 --- a/routes/register_test.go +++ b/routes/register_test.go @@ -26,8 +26,6 @@ import ( "os" "testing" - infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "github.com/gin-gonic/gin" @@ -55,27 +53,6 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -/* - * The order of test functions is important here - * 1. Start and connect AMQP - * 2. Register endpoints - * 3. Add test data - */ - -func TestStartAMQP(t *testing.T) { - // connect AMQP client - // Make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set - host, err := configuration.GlobalConfig.String("amqp.host") - user, err := configuration.GlobalConfig.String("amqp.user") - pass, err := configuration.GlobalConfig.String("amqp.pass") - amqpURI := "amqp://" + user + ":" + pass + "@" + host - - // AMQP Connection startup is tested here - // Not repeated in other tests because it is only needed once - err = infrastructure_component.StartAMQP(amqpURI, api) - assert.NoError(t, err) -} - func TestRegisterEndpoints(t *testing.T) { database.DropTables() database.MigrateModels() diff --git a/start.go b/start.go index b86a1bc..10f2f70 100644 --- a/start.go +++ b/start.go @@ -23,6 +23,8 @@ package main import ( "fmt" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/healthz" "log" "time" @@ -122,15 +124,19 @@ func main() { if AMQPhost != "" { // create amqp URL based on username, password and host amqpurl := "amqp://" + AMQPuser + ":" + AMQPpass + "@" + AMQPhost - err = infrastructure_component.StartAMQP(amqpurl, api) - if err != nil { - log.Fatal(err) - } + session := helper.NewAMQPSession("villas-amqp-session", amqpurl, "villas", infrastructure_component.ProcessMessage) + healthz.SetAMQPSession(session) // healthz needs to know the amqp session to check the health of the backend + infrastructure_component.SetAMQPSession(session) // IC needs to know the session to send amqp messages // send Ping to all externally managed ICs - err = infrastructure_component.SendPing("") - if err != nil { - log.Println("error sending ping action via AMQP: ", err) + for { + if session.IsReady { + err = infrastructure_component.SendPing("") + if err != nil { + log.Println("error sending ping action via AMQP: ", err.Error()) + } + break + } } }