diff --git a/routes/component-configuration/config_methods.go b/routes/component-configuration/config_methods.go index f6a222d..767b76c 100644 --- a/routes/component-configuration/config_methods.go +++ b/routes/component-configuration/config_methods.go @@ -23,7 +23,6 @@ package component_configuration import ( "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario" ) @@ -61,8 +60,11 @@ func (m *ComponentConfiguration) addToScenario() error { } // associate IC with component configuration - var ic infrastructure_component.InfrastructureComponent - err = ic.ByID(m.ICID) + var ic database.InfrastructureComponent + err = db.Find(&ic, m.ICID).Error + if err != nil { + return err + } err = db.Model(&ic).Association("ComponentConfigurations").Append(m).Error if err != nil { return err @@ -80,23 +82,24 @@ func (m *ComponentConfiguration) Update(modifiedConfig ComponentConfiguration) e // check if IC has been updated if m.ICID != modifiedConfig.ICID { // update IC - var s infrastructure_component.InfrastructureComponent - var s_old infrastructure_component.InfrastructureComponent - err := s.ByID(modifiedConfig.ICID) + var ic database.InfrastructureComponent + var ic_old database.InfrastructureComponent + err := db.Find(&ic, modifiedConfig.ICID).Error if err != nil { return err } - err = s_old.ByID(m.ICID) + err = db.Find(&ic_old, m.ICID).Error if err != nil { return err } + // remove component configuration from old IC - err = db.Model(&s_old).Association("ComponentConfigurations").Delete(m).Error + err = db.Model(&ic_old).Association("ComponentConfigurations").Delete(m).Error if err != nil { return err } // add component configuration to new IC - err = db.Model(&s).Association("ComponentConfigurations").Append(m).Error + err = db.Model(&ic).Association("ComponentConfigurations").Append(m).Error if err != nil { return err } @@ -121,8 +124,8 @@ func (m *ComponentConfiguration) delete() error { return err } - var ic infrastructure_component.InfrastructureComponent - err = ic.ByID(m.ICID) + var ic database.InfrastructureComponent + err = db.Find(&ic, m.ICID).Error if err != nil { return err } diff --git a/routes/infrastructure-component/amqpclient.go b/routes/infrastructure-component/ic_amqpclient.go similarity index 57% rename from routes/infrastructure-component/amqpclient.go rename to routes/infrastructure-component/ic_amqpclient.go index 13e7f6f..1adf68e 100644 --- a/routes/infrastructure-component/amqpclient.go +++ b/routes/infrastructure-component/ic_amqpclient.go @@ -183,7 +183,7 @@ func sendActionAMQP(action Action) error { return err } - log.Println("AMQP: Sending message", string(msg.Body)) + //log.Println("AMQP: Sending message", string(msg.Body)) err = client.channel.Publish(VILLAS_EXCHANGE, "", false, @@ -193,16 +193,16 @@ func sendActionAMQP(action Action) error { } -func PingAMQP() error { - log.Println("AMQP: sending ping command to all ICs") - - var a Action - a.Act = "ping" - *a.Properties.UUID = "" - - err := sendActionAMQP(a) - return err -} +//func PingAMQP() error { +// log.Println("AMQP: sending ping command to all ICs") +// +// var a Action +// a.Act = "ping" +// *a.Properties.UUID = "" +// +// err := sendActionAMQP(a) +// return err +//} func CheckConnection() error { @@ -256,8 +256,6 @@ func StartAMQP(AMQPurl string, api *gin.RouterGroup) error { func processMessage(message amqp.Delivery) error { - log.Println("Processing AMQP message: ", string(message.Body)) - var payload ICUpdate err := json.Unmarshal(message.Body, &payload) if err != nil { @@ -265,6 +263,7 @@ func processMessage(message amqp.Delivery) error { } if payload.Status != nil { + //log.Println("Processing AMQP message: ", string(message.Body)) // if a message contains a "state" field, it is an update for an IC ICUUID := payload.Status.UUID _, err = uuid.Parse(ICUUID) @@ -273,18 +272,160 @@ func processMessage(message amqp.Delivery) error { return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) } var sToBeUpdated InfrastructureComponent - err = sToBeUpdated.ByUUID(ICUUID) + err = sToBeUpdated.byUUID(ICUUID) if err == gorm.ErrRecordNotFound { // create new record - err = createNewICviaAMQP(payload) + err = createExternalIC(payload) } else if err != nil { // database error err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err) } else { // update record based on payload - err = sToBeUpdated.updateICviaAMQP(payload) + err = sToBeUpdated.updateExternalIC(payload) } } return err } + +func createExternalIC(payload ICUpdate) error { + + var newICReq AddICRequest + newICReq.InfrastructureComponent.UUID = payload.Status.UUID + if payload.Status.Name == nil || + payload.Status.Category == nil || + payload.Status.Type == nil { + // cannot create new IC because required information (name, type, and/or category missing) + return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") + } + newICReq.InfrastructureComponent.Name = *payload.Status.Name + newICReq.InfrastructureComponent.Category = *payload.Status.Category + newICReq.InfrastructureComponent.Type = *payload.Status.Type + + // add optional params + if payload.Status.State != nil { + newICReq.InfrastructureComponent.State = *payload.Status.State + } else { + newICReq.InfrastructureComponent.State = "unknown" + } + if newICReq.InfrastructureComponent.State == "gone" { + // Check if state is "gone" and abort creation of IC in this case + log.Println("AMQP: Aborting creation of IC with state gone") + return nil + } + + if payload.Status.WS_url != nil { + newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url + } + if payload.Status.API_url != nil { + newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url + } + if payload.Status.Location != nil { + newICReq.InfrastructureComponent.Location = *payload.Status.Location + } + if payload.Status.Description != nil { + newICReq.InfrastructureComponent.Description = *payload.Status.Description + } + if payload.Status.Uptime != nil { + newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime + } + // TODO add JSON start parameter scheme + + // set managed externally to true because this IC is created via AMQP + newICReq.InfrastructureComponent.ManagedExternally = newTrue() + + // Validate the new IC + err := newICReq.validate() + if err != nil { + return fmt.Errorf("AMQP: Validation of new IC failed: %v", err) + } + + // Create the new IC + newIC, err := newICReq.createIC(true) + if err != nil { + return fmt.Errorf("AMQP: Creating new IC failed: %v", err) + } + + // save IC + err = newIC.save() + if err != nil { + return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err) + } + + log.Println("AMQP: Created IC with UUID ", newIC.UUID) + return nil +} + +func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error { + + var updatedICReq UpdateICRequest + if payload.Status.State != nil { + updatedICReq.InfrastructureComponent.State = *payload.Status.State + + if *payload.Status.State == "gone" { + // remove IC from DB + log.Println("AMQP: Deleting IC with state gone") + err := s.delete(true) + if err != nil { + // if component could not be deleted there are still configurations using it in the DB + // continue with the update to save the new state of the component and get back to the deletion later + log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)") + } + + } + } + if payload.Status.Type != nil { + updatedICReq.InfrastructureComponent.Type = *payload.Status.Type + } + if payload.Status.Category != nil { + updatedICReq.InfrastructureComponent.Category = *payload.Status.Category + } + if payload.Status.Name != nil { + updatedICReq.InfrastructureComponent.Name = *payload.Status.Name + } + if payload.Status.WS_url != nil { + updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url + } + if payload.Status.API_url != nil { + updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url + } + if payload.Status.Location != nil { + //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)} + updatedICReq.InfrastructureComponent.Location = *payload.Status.Location + } + if payload.Status.Description != nil { + updatedICReq.InfrastructureComponent.Description = *payload.Status.Description + } + if payload.Status.Uptime != nil { + updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime + } + // TODO add JSON start parameter scheme + + // Validate the updated IC + err := updatedICReq.validate() + if err != nil { + return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err) + } + + // Create the updated IC from old IC + updatedIC := updatedICReq.updatedIC(*s) + + // Finally update the IC in the DB + err = s.update(updatedIC) + if err != nil { + return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err) + } + + log.Println("AMQP: Updated IC with UUID ", s.UUID) + return err +} + +func newTrue() *bool { + b := true + return &b +} + +func newFalse() *bool { + b := false + return &b +} diff --git a/routes/infrastructure-component/ic_endpoints.go b/routes/infrastructure-component/ic_endpoints.go index a542a63..2b9f5b8 100644 --- a/routes/infrastructure-component/ic_endpoints.go +++ b/routes/infrastructure-component/ic_endpoints.go @@ -22,7 +22,6 @@ package infrastructure_component import ( - "fmt" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" "github.com/gin-gonic/gin" @@ -108,9 +107,9 @@ func addIC(c *gin.Context) { return } - if !newIC.ManagedExternally { + if !(newIC.ManagedExternally) { // Save new IC to DB if not managed externally - err = newIC.Save() + err = newIC.save() if helper.DBError(c, err) { return @@ -142,6 +141,11 @@ func updateIC(c *gin.Context) { return } + if oldIC.ManagedExternally { + helper.ForbiddenError(c, "Cannot update externally managed component via API") + return + } + var req UpdateICRequest err := c.BindJSON(&req) if err != nil { @@ -156,17 +160,10 @@ func updateIC(c *gin.Context) { } // Create the updatedIC from oldIC - updatedIC, err := req.updatedIC(oldIC, false) - if err != nil { - c.JSON(http.StatusForbidden, gin.H{ - "success": false, - "message": fmt.Sprintf("%v", err), - }) - return - } + updatedIC := req.updatedIC(oldIC) // Finally update the IC in the DB - err = oldIC.Update(updatedIC) + err = oldIC.update(updatedIC) if !helper.DBError(c, err) { c.JSON(http.StatusOK, gin.H{"ic": updatedIC.InfrastructureComponent}) } @@ -285,7 +282,7 @@ func sendActionToIC(c *gin.Context) { } //now := time.Now() - log.Println("AMQP: Will attempt to send the following actions:", actions) + log.Println("AMQP: Sending actions:", actions) for _, action := range actions { /*if action.When == 0 { diff --git a/routes/infrastructure-component/ic_methods.go b/routes/infrastructure-component/ic_methods.go index 3a757b2..b8d06c1 100644 --- a/routes/infrastructure-component/ic_methods.go +++ b/routes/infrastructure-component/ic_methods.go @@ -32,25 +32,25 @@ type InfrastructureComponent struct { database.InfrastructureComponent } -func (s *InfrastructureComponent) Save() error { +func (s *InfrastructureComponent) save() error { db := database.GetDB() err := db.Create(s).Error return err } -func (s *InfrastructureComponent) ByID(id uint) error { +func (s *InfrastructureComponent) byID(id uint) error { db := database.GetDB() err := db.Find(s, id).Error return err } -func (s *InfrastructureComponent) ByUUID(uuid string) error { +func (s *InfrastructureComponent) byUUID(uuid string) error { db := database.GetDB() err := db.Find(s, "UUID = ?", uuid).Error return err } -func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) error { +func (s *InfrastructureComponent) update(updatedIC InfrastructureComponent) error { db := database.GetDB() err := db.Model(s).Updates(updatedIC).Error @@ -66,6 +66,7 @@ func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error { action.Properties.UUID = new(string) *action.Properties.UUID = s.UUID + log.Println("AMQP: Sending request to delete IC with UUID", s.UUID) err := sendActionAMQP(action) return err } @@ -89,144 +90,3 @@ func (s *InfrastructureComponent) getConfigs() ([]database.ComponentConfiguratio err := db.Order("ID asc").Model(s).Related(&configs, "ComponentConfigurations").Error return configs, len(configs), err } - -func createNewICviaAMQP(payload ICUpdate) error { - - var newICReq AddICRequest - newICReq.InfrastructureComponent.UUID = payload.Status.UUID - if payload.Status.Name == nil || - payload.Status.Category == nil || - payload.Status.Type == nil { - // cannot create new IC because required information (name, type, and/or category missing) - return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") - } - newICReq.InfrastructureComponent.Name = *payload.Status.Name - newICReq.InfrastructureComponent.Category = *payload.Status.Category - newICReq.InfrastructureComponent.Type = *payload.Status.Type - - // add optional params - if payload.Status.State != nil { - newICReq.InfrastructureComponent.State = *payload.Status.State - } else { - newICReq.InfrastructureComponent.State = "unknown" - } - if newICReq.InfrastructureComponent.State == "gone" { - // Check if state is "gone" and abort creation of IC in this case - log.Println("########## AMQP: Aborting creation of IC with state gone") - return nil - } - - if payload.Status.WS_url != nil { - newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url - } - if payload.Status.API_url != nil { - newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url - } - if payload.Status.Location != nil { - newICReq.InfrastructureComponent.Location = *payload.Status.Location - } - if payload.Status.Description != nil { - newICReq.InfrastructureComponent.Description = *payload.Status.Description - } - if payload.Status.Uptime != nil { - newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime - } - // TODO add JSON start parameter scheme - - // set managed externally to true because this IC is created via AMQP - newICReq.InfrastructureComponent.ManagedExternally = newTrue() - - // Validate the new IC - err := newICReq.validate() - if err != nil { - return fmt.Errorf("AMQP: Validation of new IC failed: %v", err) - } - - // Create the new IC - newIC, err := newICReq.createIC(true) - if err != nil { - return fmt.Errorf("AMQP: Creating new IC failed: %v", err) - } - - // save IC - err = newIC.Save() - if err != nil { - return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err) - } - - return nil -} - -func (s *InfrastructureComponent) updateICviaAMQP(payload ICUpdate) error { - - var updatedICReq UpdateICRequest - if payload.Status.State != nil { - updatedICReq.InfrastructureComponent.State = *payload.Status.State - - if *payload.Status.State == "gone" { - // remove IC from DB - log.Println("########## AMQP: Deleting IC with state gone") - err := s.delete(true) - if err != nil { - // if component could not be deleted there are still configurations using it in the DB - // continue with the update to save the new state of the component and get back to the deletion later - log.Println("########## AMQP: Deletion of IC postponed (config(s) associated to it)") - } - - } - } - if payload.Status.Type != nil { - updatedICReq.InfrastructureComponent.Type = *payload.Status.Type - } - if payload.Status.Category != nil { - updatedICReq.InfrastructureComponent.Category = *payload.Status.Category - } - if payload.Status.Name != nil { - updatedICReq.InfrastructureComponent.Name = *payload.Status.Name - } - if payload.Status.WS_url != nil { - updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url - } - if payload.Status.API_url != nil { - updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url - } - if payload.Status.Location != nil { - //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)} - updatedICReq.InfrastructureComponent.Location = *payload.Status.Location - } - if payload.Status.Description != nil { - updatedICReq.InfrastructureComponent.Description = *payload.Status.Description - } - if payload.Status.Uptime != nil { - updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime - } - // TODO add JSON start parameter scheme - - // set managed externally to true because this IC is updated via AMQP - updatedICReq.InfrastructureComponent.ManagedExternally = newTrue() - - // Validate the updated IC - err := updatedICReq.validate() - if err != nil { - return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err) - } - - // Create the updated IC from old IC - updatedIC, err := updatedICReq.updatedIC(*s, true) - if err != nil { - return fmt.Errorf("AMQP: Unable to update IC %v : %v", s.Name, err) - } - - // Finally update the IC in the DB - err = s.Update(updatedIC) - if err != nil { - return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err) - } - - return err -} - -func newTrue() *bool { - b := true - return &b -} diff --git a/routes/infrastructure-component/ic_middleware.go b/routes/infrastructure-component/ic_middleware.go index a5c7d5a..3125d5b 100644 --- a/routes/infrastructure-component/ic_middleware.go +++ b/routes/infrastructure-component/ic_middleware.go @@ -45,7 +45,7 @@ func CheckPermissions(c *gin.Context, modeltype database.ModelName, operation da return false, s } - err = s.ByID(uint(ICID)) + err = s.byID(uint(ICID)) if helper.DBError(c, err) { return false, s } diff --git a/routes/infrastructure-component/ic_test.go b/routes/infrastructure-component/ic_test.go index a9f1489..4204d26 100644 --- a/routes/infrastructure-component/ic_test.go +++ b/routes/infrastructure-component/ic_test.go @@ -25,10 +25,11 @@ import ( "encoding/json" "fmt" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" + component_configuration "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/component-configuration" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario" "github.com/jinzhu/gorm/dialects/postgres" "github.com/streadway/amqp" "github.com/stretchr/testify/assert" - "log" "os" "testing" "time" @@ -41,6 +42,8 @@ import ( ) var router *gin.Engine +var api *gin.RouterGroup +var waitingTime time.Duration = 2 type ICRequest struct { UUID string `json:"uuid,omitempty"` @@ -53,7 +56,21 @@ type ICRequest struct { Location string `json:"location,omitempty"` Description string `json:"description,omitempty"` StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` - ManagedExternally *bool `json:"managedexternally,omitempty"` + ManagedExternally *bool `json:"managedexternally"` +} + +type ScenarioRequest struct { + Name string `json:"name,omitempty"` + Running bool `json:"running,omitempty"` + StartParameters postgres.Jsonb `json:"startParameters,omitempty"` +} + +type ConfigRequest struct { + Name string `json:"name,omitempty"` + ScenarioID uint `json:"scenarioID,omitempty"` + ICID uint `json:"icID,omitempty"` + StartParameters postgres.Jsonb `json:"startParameters,omitempty"` + FileIDs []int64 `json:"fileIDs,omitempty"` } type ICAction struct { @@ -84,25 +101,16 @@ func TestMain(m *testing.M) { defer database.DBpool.Close() router = gin.Default() - api := router.Group("/api") + api = router.Group("/api") user.RegisterAuthenticate(api.Group("/authenticate")) api.Use(user.Authentication(true)) RegisterICEndpoints(api.Group("/ic")) - RegisterAMQPEndpoint(api.Group("/ic")) - - // connect AMQP client (make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set via command line parameters) - host, err := configuration.GolbalConfig.String("amqp.host") - user, err := configuration.GolbalConfig.String("amqp.user") - pass, err := configuration.GolbalConfig.String("amqp.pass") - - amqpURI := "amqp://" + user + ":" + pass + "@" + host - log.Println("AMQP URI is", amqpURI) - - err = ConnectAMQP(amqpURI) - if err != nil { - panic(m) - } + // component configuration endpoints required to associate an IC with a component config + component_configuration.RegisterComponentConfigurationEndpoints(api.Group("/configs")) + // scenario endpoints required here to first add a scenario to the DB + // that can be associated with a new component configuration + scenario.RegisterScenarioEndpoints(api.Group("/scenarios")) os.Exit(m.Run()) } @@ -112,6 +120,22 @@ func TestAddICAsAdmin(t *testing.T) { database.MigrateModels() assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + // check AMQP connection + err := CheckConnection() + assert.Errorf(t, err, "connection is nil") + + // connect AMQP client + // Make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set + host, err := configuration.GolbalConfig.String("amqp.host") + user, err := configuration.GolbalConfig.String("amqp.user") + pass, err := configuration.GolbalConfig.String("amqp.pass") + amqpURI := "amqp://" + user + ":" + pass + "@" + host + + // AMQP Connection startup is tested here + // Not repeated in other tests because it is only needed once + err = StartAMQP(amqpURI, api) + assert.NoError(t, err) + // authenticate as admin token, err := helper.AuthenticateForTest(router, "/api/authenticate", "POST", helper.AdminCredentials) @@ -146,7 +170,7 @@ func TestAddICAsAdmin(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -178,6 +202,30 @@ func TestAddICAsAdmin(t *testing.T) { fmt.Sprintf("/api/ic/%v", newICID+1), "GET", nil) assert.NoError(t, err) assert.Equalf(t, 404, code, "Response body: \n%v\n", resp) + + newExternalIC := ICRequest{ + UUID: helper.ICB.UUID, + WebsocketURL: helper.ICB.WebsocketURL, + APIURL: helper.ICB.APIURL, + Type: helper.ICB.Type, + Name: helper.ICB.Name, + Category: helper.ICB.Category, + State: helper.ICB.State, + Location: helper.ICB.Location, + Description: helper.ICB.Description, + StartParameterScheme: helper.ICB.StartParameterScheme, + ManagedExternally: newTrue(), + } + + // test creation of external IC (should lead to emission of AMQP message to VILLAS) + code, resp, err = helper.TestEndpoint(router, token, + "/api/ic", "POST", helper.KeyModels{"ic": newExternalIC}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare POST's response with the newExternalIC + err = helper.CompareResponse(resp, helper.KeyModels{"ic": newExternalIC}) + assert.NoError(t, err) } func TestAddICAsUser(t *testing.T) { @@ -201,7 +249,7 @@ func TestAddICAsUser(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } // This should fail with unprocessable entity 422 error code @@ -233,7 +281,7 @@ func TestUpdateICAsAdmin(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -277,6 +325,54 @@ func TestUpdateICAsAdmin(t *testing.T) { err = helper.CompareResponse(resp, helper.KeyModels{"ic": newIC}) assert.NoError(t, err) + // fake an IC update (create) message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICB.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + update.Status.Name = new(string) + *update.Status.Name = helper.ICB.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICB.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICB.Type + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + // Wait until externally managed IC is created (happens async) + time.Sleep(waitingTime * time.Second) + + // try to update this IC + var updatedIC ICRequest + updatedIC.Name = "a new name" + updatedIC.ManagedExternally = newTrue() + + // Should result in forbidden return code 403 + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/ic/%v", 2), "PUT", helper.KeyModels{"ic": updatedIC}) + assert.NoError(t, err) + assert.Equalf(t, 403, code, "Response body: \n%v\n", resp) } func TestUpdateICAsUser(t *testing.T) { @@ -300,7 +396,7 @@ func TestUpdateICAsUser(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -347,7 +443,7 @@ func TestDeleteICAsAdmin(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -379,6 +475,58 @@ func TestDeleteICAsAdmin(t *testing.T) { assert.NoError(t, err) assert.Equal(t, finalNumber, initialNumber-1) + + // fake an IC update (create) message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICB.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + update.Status.Name = new(string) + *update.Status.Name = helper.ICB.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICB.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICB.Type + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + // Wait until externally managed IC is created (happens async) + time.Sleep(waitingTime * time.Second) + + // Delete the added external IC (triggers an AMQP message, but should not remove the IC from the DB) + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/ic/%v", 2), "DELETE", nil) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Again count the number of all the ICs returned + finalNumberAfterExtneralDelete, err := helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + + assert.Equal(t, finalNumber+1, finalNumberAfterExtneralDelete) + } func TestDeleteICAsUser(t *testing.T) { @@ -402,7 +550,7 @@ func TestDeleteICAsUser(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -453,7 +601,7 @@ func TestGetAllICs(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -471,8 +619,9 @@ func TestGetAllICs(t *testing.T) { Location: helper.ICB.Location, Description: helper.ICB.Description, StartParameterScheme: helper.ICB.StartParameterScheme, - ManagedExternally: &helper.ICB.ManagedExternally, + ManagedExternally: newFalse(), } + code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICB}) assert.NoError(t, err) @@ -519,7 +668,7 @@ func TestGetConfigsOfIC(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -531,7 +680,6 @@ func TestGetConfigsOfIC(t *testing.T) { assert.NoError(t, err) // test GET ic/ID/confis - // TODO how to properly test this without using component configuration endpoints? numberOfConfigs, err := helper.LengthOfResponse(router, token, fmt.Sprintf("/api/ic/%v/configs", newICID), "GET", nil) assert.NoError(t, err) @@ -545,7 +693,6 @@ func TestGetConfigsOfIC(t *testing.T) { assert.NoError(t, err) // test GET ic/ID/configs - // TODO how to properly test this without using component configuration endpoints? numberOfConfigs, err = helper.LengthOfResponse(router, token, fmt.Sprintf("/api/ic/%v/configs", newICID), "GET", nil) assert.NoError(t, err) @@ -582,7 +729,7 @@ func TestSendActionToIC(t *testing.T) { Location: helper.ICA.Location, Description: helper.ICA.Description, StartParameterScheme: helper.ICA.StartParameterScheme, - ManagedExternally: &helper.ICA.ManagedExternally, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -615,24 +762,23 @@ func TestSendActionToIC(t *testing.T) { assert.Equalf(t, 400, code, "Response body: \n%v\n", resp) } -func TestCreateUpdateDeleteViaAMQPRecv(t *testing.T) { +func TestCreateUpdateViaAMQPRecv(t *testing.T) { database.DropTables() database.MigrateModels() assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + // authenticate as admin + token, err := helper.AuthenticateForTest(router, + "/api/authenticate", "POST", helper.AdminCredentials) + assert.NoError(t, err) + // fake an IC update message var update ICUpdate update.Status = new(ICStatus) update.Status.UUID = helper.ICA.UUID update.Status.State = new(string) *update.Status.State = "idle" - update.Status.Name = new(string) - *update.Status.Name = helper.ICA.Name - update.Status.Category = new(string) - *update.Status.Category = helper.ICA.Category - update.Status.Type = new(string) - *update.Status.Type = helper.ICA.Type payload, err := json.Marshal(update) assert.NoError(t, err) @@ -648,6 +794,50 @@ func TestCreateUpdateDeleteViaAMQPRecv(t *testing.T) { err = CheckConnection() assert.NoError(t, err) + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err := helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 0, number) + + // complete the (required) data of an IC + update.Status.Name = new(string) + *update.Status.Name = helper.ICA.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICA.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICA.Type + update.Status.Uptime = new(float64) + *update.Status.Uptime = -1.0 + update.Status.WS_url = new(string) + *update.Status.WS_url = helper.ICA.WebsocketURL + update.Status.API_url = new(string) + *update.Status.API_url = helper.ICA.APIURL + update.Status.Description = new(string) + *update.Status.Description = helper.ICA.Description + update.Status.Location = new(string) + *update.Status.Location = helper.ICA.Location + + payload, err = json.Marshal(update) + assert.NoError(t, err) + + msg = amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } err = client.channel.Publish(VILLAS_EXCHANGE, "", @@ -656,14 +846,10 @@ func TestCreateUpdateDeleteViaAMQPRecv(t *testing.T) { msg) assert.NoError(t, err) - time.Sleep(4 * time.Second) - // authenticate as admin - token, err := helper.AuthenticateForTest(router, - "/api/authenticate", "POST", helper.AdminCredentials) - assert.NoError(t, err) + time.Sleep(waitingTime * time.Second) // get the length of the GET all ICs response for user - number, err := helper.LengthOfResponse(router, token, + number, err = helper.LengthOfResponse(router, token, "/api/ic", "GET", nil) assert.NoError(t, err) assert.Equal(t, 1, number) @@ -689,13 +875,121 @@ func TestCreateUpdateDeleteViaAMQPRecv(t *testing.T) { msg) assert.NoError(t, err) - time.Sleep(4 * time.Second) + time.Sleep(waitingTime * time.Second) // get the length of the GET all ICs response for user number, err = helper.LengthOfResponse(router, token, "/api/ic", "GET", nil) assert.NoError(t, err) assert.Equal(t, 1, number) +} + +func TestDeleteICViaAMQPRecv(t *testing.T) { + + database.DropTables() + database.MigrateModels() + assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + + // authenticate as admin + token, err := helper.AuthenticateForTest(router, + "/api/authenticate", "POST", helper.AdminCredentials) + assert.NoError(t, err) + + // fake an IC update message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICA.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + // complete the (required) data of an IC + update.Status.Name = new(string) + *update.Status.Name = helper.ICA.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICA.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICA.Type + update.Status.Uptime = new(float64) + *update.Status.Uptime = -1.0 + update.Status.WS_url = new(string) + *update.Status.WS_url = helper.ICA.WebsocketURL + update.Status.API_url = new(string) + *update.Status.API_url = helper.ICA.APIURL + update.Status.Description = new(string) + *update.Status.Description = helper.ICA.Description + update.Status.Location = new(string) + *update.Status.Location = helper.ICA.Location + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err := helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 1, number) + + // add scenario + newScenario := ScenarioRequest{ + Name: helper.ScenarioA.Name, + Running: helper.ScenarioA.Running, + StartParameters: helper.ScenarioA.StartParameters, + } + + code, resp, err := helper.TestEndpoint(router, token, + "/api/scenarios", "POST", helper.KeyModels{"scenario": newScenario}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare POST's response with the newScenario + err = helper.CompareResponse(resp, helper.KeyModels{"scenario": newScenario}) + assert.NoError(t, err) + + // Read newScenario's ID from the response + newScenarioID, err := helper.GetResponseID(resp) + assert.NoError(t, err) + + // Add component config and associate with IC and scenario + newConfig := ConfigRequest{ + Name: helper.ConfigA.Name, + ScenarioID: uint(newScenarioID), + ICID: 1, + StartParameters: helper.ConfigA.StartParameters, + FileIDs: helper.ConfigA.FileIDs, + } + + code, resp, err = helper.TestEndpoint(router, token, + "/api/configs", "POST", helper.KeyModels{"config": newConfig}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare POST's response with the newConfig + err = helper.CompareResponse(resp, helper.KeyModels{"config": newConfig}) + assert.NoError(t, err) + + // Read newConfig's ID from the response + newConfigID, err := helper.GetResponseID(resp) + assert.NoError(t, err) + // modify status update to state "gone" *update.Status.State = "gone" payload, err = json.Marshal(update) @@ -710,6 +1004,7 @@ func TestCreateUpdateDeleteViaAMQPRecv(t *testing.T) { Body: payload, } + // attempt to delete IC (should not work immediately because IC is still associated with component config) err = client.channel.Publish(VILLAS_EXCHANGE, "", false, @@ -717,15 +1012,27 @@ func TestCreateUpdateDeleteViaAMQPRecv(t *testing.T) { msg) assert.NoError(t, err) - time.Sleep(4 * time.Second) + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err = helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 1, number) + + // Delete component config from earlier + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/configs/%v", newConfigID), "DELETE", nil) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare DELETE's response with the newConfig + err = helper.CompareResponse(resp, helper.KeyModels{"config": newConfig}) + assert.NoError(t, err) + // get the length of the GET all ICs response for user number, err = helper.LengthOfResponse(router, token, "/api/ic", "GET", nil) assert.NoError(t, err) assert.Equal(t, 0, number) - -} - -func TestCreateDeleteViaAMQPSend(t *testing.T) { - } diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index 9f6bb05..fae7369 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -23,7 +23,6 @@ package infrastructure_component import ( "encoding/json" - "fmt" "github.com/google/uuid" "github.com/jinzhu/gorm/dialects/postgres" "github.com/nsf/jsondiff" @@ -60,7 +59,6 @@ type validUpdatedIC struct { Location string `form:"Location" validate:"omitempty"` Description string `form:"Description" validate:"omitempty"` StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` - ManagedExternally *bool `form:"ManagedExternally" validate:"required"` Uptime float64 `form:"Uptime" validate:"omitempty"` } @@ -134,43 +132,36 @@ func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent, *action.Properties.UUID = r.InfrastructureComponent.UUID } - log.Println("########## AMQP: Sending request to create new IC") + log.Println("AMQP: Sending request to create new IC") err = sendActionAMQP(action) - - // s remains empty - - } else { - s.UUID = r.InfrastructureComponent.UUID - s.WebsocketURL = r.InfrastructureComponent.WebsocketURL - s.APIURL = r.InfrastructureComponent.APIURL - s.Type = r.InfrastructureComponent.Type - s.Name = r.InfrastructureComponent.Name - s.Category = r.InfrastructureComponent.Category - s.Location = r.InfrastructureComponent.Location - s.Description = r.InfrastructureComponent.Description - s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme - s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally - s.Uptime = -1.0 // no uptime available - if r.InfrastructureComponent.State != "" { - s.State = r.InfrastructureComponent.State - } else { - s.State = "unknown" - } - // set last update to creation time of IC - s.StateUpdateAt = time.Now().Format(time.RFC1123) } + s.UUID = r.InfrastructureComponent.UUID + s.WebsocketURL = r.InfrastructureComponent.WebsocketURL + s.APIURL = r.InfrastructureComponent.APIURL + s.Type = r.InfrastructureComponent.Type + s.Name = r.InfrastructureComponent.Name + s.Category = r.InfrastructureComponent.Category + s.Location = r.InfrastructureComponent.Location + s.Description = r.InfrastructureComponent.Description + s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme + s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally + s.Uptime = -1.0 // no uptime available + if r.InfrastructureComponent.State != "" { + s.State = r.InfrastructureComponent.State + } else { + s.State = "unknown" + } + // set last update to creation time of IC + s.StateUpdateAt = time.Now().Format(time.RFC1123) + return s, err } -func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent, receivedViaAMQP bool) (InfrastructureComponent, error) { +func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent) InfrastructureComponent { // Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s` s := oldIC - if s.ManagedExternally && !receivedViaAMQP { - // externally managed IC cannot be updated via API, only via AMQP - return s, fmt.Errorf("cannot update externally managed IC %v", s.Name) - } if r.InfrastructureComponent.UUID != "" { s.UUID = r.InfrastructureComponent.UUID } @@ -207,10 +198,6 @@ func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent, receivedViaAM s.Description = r.InfrastructureComponent.Description } - if r.InfrastructureComponent.ManagedExternally != nil { - s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally - } - // set last update time s.StateUpdateAt = time.Now().Format(time.RFC1123) @@ -225,5 +212,5 @@ func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent, receivedViaAM s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme } - return s, nil + return s }