From 91bd51cba1c055e2849b846f3bc9f8fe1725ca05 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 9 Oct 2020 09:41:03 +0200 Subject: [PATCH] AMQP: Set update time, improve creation and update of ICs via AMQP --- amqp/amqpclient.go | 57 ++++++++++++------- .../infrastructure-component/ic_endpoints.go | 10 ++-- routes/infrastructure-component/ic_methods.go | 8 ++- .../infrastructure-component/ic_validators.go | 9 ++- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/amqp/amqpclient.go b/amqp/amqpclient.go index 47fdf98..e9e64c7 100644 --- a/amqp/amqpclient.go +++ b/amqp/amqpclient.go @@ -24,7 +24,6 @@ package amqp import ( "encoding/json" "fmt" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -52,8 +51,7 @@ type Action struct { } type ICUpdate struct { - State *string `json:"state"` - //Uptime float64 `json:"uptime"` + State *string `json:"state"` Properties struct { UUID string `json:"uuid"` Name *string `json:"name"` @@ -122,8 +120,6 @@ func ConnectAMQP(uri string) error { // consuming queue go func() { - - db := database.GetDB() for message := range client.replies { //err = message.Ack(false) //if err != nil { @@ -142,10 +138,12 @@ func ConnectAMQP(uri string) error { if err != nil { log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) + continue } else { - var sToBeUpdated database.InfrastructureComponent - err = db.Find(&sToBeUpdated, "UUID = ?", ICUUID).Error + var sToBeUpdated infrastructure_component.InfrastructureComponent + err = sToBeUpdated.ByUUID(ICUUID) + if err == gorm.ErrRecordNotFound { // create new record var newICReq infrastructure_component.AddICRequest @@ -193,20 +191,41 @@ func ConnectAMQP(uri string) error { log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err) continue } else { - //var stateUpdateAt = message.Timestamp.UTC() - err = db.Model(&sToBeUpdated).Updates(map[string]interface{}{ - //"Host": gjson.Get(content, "host"), - //"Type": gjson.Get(content, "model"), - //"Uptime": math.Round(payload.Uptime), - "State": *payload.State, - //"StateUpdateAt": stateUpdateAt.Format(time.RFC1123), - //"RawProperties": gjson.Get(content, "properties"), - }).Error - if err != nil { - log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err) + + var updatedICReq infrastructure_component.UpdateICRequest + if payload.State != nil { + updatedICReq.InfrastructureComponent.State = *payload.State + } + if payload.Properties.Type != nil { + updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type + } + if payload.Properties.Category != nil { + updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category + } + if payload.Properties.Name != nil { + updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name + } + if payload.Properties.Location != nil { + updatedICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)} } - log.Println("AMQP: Updated IC ", sToBeUpdated.Name) + // Validate the updated IC + if err = updatedICReq.Validate(); err != nil { + log.Println("AMQP: Validation of updated IC failed:", err) + continue + } + + // Create the updated IC from old IC + updatedIC := updatedICReq.UpdatedIC(sToBeUpdated) + + // Finally update the IC in the DB + err = sToBeUpdated.Update(updatedIC) + if err != nil { + log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err) + continue + } + + //log.Println("AMQP: Updated IC ", sToBeUpdated.Name) } } diff --git a/routes/infrastructure-component/ic_endpoints.go b/routes/infrastructure-component/ic_endpoints.go index ba7e682..afc3b32 100644 --- a/routes/infrastructure-component/ic_endpoints.go +++ b/routes/infrastructure-component/ic_endpoints.go @@ -118,7 +118,7 @@ func addIC(c *gin.Context) { // @Failure 404 {object} docs.ResponseError "Not found" // @Failure 422 {object} docs.ResponseError "Unprocessable entity" // @Failure 500 {object} docs.ResponseError "Internal server error" -// @Param inputIC body infrastructure_component.updateICRequest true "InfrastructureComponent to be updated" +// @Param inputIC body infrastructure_component.UpdateICRequest true "InfrastructureComponent to be updated" // @Param ICID path int true "InfrastructureComponent ID" // @Router /ic/{ICID} [put] // @Security Bearer @@ -129,7 +129,7 @@ func updateIC(c *gin.Context) { return } - var req updateICRequest + var req UpdateICRequest err := c.BindJSON(&req) if err != nil { helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error()) @@ -137,16 +137,16 @@ func updateIC(c *gin.Context) { } // Validate the request - if err = req.InfrastructureComponent.validate(); err != nil { + if err = req.Validate(); err != nil { helper.UnprocessableEntityError(c, err.Error()) return } // Create the updatedIC from oldIC - updatedIC := req.updatedIC(oldIC) + updatedIC := req.UpdatedIC(oldIC) // Finally update the IC in the DB - err = oldIC.update(updatedIC) + err = oldIC.Update(updatedIC) if !helper.DBError(c, err) { c.JSON(http.StatusOK, gin.H{"ic": updatedIC.InfrastructureComponent}) } diff --git a/routes/infrastructure-component/ic_methods.go b/routes/infrastructure-component/ic_methods.go index a475ba5..62a2449 100644 --- a/routes/infrastructure-component/ic_methods.go +++ b/routes/infrastructure-component/ic_methods.go @@ -43,7 +43,13 @@ func (s *InfrastructureComponent) ByID(id uint) error { return err } -func (s *InfrastructureComponent) update(updatedIC InfrastructureComponent) error { +func (s *InfrastructureComponent) ByUUID(uuid string) error { + db := database.GetDB() + err := db.Find(s, "UUID = ?", uuid).Error + return err +} + +func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) error { db := database.GetDB() err := db.Model(s).Updates(updatedIC).Error diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index 8fce1a8..6ecc03b 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -57,7 +57,7 @@ type AddICRequest struct { InfrastructureComponent validNewIC `json:"ic"` } -type updateICRequest struct { +type UpdateICRequest struct { InfrastructureComponent validUpdatedIC `json:"ic"` } @@ -67,7 +67,7 @@ func (r *AddICRequest) Validate() error { return errs } -func (r *validUpdatedIC) validate() error { +func (r *UpdateICRequest) Validate() error { validate = validator.New() errs := validate.Struct(r) return errs @@ -94,7 +94,7 @@ func (r *AddICRequest) CreateIC() InfrastructureComponent { return s } -func (r *updateICRequest) updatedIC(oldIC InfrastructureComponent) InfrastructureComponent { +func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) InfrastructureComponent { // Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s` s := oldIC @@ -126,6 +126,9 @@ func (r *updateICRequest) updatedIC(oldIC InfrastructureComponent) Infrastructur s.State = r.InfrastructureComponent.State } + // set last update time + s.StateUpdateAt = time.Now().Format(time.RFC1123) + // only update props if not empty var emptyJson postgres.Jsonb // Serialize empty json and params