diff --git a/amqp/amqpclient.go b/amqp/amqpclient.go index bda6714..19a63d0 100644 --- a/amqp/amqpclient.go +++ b/amqp/amqpclient.go @@ -25,8 +25,11 @@ import ( "encoding/json" "fmt" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" + infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" "github.com/google/uuid" + "github.com/jinzhu/gorm" + "github.com/jinzhu/gorm/dialects/postgres" "github.com/streadway/amqp" "log" "time" @@ -48,11 +51,15 @@ type Action struct { Results struct{} `json:"results"` } -type Update struct { - State string `json:"state"` +type ICUpdate struct { + State *string `json:"state"` //Uptime float64 `json:"uptime"` Properties struct { - UUID string `json:"uuid"` + UUID string `json:"uuid"` + Name *string `json:"name"` + Category *string `json:"category"` + Type *string `json:"type"` + Location *string `json:"location"` } `json:"properties"` } @@ -123,7 +130,7 @@ func ConnectAMQP(uri string) error { // fmt.Println("AMQP: Unable to ack message:", err) //} - var payload Update + var payload ICUpdate err := json.Unmarshal(message.Body, &payload) if err != nil { log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err) @@ -139,24 +146,69 @@ func ConnectAMQP(uri string) error { var sToBeUpdated database.InfrastructureComponent err = db.Find(&sToBeUpdated, "UUID = ?", ICUUID).Error - if err != nil { - log.Println("AMQP: Unable to find IC with UUID: ", ICUUID, " DB error message: ", err) + if err == gorm.ErrRecordNotFound { + // create new record + var newICReq infrastructure_component.AddICRequest + newICReq.InfrastructureComponent.UUID = payload.Properties.UUID + if payload.Properties.Name == nil || + payload.Properties.Category == nil || + payload.Properties.Type == nil { + // cannot create new IC because required information (name, type, and/or category missing) + log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") + continue + } + newICReq.InfrastructureComponent.Name = *payload.Properties.Name + newICReq.InfrastructureComponent.Category = *payload.Properties.Category + newICReq.InfrastructureComponent.Category = *payload.Properties.Type + + // add optional params + if payload.State != nil { + newICReq.InfrastructureComponent.State = *payload.State + } else { + newICReq.InfrastructureComponent.State = "unknown" + } + if payload.Properties.Location != nil { + newICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)} + } + + // Validate the new IC + if err = newICReq.Validate(); err != nil { + log.Println("AMQP: Validation of new IC failed:", err) + continue + } + + // Create the new IC + newIC := newICReq.CreateIC() + + // save IC + err = newIC.Save() + if err != nil { + log.Println("AMQP: Saving new IC to DB failed:", err) + continue + } + + log.Println("AMQP: Created IC ", newIC.Name) + + } else if err != nil { + log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err) continue - } - //var stateUpdateAt = message.Timestamp.UTC() - err = db.Model(&sToBeUpdated).Updates(map[string]interface{}{ - //"Host": gjson.Get(content, "host"), - //"Type": gjson.Get(content, "model"), - //"Uptime": math.Round(payload.Uptime), - "State": payload.State, - //"StateUpdateAt": stateUpdateAt.Format(time.RFC1123), - //"RawProperties": gjson.Get(content, "properties"), - }).Error - if err != nil { - log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err) + } else { + //var stateUpdateAt = message.Timestamp.UTC() + err = db.Model(&sToBeUpdated).Updates(map[string]interface{}{ + //"Host": gjson.Get(content, "host"), + //"Type": gjson.Get(content, "model"), + //"Uptime": math.Round(payload.Uptime), + "State": *payload.State, + //"StateUpdateAt": stateUpdateAt.Format(time.RFC1123), + //"RawProperties": gjson.Get(content, "properties"), + }).Error + if err != nil { + log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err) + } + + log.Println("AMQP: Updated IC ", sToBeUpdated.Name) } - log.Println("AMQP: Updated IC ", sToBeUpdated.Name) } } }() diff --git a/routes/infrastructure-component/ic_endpoints.go b/routes/infrastructure-component/ic_endpoints.go index e3eec1c..ba7e682 100644 --- a/routes/infrastructure-component/ic_endpoints.go +++ b/routes/infrastructure-component/ic_endpoints.go @@ -73,7 +73,7 @@ func getICs(c *gin.Context) { // @Failure 404 {object} docs.ResponseError "Not found" // @Failure 422 {object} docs.ResponseError "Unprocessable entity" // @Failure 500 {object} docs.ResponseError "Internal server error" -// @Param inputIC body infrastructure_component.addICRequest true "Infrastructure Component to be added" +// @Param inputIC body infrastructure_component.AddICRequest true "Infrastructure Component to be added" // @Router /ic [post] // @Security Bearer func addIC(c *gin.Context) { @@ -83,7 +83,7 @@ func addIC(c *gin.Context) { return } - var req addICRequest + var req AddICRequest err := c.BindJSON(&req) if err != nil { helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error()) @@ -91,16 +91,16 @@ func addIC(c *gin.Context) { } // Validate the request - if err = req.validate(); err != nil { + if err = req.Validate(); err != nil { helper.UnprocessableEntityError(c, err.Error()) return } // Create the new IC from the request - newIC := req.createIC() + newIC := req.CreateIC() // Save new IC to DB - err = newIC.save() + err = newIC.Save() if !helper.DBError(c, err) { c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent}) } diff --git a/routes/infrastructure-component/ic_methods.go b/routes/infrastructure-component/ic_methods.go index 2636436..a475ba5 100644 --- a/routes/infrastructure-component/ic_methods.go +++ b/routes/infrastructure-component/ic_methods.go @@ -31,7 +31,7 @@ type InfrastructureComponent struct { database.InfrastructureComponent } -func (s *InfrastructureComponent) save() error { +func (s *InfrastructureComponent) Save() error { db := database.GetDB() err := db.Create(s).Error return err diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index 289f1fb..8fce1a8 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -53,7 +53,7 @@ type validUpdatedIC struct { State string `form:"State" validate:"omitempty"` } -type addICRequest struct { +type AddICRequest struct { InfrastructureComponent validNewIC `json:"ic"` } @@ -61,7 +61,7 @@ type updateICRequest struct { InfrastructureComponent validUpdatedIC `json:"ic"` } -func (r *addICRequest) validate() error { +func (r *AddICRequest) Validate() error { validate = validator.New() errs := validate.Struct(r) return errs @@ -73,7 +73,7 @@ func (r *validUpdatedIC) validate() error { return errs } -func (r *addICRequest) createIC() InfrastructureComponent { +func (r *AddICRequest) CreateIC() InfrastructureComponent { var s InfrastructureComponent s.UUID = r.InfrastructureComponent.UUID