From 6a23b6d8c3bc40d5e3cae61b12cc9c9ee31258ad Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Thu, 28 Jan 2021 10:36:35 +0100 Subject: [PATCH] adapt AMQP status messages to new message structure; add field to IC data model for raw status message --- database/models.go | 2 + .../infrastructure-component/ic_amqpclient.go | 128 +++++++++++------- .../infrastructure-component/ic_validators.go | 12 +- 3 files changed, 92 insertions(+), 50 deletions(-) diff --git a/database/models.go b/database/models.go index 5878b59..273d2ab 100644 --- a/database/models.go +++ b/database/models.go @@ -143,6 +143,8 @@ type InfrastructureComponent struct { Description string `json:"description" gorm:"default:''"` // JSON scheme of start parameters for IC StartParameterScheme postgres.Jsonb `json:"startparameterscheme"` + // raw JSON of last status update + StatusUpdateRaw postgres.Jsonb `json:"statusupdateraw"` // Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller) ManagedExternally bool `json:"managedexternally" gorm:"default:false"` // ComponentConfigurations in which the IC is used diff --git a/routes/infrastructure-component/ic_amqpclient.go b/routes/infrastructure-component/ic_amqpclient.go index 461b779..f1af181 100644 --- a/routes/infrastructure-component/ic_amqpclient.go +++ b/routes/infrastructure-component/ic_amqpclient.go @@ -27,6 +27,7 @@ import ( "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/jinzhu/gorm" + "github.com/jinzhu/gorm/dialects/postgres" "github.com/streadway/amqp" "log" "time" @@ -56,19 +57,28 @@ type Action struct { } type ICStatus struct { - State *string `json:"state"` - Name *string `json:"name"` - Category *string `json:"category"` - Type *string `json:"type"` - Location *string `json:"location"` - WS_url *string `json:"ws_url"` - API_url *string `json:"api_url"` - Description *string `json:"description"` - Uptime *float64 `json:"uptime"` // TODO check if data type of uptime is float64 or int + State *string `json:"state"` + Version *string `json:"version"` + Uptime *float64 `json:"uptime"` + Result *string `json:"result"` + Error *string `json:"error"` +} + +type ICProperties struct { + Name *string `json:"name"` + Description *string `json:"description"` + Location *string `json:"location"` + Owner *string `json:"owner"` + WS_url *string `json:"ws_url"` + API_url *string `json:"api_url"` + Category *string `json:"category"` + Type *string `json:"type"` } type ICUpdate struct { - Status *ICStatus `json:"status"` + Status *ICStatus `json:"status"` + Properties *ICProperties `json:"properties"` + When *float64 `json:"when"` // TODO add JSON start parameter scheme } @@ -261,7 +271,7 @@ func processMessage(message amqp.Delivery) error { return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err) } - if payload.Status != nil { + if payload.Status != nil || payload.Properties != nil { //log.Println("Processing AMQP message: ", string(message.Body)) // if a message contains a "state" field, it is an update for an IC @@ -285,7 +295,10 @@ func processMessage(message amqp.Delivery) error { // update record based on payload err = sToBeUpdated.updateExternalIC(payload) } + } else { + log.Println("INFO: ignoring message, payload neither contains status nor properties", message) } + return err } @@ -293,15 +306,15 @@ func createExternalIC(payload ICUpdate, ICUUID string) error { var newICReq AddICRequest newICReq.InfrastructureComponent.UUID = ICUUID - if payload.Status.Name == nil || - payload.Status.Category == nil || - payload.Status.Type == nil { + if payload.Properties.Name == nil || + payload.Properties.Category == nil || + payload.Properties.Type == nil { // cannot create new IC because required information (name, type, and/or category missing) return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") } - newICReq.InfrastructureComponent.Name = *payload.Status.Name - newICReq.InfrastructureComponent.Category = *payload.Status.Category - newICReq.InfrastructureComponent.Type = *payload.Status.Type + newICReq.InfrastructureComponent.Name = *payload.Properties.Name + newICReq.InfrastructureComponent.Category = *payload.Properties.Category + newICReq.InfrastructureComponent.Type = *payload.Properties.Type // add optional params if payload.Status.State != nil { @@ -315,17 +328,17 @@ func createExternalIC(payload ICUpdate, ICUUID string) error { return nil } - if payload.Status.WS_url != nil { - newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url + if payload.Properties.WS_url != nil { + newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url } - if payload.Status.API_url != nil { - newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url + if payload.Properties.API_url != nil { + newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url } - if payload.Status.Location != nil { - newICReq.InfrastructureComponent.Location = *payload.Status.Location + if payload.Properties.Location != nil { + newICReq.InfrastructureComponent.Location = *payload.Properties.Location } - if payload.Status.Description != nil { - newICReq.InfrastructureComponent.Description = *payload.Status.Description + if payload.Properties.Description != nil { + newICReq.InfrastructureComponent.Description = *payload.Properties.Description } if payload.Status.Uptime != nil { newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime @@ -335,8 +348,15 @@ func createExternalIC(payload ICUpdate, ICUUID string) error { // set managed externally to true because this IC is created via AMQP newICReq.InfrastructureComponent.ManagedExternally = newTrue() + // set raw status update if IC + payloadRaw, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err) + } + newICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw} + // Validate the new IC - err := newICReq.validate() + err = newICReq.validate() if err != nil { return fmt.Errorf("AMQP: Validation of new IC failed: %v", err) } @@ -375,35 +395,45 @@ func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error { } } - if payload.Status.Type != nil { - updatedICReq.InfrastructureComponent.Type = *payload.Status.Type - } - if payload.Status.Category != nil { - updatedICReq.InfrastructureComponent.Category = *payload.Status.Category - } - if payload.Status.Name != nil { - updatedICReq.InfrastructureComponent.Name = *payload.Status.Name - } - if payload.Status.WS_url != nil { - updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url - } - if payload.Status.API_url != nil { - updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url - } - if payload.Status.Location != nil { - //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)} - updatedICReq.InfrastructureComponent.Location = *payload.Status.Location - } - if payload.Status.Description != nil { - updatedICReq.InfrastructureComponent.Description = *payload.Status.Description - } + if payload.Status.Uptime != nil { updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime } + + if payload.Properties.Type != nil { + updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type + } + if payload.Properties.Category != nil { + updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category + } + if payload.Properties.Name != nil { + updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name + } + if payload.Properties.WS_url != nil { + updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url + } + if payload.Properties.API_url != nil { + updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url + } + if payload.Properties.Location != nil { + //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)} + updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location + } + if payload.Properties.Description != nil { + updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description + } + + // set raw status update if IC + payloadRaw, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err) + } + updatedICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw} + // TODO add JSON start parameter scheme // Validate the updated IC - err := updatedICReq.validate() + err = updatedICReq.validate() if err != nil { return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err) } diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index 44f9840..1920285 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -44,6 +44,7 @@ type validNewIC struct { Location string `form:"Location" validate:"omitempty"` Description string `form:"Description" validate:"omitempty"` StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` + StatusUpdateRaw postgres.Jsonb `form:"StatusUpdateRaw" validate:"omitempty"` ManagedExternally *bool `form:"ManagedExternally" validate:"required"` Uptime float64 `form:"Uptime" validate:"omitempty"` } @@ -59,6 +60,7 @@ type validUpdatedIC struct { Location string `form:"Location" validate:"omitempty"` Description string `form:"Description" validate:"omitempty"` StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` + StatusUpdateRaw postgres.Jsonb `form:"StatusUpdateRaw" validate:"omitempty"` Uptime float64 `form:"Uptime" validate:"omitempty"` } @@ -135,6 +137,7 @@ func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent, s.Location = r.InfrastructureComponent.Location s.Description = r.InfrastructureComponent.Description s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme + s.StatusUpdateRaw = r.InfrastructureComponent.StatusUpdateRaw s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally s.Uptime = -1.0 // no uptime available if r.InfrastructureComponent.State != "" { @@ -181,12 +184,19 @@ func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent) Infrastructur var emptyJson postgres.Jsonb // Serialize empty json and params emptyJson_ser, _ := json.Marshal(emptyJson) - startParams_ser, _ := json.Marshal(r.InfrastructureComponent.StartParameterScheme) opts := jsondiff.DefaultConsoleOptions() + + startParams_ser, _ := json.Marshal(r.InfrastructureComponent.StartParameterScheme) diff, _ := jsondiff.Compare(emptyJson_ser, startParams_ser, &opts) if diff.String() != "FullMatch" { s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme } + statusUpdateRaw_ser, _ := json.Marshal(r.InfrastructureComponent.StatusUpdateRaw) + diff, _ = jsondiff.Compare(emptyJson_ser, statusUpdateRaw_ser, &opts) + if diff.String() != "FullMatch" { + s.StatusUpdateRaw = r.InfrastructureComponent.StatusUpdateRaw + } + return s }