mirror of
https://git.rwth-aachen.de/acs/public/villas/web-backend-go/
synced 2025-03-30 00:00:12 +01:00
Add field ManagedExternally to IC data model
This commit is contained in:
parent
adabfc8e9c
commit
4922d8c4ce
7 changed files with 152 additions and 108 deletions
|
@ -126,7 +126,10 @@ func ConnectAMQP(uri string) error {
|
|||
go func() {
|
||||
for {
|
||||
for message := range client.replies {
|
||||
processMessage(message)
|
||||
err = processMessage(message)
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
}
|
||||
}
|
||||
time.Sleep(2) // sleep for 2 sek
|
||||
}
|
||||
|
@ -229,23 +232,21 @@ func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func processMessage(message amqp.Delivery) {
|
||||
func processMessage(message amqp.Delivery) error {
|
||||
|
||||
log.Println("Processing AMQP message: ", string(message.Body))
|
||||
|
||||
var payload ICUpdate
|
||||
err := json.Unmarshal(message.Body, &payload)
|
||||
if err != nil {
|
||||
log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err)
|
||||
return
|
||||
return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
|
||||
}
|
||||
|
||||
ICUUID := payload.Properties.UUID
|
||||
_, err = uuid.Parse(ICUUID)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
|
||||
return
|
||||
return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
|
||||
}
|
||||
|
||||
var sToBeUpdated infrastructure_component.InfrastructureComponent
|
||||
|
@ -253,107 +254,124 @@ func processMessage(message amqp.Delivery) {
|
|||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// create new record
|
||||
var newICReq infrastructure_component.AddICRequest
|
||||
newICReq.InfrastructureComponent.UUID = payload.Properties.UUID
|
||||
if payload.Properties.Name == nil ||
|
||||
payload.Properties.Category == nil ||
|
||||
payload.Properties.Type == nil {
|
||||
// cannot create new IC because required information (name, type, and/or category missing)
|
||||
log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
|
||||
return
|
||||
}
|
||||
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||
newICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||
|
||||
// add optional params
|
||||
if payload.State != nil {
|
||||
newICReq.InfrastructureComponent.State = *payload.State
|
||||
} else {
|
||||
newICReq.InfrastructureComponent.State = "unknown"
|
||||
}
|
||||
if payload.Properties.WS_url != nil {
|
||||
newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
|
||||
}
|
||||
if payload.Properties.API_url != nil {
|
||||
newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
|
||||
}
|
||||
if payload.Properties.Location != nil {
|
||||
newICReq.InfrastructureComponent.Location = *payload.Properties.Location
|
||||
}
|
||||
if payload.Properties.Description != nil {
|
||||
newICReq.InfrastructureComponent.Description = *payload.Properties.Description
|
||||
}
|
||||
// TODO add JSON start parameter scheme
|
||||
|
||||
// Validate the new IC
|
||||
err = newICReq.Validate()
|
||||
if err != nil {
|
||||
log.Println("AMQP: Validation of new IC failed:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the new IC
|
||||
newIC := newICReq.CreateIC()
|
||||
|
||||
// save IC
|
||||
err = newIC.Save()
|
||||
if err != nil {
|
||||
log.Println("AMQP: Saving new IC to DB failed:", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("AMQP: Created IC ", newIC.Name)
|
||||
|
||||
err = createNewIC(payload)
|
||||
} else if err != nil {
|
||||
log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err)
|
||||
return
|
||||
// database error
|
||||
err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
|
||||
} else {
|
||||
|
||||
var updatedICReq infrastructure_component.UpdateICRequest
|
||||
if payload.State != nil {
|
||||
updatedICReq.InfrastructureComponent.State = *payload.State
|
||||
}
|
||||
if payload.Properties.Type != nil {
|
||||
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||
}
|
||||
if payload.Properties.Category != nil {
|
||||
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||
}
|
||||
if payload.Properties.Name != nil {
|
||||
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||
}
|
||||
if payload.Properties.WS_url != nil {
|
||||
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
|
||||
}
|
||||
if payload.Properties.API_url != nil {
|
||||
updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
|
||||
}
|
||||
if payload.Properties.Location != nil {
|
||||
//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
|
||||
updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location
|
||||
}
|
||||
if payload.Properties.Description != nil {
|
||||
updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description
|
||||
}
|
||||
// TODO add JSON start parameter scheme
|
||||
|
||||
// Validate the updated IC
|
||||
if err = updatedICReq.Validate(); err != nil {
|
||||
log.Println("AMQP: Validation of updated IC failed:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create the updated IC from old IC
|
||||
updatedIC := updatedICReq.UpdatedIC(sToBeUpdated)
|
||||
|
||||
// Finally update the IC in the DB
|
||||
err = sToBeUpdated.Update(updatedIC)
|
||||
if err != nil {
|
||||
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
|
||||
// update record
|
||||
err = updateIC(payload, sToBeUpdated)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func createNewIC(payload ICUpdate) error {
|
||||
|
||||
var newICReq infrastructure_component.AddICRequest
|
||||
newICReq.InfrastructureComponent.UUID = payload.Properties.UUID
|
||||
if payload.Properties.Name == nil ||
|
||||
payload.Properties.Category == nil ||
|
||||
payload.Properties.Type == nil {
|
||||
// cannot create new IC because required information (name, type, and/or category missing)
|
||||
return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
|
||||
}
|
||||
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||
newICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||
|
||||
// add optional params
|
||||
if payload.State != nil {
|
||||
newICReq.InfrastructureComponent.State = *payload.State
|
||||
} else {
|
||||
newICReq.InfrastructureComponent.State = "unknown"
|
||||
}
|
||||
if payload.Properties.WS_url != nil {
|
||||
newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
|
||||
}
|
||||
if payload.Properties.API_url != nil {
|
||||
newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
|
||||
}
|
||||
if payload.Properties.Location != nil {
|
||||
newICReq.InfrastructureComponent.Location = *payload.Properties.Location
|
||||
}
|
||||
if payload.Properties.Description != nil {
|
||||
newICReq.InfrastructureComponent.Description = *payload.Properties.Description
|
||||
}
|
||||
// TODO add JSON start parameter scheme
|
||||
|
||||
// set managed externally to true because this IC is created via AMQP
|
||||
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
|
||||
|
||||
// Validate the new IC
|
||||
err := newICReq.Validate()
|
||||
if err != nil {
|
||||
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
|
||||
}
|
||||
|
||||
// Create the new IC
|
||||
newIC := newICReq.CreateIC()
|
||||
|
||||
// save IC
|
||||
err = newIC.Save()
|
||||
if err != nil {
|
||||
return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateIC(payload ICUpdate, sToBeUpdated infrastructure_component.InfrastructureComponent) error {
|
||||
var updatedICReq infrastructure_component.UpdateICRequest
|
||||
if payload.State != nil {
|
||||
updatedICReq.InfrastructureComponent.State = *payload.State
|
||||
}
|
||||
if payload.Properties.Type != nil {
|
||||
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||
}
|
||||
if payload.Properties.Category != nil {
|
||||
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||
}
|
||||
if payload.Properties.Name != nil {
|
||||
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||
}
|
||||
if payload.Properties.WS_url != nil {
|
||||
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
|
||||
}
|
||||
if payload.Properties.API_url != nil {
|
||||
updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
|
||||
}
|
||||
if payload.Properties.Location != nil {
|
||||
//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
|
||||
updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location
|
||||
}
|
||||
if payload.Properties.Description != nil {
|
||||
updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description
|
||||
}
|
||||
// TODO add JSON start parameter scheme
|
||||
|
||||
// set managed externally to true because this IC is updated via AMQP
|
||||
updatedICReq.InfrastructureComponent.ManagedExternally = newTrue()
|
||||
|
||||
// Validate the updated IC
|
||||
err := updatedICReq.Validate()
|
||||
if err != nil {
|
||||
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
|
||||
}
|
||||
|
||||
// Create the updated IC from old IC
|
||||
updatedIC := updatedICReq.UpdatedIC(sToBeUpdated)
|
||||
|
||||
// Finally update the IC in the DB
|
||||
err = sToBeUpdated.Update(updatedIC)
|
||||
if err != nil {
|
||||
return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", sToBeUpdated.Name, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func newTrue() *bool {
|
||||
b := true
|
||||
return &b
|
||||
}
|
||||
|
|
|
@ -140,6 +140,8 @@ type InfrastructureComponent struct {
|
|||
Description string `json:"description" gorm:"default:''"`
|
||||
// JSON scheme of start parameters for IC
|
||||
StartParameterScheme postgres.Jsonb `json:"startparameterscheme"`
|
||||
// Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller)
|
||||
ManagedExternally bool `json:"managedexternally" gorm:"default:false"`
|
||||
// ComponentConfigurations in which the IC is used
|
||||
ComponentConfigurations []ComponentConfiguration `json:"-" gorm:"foreignkey:ICID"`
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ var ICA = database.InfrastructureComponent{
|
|||
Description: "This is a test description",
|
||||
//StateUpdateAt: time.Now().Format(time.RFC1123),
|
||||
StartParameterScheme: postgres.Jsonb{propertiesA},
|
||||
ManagedExternally: true,
|
||||
}
|
||||
|
||||
var ICB = database.InfrastructureComponent{
|
||||
|
@ -118,6 +119,7 @@ var ICB = database.InfrastructureComponent{
|
|||
Description: "A signal generator for testing purposes",
|
||||
//StateUpdateAt: time.Now().Format(time.RFC1123),
|
||||
StartParameterScheme: postgres.Jsonb{propertiesB},
|
||||
ManagedExternally: false,
|
||||
}
|
||||
|
||||
// Scenarios
|
||||
|
|
|
@ -58,6 +58,7 @@ type ICRequest struct {
|
|||
Location string `json:"location,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"`
|
||||
ManagedExternally *bool `json:"managedexternally,omitempty"`
|
||||
}
|
||||
|
||||
type ScenarioRequest struct {
|
||||
|
@ -83,6 +84,7 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
_, resp, _ := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newICA})
|
||||
|
@ -101,6 +103,7 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) {
|
|||
Location: helper.ICB.Location,
|
||||
Description: helper.ICB.Description,
|
||||
StartParameterScheme: helper.ICB.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICB.ManagedExternally,
|
||||
}
|
||||
_, resp, _ = helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newICB})
|
||||
|
|
|
@ -49,6 +49,7 @@ type ICRequest struct {
|
|||
Location string `json:"location,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"`
|
||||
ManagedExternally *bool `json:"managedexternally,omitempty"`
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
|
@ -112,6 +113,7 @@ func TestAddICAsAdmin(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err = helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newIC})
|
||||
|
@ -166,6 +168,7 @@ func TestAddICAsUser(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
|
||||
// This should fail with unprocessable entity 422 error code
|
||||
|
@ -197,6 +200,7 @@ func TestUpdateICAsAdmin(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newIC})
|
||||
|
@ -263,6 +267,7 @@ func TestUpdateICAsUser(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newIC})
|
||||
|
@ -309,6 +314,7 @@ func TestDeleteICAsAdmin(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newIC})
|
||||
|
@ -363,6 +369,7 @@ func TestDeleteICAsUser(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newIC})
|
||||
|
@ -413,6 +420,7 @@ func TestGetAllICs(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newICA})
|
||||
|
@ -430,6 +438,7 @@ func TestGetAllICs(t *testing.T) {
|
|||
Location: helper.ICB.Location,
|
||||
Description: helper.ICB.Description,
|
||||
StartParameterScheme: helper.ICB.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICB.ManagedExternally,
|
||||
}
|
||||
code, resp, err = helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newICB})
|
||||
|
@ -477,6 +486,7 @@ func TestGetConfigsOfIC(t *testing.T) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
code, resp, err := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newICA})
|
||||
|
|
|
@ -38,10 +38,11 @@ type validNewIC struct {
|
|||
Type string `form:"Type" validate:"required"`
|
||||
Name string `form:"Name" validate:"required"`
|
||||
Category string `form:"Category" validate:"required"`
|
||||
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
||||
State string `form:"State" validate:"omitempty"`
|
||||
Location string `form:"Location" validate:"omitempty"`
|
||||
Description string `form:"Description" validate:"omitempty"`
|
||||
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
||||
ManagedExternally *bool `form:"ManagedExternally" validate:"required"`
|
||||
}
|
||||
|
||||
type validUpdatedIC struct {
|
||||
|
@ -51,10 +52,11 @@ type validUpdatedIC struct {
|
|||
Type string `form:"Type" validate:"omitempty"`
|
||||
Name string `form:"Name" validate:"omitempty"`
|
||||
Category string `form:"Category" validate:"omitempty"`
|
||||
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
||||
State string `form:"State" validate:"omitempty"`
|
||||
Location string `form:"Location" validate:"omitempty"`
|
||||
Description string `form:"Description" validate:"omitempty"`
|
||||
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
||||
ManagedExternally *bool `form:"ManagedExternally" validate:"required"`
|
||||
}
|
||||
|
||||
type AddICRequest struct {
|
||||
|
@ -89,6 +91,7 @@ func (r *AddICRequest) CreateIC() InfrastructureComponent {
|
|||
s.Location = r.InfrastructureComponent.Location
|
||||
s.Description = r.InfrastructureComponent.Description
|
||||
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
|
||||
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
|
||||
if r.InfrastructureComponent.State != "" {
|
||||
s.State = r.InfrastructureComponent.State
|
||||
} else {
|
||||
|
@ -140,6 +143,10 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur
|
|||
s.Description = r.InfrastructureComponent.Description
|
||||
}
|
||||
|
||||
if r.InfrastructureComponent.ManagedExternally != nil {
|
||||
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
|
||||
}
|
||||
|
||||
// set last update time
|
||||
s.StateUpdateAt = time.Now().Format(time.RFC1123)
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ type ICRequest struct {
|
|||
Location string `json:"location,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"`
|
||||
ManagedExternally *bool `json:"managedexternally,omitempty"`
|
||||
}
|
||||
|
||||
type ScenarioRequest struct {
|
||||
|
@ -90,6 +91,7 @@ func addScenarioAndICAndConfig() (scenarioID uint, ICID uint, configID uint) {
|
|||
Location: helper.ICA.Location,
|
||||
Description: helper.ICA.Description,
|
||||
StartParameterScheme: helper.ICA.StartParameterScheme,
|
||||
ManagedExternally: &helper.ICA.ManagedExternally,
|
||||
}
|
||||
_, resp, _ := helper.TestEndpoint(router, token,
|
||||
"/api/ic", "POST", helper.KeyModels{"ic": newICA})
|
||||
|
|
Loading…
Add table
Reference in a new issue