From b07cd23a7087b2bc779c327edfd9cf90527289d5 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Wed, 21 Oct 2020 17:16:15 +0200 Subject: [PATCH] continue integration of new AMQP functionality #31 #41 #42 --- helper/test_data.go | 4 +- .../component-configuration/config_methods.go | 30 ++++- routes/component-configuration/config_test.go | 17 ++- routes/infrastructure-component/amqpclient.go | 38 ++++-- .../infrastructure-component/ic_endpoints.go | 75 +++++++---- routes/infrastructure-component/ic_methods.go | 43 ++++-- .../infrastructure-component/ic_validators.go | 125 +++++++++++++----- 7 files changed, 252 insertions(+), 80 deletions(-) diff --git a/helper/test_data.go b/helper/test_data.go index e8d3ced..9bad50e 100644 --- a/helper/test_data.go +++ b/helper/test_data.go @@ -103,7 +103,7 @@ var ICA = database.InfrastructureComponent{ Description: "This is a test description", //StateUpdateAt: time.Now().Format(time.RFC1123), StartParameterScheme: postgres.Jsonb{propertiesA}, - ManagedExternally: true, + ManagedExternally: false, } var ICB = database.InfrastructureComponent{ @@ -119,7 +119,7 @@ var ICB = database.InfrastructureComponent{ Description: "A signal generator for testing purposes", //StateUpdateAt: time.Now().Format(time.RFC1123), StartParameterScheme: postgres.Jsonb{propertiesB}, - ManagedExternally: false, + ManagedExternally: true, } // Scenarios diff --git a/routes/component-configuration/config_methods.go b/routes/component-configuration/config_methods.go index c48473f..f6a222d 100644 --- a/routes/component-configuration/config_methods.go +++ b/routes/component-configuration/config_methods.go @@ -121,9 +121,37 @@ func (m *ComponentConfiguration) delete() error { return err } + var ic infrastructure_component.InfrastructureComponent + err = ic.ByID(m.ICID) + if err != nil { + return err + } + // remove association between ComponentConfiguration and Scenario // ComponentConfiguration itself is not deleted from DB, it remains as "dangling" err = db.Model(&so).Association("ComponentConfigurations").Delete(m).Error + if err != nil { + return err + } - return err + // remove association between Infrastructure component and config + err = db.Model(&ic).Association("ComponentConfigurations").Delete(m).Error + if err != nil { + return err + } + + // delete component configuration + err = db.Delete(m).Error + if err != nil { + return err + } + + // if IC has state gone and there is no component configuration associated with it: delete IC + no_configs := db.Model(ic).Association("ComponentConfigurations").Count() + if no_configs == 0 && ic.State == "gone" { + err = db.Delete(ic).Error + return err + } + + return nil } diff --git a/routes/component-configuration/config_test.go b/routes/component-configuration/config_test.go index 4098551..579f13c 100644 --- a/routes/component-configuration/config_test.go +++ b/routes/component-configuration/config_test.go @@ -86,8 +86,11 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { StartParameterScheme: helper.ICA.StartParameterScheme, ManagedExternally: &helper.ICA.ManagedExternally, } - _, resp, _ := helper.TestEndpoint(router, token, + code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) + if code != 200 || err != nil { + fmt.Println("Adding IC returned code", code, err, resp) + } // Read newIC's ID from the response newICID, _ := helper.GetResponseID(resp) @@ -103,10 +106,13 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { Location: helper.ICB.Location, Description: helper.ICB.Description, StartParameterScheme: helper.ICB.StartParameterScheme, - ManagedExternally: &helper.ICB.ManagedExternally, + ManagedExternally: &helper.ICA.ManagedExternally, } - _, resp, _ = helper.TestEndpoint(router, token, + code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICB}) + if code != 200 || err != nil { + fmt.Println("Adding IC returned code", code, err, resp) + } // authenticate as normal user token, _ = helper.AuthenticateForTest(router, @@ -118,8 +124,11 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { Running: helper.ScenarioA.Running, StartParameters: helper.ScenarioA.StartParameters, } - _, resp, _ = helper.TestEndpoint(router, token, + code, resp, err = helper.TestEndpoint(router, token, "/api/scenarios", "POST", helper.KeyModels{"scenario": newScenario}) + if code != 200 || err != nil { + fmt.Println("Adding Scenario returned code", code, err, resp) + } // Read newScenario's ID from the response newScenarioID, _ := helper.GetResponseID(resp) diff --git a/routes/infrastructure-component/amqpclient.go b/routes/infrastructure-component/amqpclient.go index 7bf89dd..7cee748 100644 --- a/routes/infrastructure-component/amqpclient.go +++ b/routes/infrastructure-component/amqpclient.go @@ -41,12 +41,18 @@ type AMQPclient struct { } type Action struct { - Act string `json:"action"` - When float32 `json:"when"` - Parameters struct{} `json:"parameters"` - UUID *string `json:"uuid"` - //Model struct{} `json:"model"` - //Results struct{} `json:"results"` + Act string `json:"action"` + When int64 `json:"when"` + Properties struct { + UUID *string `json:"uuid"` + Name *string `json:"name"` + Category *string `json:"category"` + Type *string `json:"type"` + Location *string `json:"location"` + WS_url *string `json:"ws_url"` + API_url *string `json:"api_url"` + Description *string `json:"description"` + } `json:"properties"` } type ICUpdate struct { @@ -139,7 +145,7 @@ func ConnectAMQP(uri string) error { return nil } -func SendActionAMQP(action Action) error { +func sendActionAMQP(action Action) error { payload, err := json.Marshal(action) if err != nil { @@ -155,6 +161,20 @@ func SendActionAMQP(action Action) error { Body: payload, } + // set message headers + var headers map[string]interface{} + headers = make(map[string]interface{}) // empty map + if action.Properties.UUID != nil { + headers["uuid"] = *action.Properties.UUID + } + if action.Properties.Type != nil { + headers["type"] = *action.Properties.Type + } + if action.Properties.Category != nil { + headers["category"] = *action.Properties.Category + } + msg.Headers = headers + err = CheckConnection() if err != nil { return err @@ -175,9 +195,9 @@ func PingAMQP() error { var a Action a.Act = "ping" - *a.UUID = "" + *a.Properties.UUID = "" - err := SendActionAMQP(a) + err := sendActionAMQP(a) return err } diff --git a/routes/infrastructure-component/ic_endpoints.go b/routes/infrastructure-component/ic_endpoints.go index 0328ff7..a542a63 100644 --- a/routes/infrastructure-component/ic_endpoints.go +++ b/routes/infrastructure-component/ic_endpoints.go @@ -22,12 +22,12 @@ package infrastructure_component import ( + "fmt" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" "github.com/gin-gonic/gin" "log" "net/http" - - "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" ) func RegisterICEndpoints(r *gin.RouterGroup) { @@ -96,22 +96,28 @@ func addIC(c *gin.Context) { } // Validate the request - if err = req.Validate(); err != nil { + if err = req.validate(); err != nil { helper.UnprocessableEntityError(c, err.Error()) return } - // TODO add case distinction here for externally managed IC - // Create the new IC from the request - newIC := req.CreateIC() - - // Save new IC to DB - err = newIC.Save() - if !helper.DBError(c, err) { - c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent}) + newIC, err := req.createIC(false) + if err != nil { + helper.InternalServerError(c, "Unable to send create action: "+err.Error()) + return } + if !newIC.ManagedExternally { + // Save new IC to DB if not managed externally + err = newIC.Save() + + if helper.DBError(c, err) { + return + } + } + + c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent}) } // updateIC godoc @@ -144,15 +150,20 @@ func updateIC(c *gin.Context) { } // Validate the request - if err = req.Validate(); err != nil { + if err = req.validate(); err != nil { helper.UnprocessableEntityError(c, err.Error()) return } - // TODO add case distinction here for externally managed IC - // Create the updatedIC from oldIC - updatedIC := req.UpdatedIC(oldIC) + updatedIC, err := req.updatedIC(oldIC, false) + if err != nil { + c.JSON(http.StatusForbidden, gin.H{ + "success": false, + "message": fmt.Sprintf("%v", err), + }) + return + } // Finally update the IC in the DB err = oldIC.Update(updatedIC) @@ -205,14 +216,16 @@ func deleteIC(c *gin.Context) { return } - // TODO add case distinction here for externally managed IC - // Delete the IC - err := s.delete() - if !helper.DBError(c, err) { - c.JSON(http.StatusOK, gin.H{"ic": s.InfrastructureComponent}) + err := s.delete(false) + if helper.DBError(c, err) { + return + } else if err != nil { + helper.InternalServerError(c, "Unable to send delete action: "+err.Error()) + return } + c.JSON(http.StatusOK, gin.H{"ic": s.InfrastructureComponent}) } // getConfigsOfIC godoc @@ -278,9 +291,25 @@ func sendActionToIC(c *gin.Context) { /*if action.When == 0 { action.When = float32(now.Unix()) }*/ - action.UUID = new(string) - *action.UUID = s.UUID - err = SendActionAMQP(action) + // make sure that the important properties are set correctly so that the message can be identified by the receiver + if action.Properties.UUID == nil { + action.Properties.UUID = new(string) + *action.Properties.UUID = s.UUID + } + if action.Properties.Type == nil { + action.Properties.Type = new(string) + *action.Properties.Type = s.Type + } + if action.Properties.Category == nil { + action.Properties.Category = new(string) + *action.Properties.Category = s.Category + } + if action.Properties.Name == nil { + action.Properties.Name = new(string) + *action.Properties.Name = s.Name + } + + err = sendActionAMQP(action) if err != nil { helper.InternalServerError(c, "Unable to send actions to IC: "+err.Error()) return diff --git a/routes/infrastructure-component/ic_methods.go b/routes/infrastructure-component/ic_methods.go index 674fd8f..13fda99 100644 --- a/routes/infrastructure-component/ic_methods.go +++ b/routes/infrastructure-component/ic_methods.go @@ -23,8 +23,9 @@ package infrastructure_component import ( "fmt" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" + "log" + "time" ) type InfrastructureComponent struct { @@ -57,7 +58,18 @@ func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) erro return err } -func (s *InfrastructureComponent) delete() error { +func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error { + if s.ManagedExternally && !receivedViaAMQP { + var action Action + action.Act = "delete" + action.When = time.Now().Unix() + action.Properties.UUID = new(string) + *action.Properties.UUID = s.UUID + + err := sendActionAMQP(action) + return err + } + db := database.GetDB() no_configs := db.Model(s).Association("ComponentConfigurations").Count() @@ -118,13 +130,16 @@ func createNewICviaAMQP(payload ICUpdate) error { newICReq.InfrastructureComponent.ManagedExternally = newTrue() // Validate the new IC - err := newICReq.Validate() + err := newICReq.validate() if err != nil { return fmt.Errorf("AMQP: Validation of new IC failed: %v", err) } // Create the new IC - newIC := newICReq.CreateIC() + newIC, err := newICReq.createIC(true) + if err != nil { + return fmt.Errorf("AMQP: Creating new IC failed: %v", err) + } // save IC err = newIC.Save() @@ -139,8 +154,17 @@ func (s *InfrastructureComponent) updateICviaAMQP(payload ICUpdate) error { var updatedICReq UpdateICRequest if payload.State != nil { updatedICReq.InfrastructureComponent.State = *payload.State - // TODO check if state is "gone" and attempt to remove IC from DB if it still exists - // TODO if state is different from "gone", continue to update the IC + + if *payload.State == "gone" { + // remove IC from DB + err := s.delete(true) + if err != nil { + // if component could not be deleted there are still configurations using it in the DB + // continue with the update to save the new state of the component and get back to the deletion later + log.Println("Could not delete IC because there is a config using it, deletion postponed") + } + + } } if payload.Properties.Type != nil { updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type @@ -170,13 +194,16 @@ func (s *InfrastructureComponent) updateICviaAMQP(payload ICUpdate) error { updatedICReq.InfrastructureComponent.ManagedExternally = newTrue() // Validate the updated IC - err := updatedICReq.Validate() + err := updatedICReq.validate() if err != nil { return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err) } // Create the updated IC from old IC - updatedIC := updatedICReq.UpdatedIC(*s) + updatedIC, err := updatedICReq.updatedIC(*s, true) + if err != nil { + return fmt.Errorf("AMQP: Unable to update IC %v : %v", s.Name, err) + } // Finally update the IC in the DB err = s.Update(updatedIC) diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index da3484b..af28299 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -23,6 +23,8 @@ package infrastructure_component import ( "encoding/json" + "fmt" + "github.com/google/uuid" "github.com/jinzhu/gorm/dialects/postgres" "github.com/nsf/jsondiff" "gopkg.in/go-playground/validator.v9" @@ -32,7 +34,7 @@ import ( var validate *validator.Validate type validNewIC struct { - UUID string `form:"UUID" validate:"required"` + UUID string `form:"UUID" validate:"omitempty"` WebsocketURL string `form:"WebsocketURL" validate:"omitempty"` APIURL string `form:"APIURL" validate:"omitempty"` Type string `form:"Type" validate:"required"` @@ -67,46 +69,103 @@ type UpdateICRequest struct { InfrastructureComponent validUpdatedIC `json:"ic"` } -func (r *AddICRequest) Validate() error { +func (r *AddICRequest) validate() error { validate = validator.New() errs := validate.Struct(r) - return errs -} - -func (r *UpdateICRequest) Validate() error { - validate = validator.New() - errs := validate.Struct(r) - return errs -} - -func (r *AddICRequest) CreateIC() InfrastructureComponent { - var s InfrastructureComponent - - s.UUID = r.InfrastructureComponent.UUID - s.WebsocketURL = r.InfrastructureComponent.WebsocketURL - s.APIURL = r.InfrastructureComponent.APIURL - s.Type = r.InfrastructureComponent.Type - s.Name = r.InfrastructureComponent.Name - s.Category = r.InfrastructureComponent.Category - s.Location = r.InfrastructureComponent.Location - s.Description = r.InfrastructureComponent.Description - s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme - s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally - if r.InfrastructureComponent.State != "" { - s.State = r.InfrastructureComponent.State - } else { - s.State = "unknown" + if errs != nil { + return errs } - // set last update to creation time of IC - s.StateUpdateAt = time.Now().Format(time.RFC1123) - return s + // check if uuid is valid + _, errs = uuid.Parse(r.InfrastructureComponent.UUID) + return errs } -func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) InfrastructureComponent { +func (r *UpdateICRequest) validate() error { + validate = validator.New() + errs := validate.Struct(r) + return errs +} + +func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent, error) { + var s InfrastructureComponent + var err error + err = nil + + // case distinction for externally managed IC + if *r.InfrastructureComponent.ManagedExternally && !receivedViaAMQP { + var action Action + action.Act = "create" + action.When = time.Now().Unix() + action.Properties.Type = new(string) + action.Properties.Name = new(string) + action.Properties.Category = new(string) + + *action.Properties.Type = r.InfrastructureComponent.Type + *action.Properties.Name = r.InfrastructureComponent.Name + *action.Properties.Category = r.InfrastructureComponent.Category + + // set optional properties + if r.InfrastructureComponent.Description != "" { + action.Properties.Description = new(string) + *action.Properties.Description = r.InfrastructureComponent.Description + } + + if r.InfrastructureComponent.Location != "" { + action.Properties.Location = new(string) + *action.Properties.Location = r.InfrastructureComponent.Location + } + + if r.InfrastructureComponent.APIURL != "" { + action.Properties.API_url = new(string) + *action.Properties.API_url = r.InfrastructureComponent.APIURL + } + + if r.InfrastructureComponent.WebsocketURL != "" { + action.Properties.WS_url = new(string) + *action.Properties.WS_url = r.InfrastructureComponent.WebsocketURL + } + + if r.InfrastructureComponent.UUID != "" { + action.Properties.UUID = new(string) + *action.Properties.UUID = r.InfrastructureComponent.UUID + } + + err = sendActionAMQP(action) + + // s remains empty + + } else { + s.UUID = r.InfrastructureComponent.UUID + s.WebsocketURL = r.InfrastructureComponent.WebsocketURL + s.APIURL = r.InfrastructureComponent.APIURL + s.Type = r.InfrastructureComponent.Type + s.Name = r.InfrastructureComponent.Name + s.Category = r.InfrastructureComponent.Category + s.Location = r.InfrastructureComponent.Location + s.Description = r.InfrastructureComponent.Description + s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme + s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally + if r.InfrastructureComponent.State != "" { + s.State = r.InfrastructureComponent.State + } else { + s.State = "unknown" + } + // set last update to creation time of IC + s.StateUpdateAt = time.Now().Format(time.RFC1123) + } + + return s, err +} + +func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent, receivedViaAMQP bool) (InfrastructureComponent, error) { // Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s` s := oldIC + if s.ManagedExternally && !receivedViaAMQP { + // externally managed IC cannot be updated via API, only via AMQP + return s, fmt.Errorf("cannot update externally managed IC %v", s.Name) + } if r.InfrastructureComponent.UUID != "" { s.UUID = r.InfrastructureComponent.UUID } @@ -161,5 +220,5 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme } - return s + return s, nil }