diff --git a/amqp/amqpclient.go b/amqp/amqpclient.go index 8adedad..a5358e0 100644 --- a/amqp/amqpclient.go +++ b/amqp/amqpclient.go @@ -126,7 +126,10 @@ func ConnectAMQP(uri string) error { go func() { for { for message := range client.replies { - processMessage(message) + err = processMessage(message) + if err != nil { + log.Println(err.Error()) + } } time.Sleep(2) // sleep for 2 sek } @@ -229,23 +232,21 @@ func StartAMQP(AMQPurl string, api *gin.RouterGroup) error { return nil } -func processMessage(message amqp.Delivery) { +func processMessage(message amqp.Delivery) error { log.Println("Processing AMQP message: ", string(message.Body)) var payload ICUpdate err := json.Unmarshal(message.Body, &payload) if err != nil { - log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err) - return + return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err) } ICUUID := payload.Properties.UUID _, err = uuid.Parse(ICUUID) if err != nil { - log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) - return + return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) } var sToBeUpdated infrastructure_component.InfrastructureComponent @@ -253,107 +254,124 @@ func processMessage(message amqp.Delivery) { if err == gorm.ErrRecordNotFound { // create new record - var newICReq infrastructure_component.AddICRequest - newICReq.InfrastructureComponent.UUID = payload.Properties.UUID - if payload.Properties.Name == nil || - payload.Properties.Category == nil || - payload.Properties.Type == nil { - // cannot create new IC because required information (name, type, and/or category missing) - log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") - return - } - newICReq.InfrastructureComponent.Name = *payload.Properties.Name - newICReq.InfrastructureComponent.Category = *payload.Properties.Category - newICReq.InfrastructureComponent.Type = *payload.Properties.Type - - // add optional params - if payload.State != nil { - newICReq.InfrastructureComponent.State = *payload.State - } else { - newICReq.InfrastructureComponent.State = "unknown" - } - if payload.Properties.WS_url != nil { - newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url - } - if payload.Properties.API_url != nil { - newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url - } - if payload.Properties.Location != nil { - newICReq.InfrastructureComponent.Location = *payload.Properties.Location - } - if payload.Properties.Description != nil { - newICReq.InfrastructureComponent.Description = *payload.Properties.Description - } - // TODO add JSON start parameter scheme - - // Validate the new IC - err = newICReq.Validate() - if err != nil { - log.Println("AMQP: Validation of new IC failed:", err) - return - } - - // Create the new IC - newIC := newICReq.CreateIC() - - // save IC - err = newIC.Save() - if err != nil { - log.Println("AMQP: Saving new IC to DB failed:", err) - return - } - - log.Println("AMQP: Created IC ", newIC.Name) - + err = createNewIC(payload) } else if err != nil { - log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err) - return + // database error + err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err) } else { - - var updatedICReq infrastructure_component.UpdateICRequest - if payload.State != nil { - updatedICReq.InfrastructureComponent.State = *payload.State - } - if payload.Properties.Type != nil { - updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type - } - if payload.Properties.Category != nil { - updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category - } - if payload.Properties.Name != nil { - updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name - } - if payload.Properties.WS_url != nil { - updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url - } - if payload.Properties.API_url != nil { - updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url - } - if payload.Properties.Location != nil { - //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)} - updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location - } - if payload.Properties.Description != nil { - updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description - } - // TODO add JSON start parameter scheme - - // Validate the updated IC - if err = updatedICReq.Validate(); err != nil { - log.Println("AMQP: Validation of updated IC failed:", err) - return - } - - // Create the updated IC from old IC - updatedIC := updatedICReq.UpdatedIC(sToBeUpdated) - - // Finally update the IC in the DB - err = sToBeUpdated.Update(updatedIC) - if err != nil { - log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err) - return - } - - log.Println("AMQP: Updated IC ", sToBeUpdated.Name) + // update record + err = updateIC(payload, sToBeUpdated) } + + return err +} + +func createNewIC(payload ICUpdate) error { + + var newICReq infrastructure_component.AddICRequest + newICReq.InfrastructureComponent.UUID = payload.Properties.UUID + if payload.Properties.Name == nil || + payload.Properties.Category == nil || + payload.Properties.Type == nil { + // cannot create new IC because required information (name, type, and/or category missing) + return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") + } + newICReq.InfrastructureComponent.Name = *payload.Properties.Name + newICReq.InfrastructureComponent.Category = *payload.Properties.Category + newICReq.InfrastructureComponent.Type = *payload.Properties.Type + + // add optional params + if payload.State != nil { + newICReq.InfrastructureComponent.State = *payload.State + } else { + newICReq.InfrastructureComponent.State = "unknown" + } + if payload.Properties.WS_url != nil { + newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url + } + if payload.Properties.API_url != nil { + newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url + } + if payload.Properties.Location != nil { + newICReq.InfrastructureComponent.Location = *payload.Properties.Location + } + if payload.Properties.Description != nil { + newICReq.InfrastructureComponent.Description = *payload.Properties.Description + } + // TODO add JSON start parameter scheme + + // set managed externally to true because this IC is created via AMQP + newICReq.InfrastructureComponent.ManagedExternally = newTrue() + + // Validate the new IC + err := newICReq.Validate() + if err != nil { + return fmt.Errorf("AMQP: Validation of new IC failed: %v", err) + } + + // Create the new IC + newIC := newICReq.CreateIC() + + // save IC + err = newIC.Save() + if err != nil { + return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err) + } + + return nil +} + +func updateIC(payload ICUpdate, sToBeUpdated infrastructure_component.InfrastructureComponent) error { + var updatedICReq infrastructure_component.UpdateICRequest + if payload.State != nil { + updatedICReq.InfrastructureComponent.State = *payload.State + } + if payload.Properties.Type != nil { + updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type + } + if payload.Properties.Category != nil { + updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category + } + if payload.Properties.Name != nil { + updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name + } + if payload.Properties.WS_url != nil { + updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url + } + if payload.Properties.API_url != nil { + updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url + } + if payload.Properties.Location != nil { + //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)} + updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location + } + if payload.Properties.Description != nil { + updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description + } + // TODO add JSON start parameter scheme + + // set managed externally to true because this IC is updated via AMQP + updatedICReq.InfrastructureComponent.ManagedExternally = newTrue() + + // Validate the updated IC + err := updatedICReq.Validate() + if err != nil { + return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err) + } + + // Create the updated IC from old IC + updatedIC := updatedICReq.UpdatedIC(sToBeUpdated) + + // Finally update the IC in the DB + err = sToBeUpdated.Update(updatedIC) + if err != nil { + return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", sToBeUpdated.Name, err) + } + + return err +} + +func newTrue() *bool { + b := true + return &b } diff --git a/database/models.go b/database/models.go index 1bb61e4..8ffa86d 100644 --- a/database/models.go +++ b/database/models.go @@ -140,6 +140,8 @@ type InfrastructureComponent struct { Description string `json:"description" gorm:"default:''"` // JSON scheme of start parameters for IC StartParameterScheme postgres.Jsonb `json:"startparameterscheme"` + // Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller) + ManagedExternally bool `json:"managedexternally" gorm:"default:false"` // ComponentConfigurations in which the IC is used ComponentConfigurations []ComponentConfiguration `json:"-" gorm:"foreignkey:ICID"` } diff --git a/helper/test_data.go b/helper/test_data.go index 32ef8fc..e8d3ced 100644 --- a/helper/test_data.go +++ b/helper/test_data.go @@ -103,6 +103,7 @@ var ICA = database.InfrastructureComponent{ Description: "This is a test description", //StateUpdateAt: time.Now().Format(time.RFC1123), StartParameterScheme: postgres.Jsonb{propertiesA}, + ManagedExternally: true, } var ICB = database.InfrastructureComponent{ @@ -118,6 +119,7 @@ var ICB = database.InfrastructureComponent{ Description: "A signal generator for testing purposes", //StateUpdateAt: time.Now().Format(time.RFC1123), StartParameterScheme: postgres.Jsonb{propertiesB}, + ManagedExternally: false, } // Scenarios diff --git a/routes/component-configuration/config_test.go b/routes/component-configuration/config_test.go index 065136d..4098551 100644 --- a/routes/component-configuration/config_test.go +++ b/routes/component-configuration/config_test.go @@ -58,6 +58,7 @@ type ICRequest struct { Location string `json:"location,omitempty"` Description string `json:"description,omitempty"` StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` + ManagedExternally *bool `json:"managedexternally,omitempty"` } type ScenarioRequest struct { @@ -83,6 +84,7 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } _, resp, _ := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -101,6 +103,7 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { Location: helper.ICB.Location, Description: helper.ICB.Description, StartParameterScheme: helper.ICB.StartParameterScheme, + ManagedExternally: &helper.ICB.ManagedExternally, } _, resp, _ = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICB}) diff --git a/routes/infrastructure-component/ic_test.go b/routes/infrastructure-component/ic_test.go index a475082..334746e 100644 --- a/routes/infrastructure-component/ic_test.go +++ b/routes/infrastructure-component/ic_test.go @@ -49,6 +49,7 @@ type ICRequest struct { Location string `json:"location,omitempty"` Description string `json:"description,omitempty"` StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` + ManagedExternally *bool `json:"managedexternally,omitempty"` } func TestMain(m *testing.M) { @@ -112,6 +113,7 @@ func TestAddICAsAdmin(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -166,6 +168,7 @@ func TestAddICAsUser(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } // This should fail with unprocessable entity 422 error code @@ -197,6 +200,7 @@ func TestUpdateICAsAdmin(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -263,6 +267,7 @@ func TestUpdateICAsUser(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -309,6 +314,7 @@ func TestDeleteICAsAdmin(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -363,6 +369,7 @@ func TestDeleteICAsUser(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -413,6 +420,7 @@ func TestGetAllICs(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -430,6 +438,7 @@ func TestGetAllICs(t *testing.T) { Location: helper.ICB.Location, Description: helper.ICB.Description, StartParameterScheme: helper.ICB.StartParameterScheme, + ManagedExternally: &helper.ICB.ManagedExternally, } code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICB}) @@ -477,6 +486,7 @@ func TestGetConfigsOfIC(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index 7aefd54..da3484b 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -38,10 +38,11 @@ type validNewIC struct { Type string `form:"Type" validate:"required"` Name string `form:"Name" validate:"required"` Category string `form:"Category" validate:"required"` - StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` State string `form:"State" validate:"omitempty"` Location string `form:"Location" validate:"omitempty"` Description string `form:"Description" validate:"omitempty"` + StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` + ManagedExternally *bool `form:"ManagedExternally" validate:"required"` } type validUpdatedIC struct { @@ -51,10 +52,11 @@ type validUpdatedIC struct { Type string `form:"Type" validate:"omitempty"` Name string `form:"Name" validate:"omitempty"` Category string `form:"Category" validate:"omitempty"` - StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` State string `form:"State" validate:"omitempty"` Location string `form:"Location" validate:"omitempty"` Description string `form:"Description" validate:"omitempty"` + StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` + ManagedExternally *bool `form:"ManagedExternally" validate:"required"` } type AddICRequest struct { @@ -89,6 +91,7 @@ func (r *AddICRequest) CreateIC() InfrastructureComponent { s.Location = r.InfrastructureComponent.Location s.Description = r.InfrastructureComponent.Description s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme + s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally if r.InfrastructureComponent.State != "" { s.State = r.InfrastructureComponent.State } else { @@ -140,6 +143,10 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur s.Description = r.InfrastructureComponent.Description } + if r.InfrastructureComponent.ManagedExternally != nil { + s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally + } + // set last update time s.StateUpdateAt = time.Now().Format(time.RFC1123) diff --git a/routes/signal/signal_test.go b/routes/signal/signal_test.go index bdc5e46..045d53c 100644 --- a/routes/signal/signal_test.go +++ b/routes/signal/signal_test.go @@ -65,6 +65,7 @@ type ICRequest struct { Location string `json:"location,omitempty"` Description string `json:"description,omitempty"` StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` + ManagedExternally *bool `json:"managedexternally,omitempty"` } type ScenarioRequest struct { @@ -90,6 +91,7 @@ func addScenarioAndICAndConfig() (scenarioID uint, ICID uint, configID uint) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } _, resp, _ := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA})