mirror of
https://git.rwth-aachen.de/acs/public/villas/web-backend-go/
synced 2025-03-30 00:00:12 +01:00
adapt AMQP status messages to new message structure; add field to IC data model for raw status message
This commit is contained in:
parent
05c9d364b3
commit
6a23b6d8c3
3 changed files with 92 additions and 50 deletions
|
@ -143,6 +143,8 @@ type InfrastructureComponent struct {
|
||||||
Description string `json:"description" gorm:"default:''"`
|
Description string `json:"description" gorm:"default:''"`
|
||||||
// JSON scheme of start parameters for IC
|
// JSON scheme of start parameters for IC
|
||||||
StartParameterScheme postgres.Jsonb `json:"startparameterscheme"`
|
StartParameterScheme postgres.Jsonb `json:"startparameterscheme"`
|
||||||
|
// raw JSON of last status update
|
||||||
|
StatusUpdateRaw postgres.Jsonb `json:"statusupdateraw"`
|
||||||
// Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller)
|
// Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller)
|
||||||
ManagedExternally bool `json:"managedexternally" gorm:"default:false"`
|
ManagedExternally bool `json:"managedexternally" gorm:"default:false"`
|
||||||
// ComponentConfigurations in which the IC is used
|
// ComponentConfigurations in which the IC is used
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/jinzhu/gorm"
|
"github.com/jinzhu/gorm"
|
||||||
|
"github.com/jinzhu/gorm/dialects/postgres"
|
||||||
"github.com/streadway/amqp"
|
"github.com/streadway/amqp"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
@ -57,18 +58,27 @@ type Action struct {
|
||||||
|
|
||||||
type ICStatus struct {
|
type ICStatus struct {
|
||||||
State *string `json:"state"`
|
State *string `json:"state"`
|
||||||
|
Version *string `json:"version"`
|
||||||
|
Uptime *float64 `json:"uptime"`
|
||||||
|
Result *string `json:"result"`
|
||||||
|
Error *string `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ICProperties struct {
|
||||||
Name *string `json:"name"`
|
Name *string `json:"name"`
|
||||||
Category *string `json:"category"`
|
Description *string `json:"description"`
|
||||||
Type *string `json:"type"`
|
|
||||||
Location *string `json:"location"`
|
Location *string `json:"location"`
|
||||||
|
Owner *string `json:"owner"`
|
||||||
WS_url *string `json:"ws_url"`
|
WS_url *string `json:"ws_url"`
|
||||||
API_url *string `json:"api_url"`
|
API_url *string `json:"api_url"`
|
||||||
Description *string `json:"description"`
|
Category *string `json:"category"`
|
||||||
Uptime *float64 `json:"uptime"` // TODO check if data type of uptime is float64 or int
|
Type *string `json:"type"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ICUpdate struct {
|
type ICUpdate struct {
|
||||||
Status *ICStatus `json:"status"`
|
Status *ICStatus `json:"status"`
|
||||||
|
Properties *ICProperties `json:"properties"`
|
||||||
|
When *float64 `json:"when"`
|
||||||
// TODO add JSON start parameter scheme
|
// TODO add JSON start parameter scheme
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,7 +271,7 @@ func processMessage(message amqp.Delivery) error {
|
||||||
return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
|
return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if payload.Status != nil {
|
if payload.Status != nil || payload.Properties != nil {
|
||||||
//log.Println("Processing AMQP message: ", string(message.Body))
|
//log.Println("Processing AMQP message: ", string(message.Body))
|
||||||
// if a message contains a "state" field, it is an update for an IC
|
// if a message contains a "state" field, it is an update for an IC
|
||||||
|
|
||||||
|
@ -285,7 +295,10 @@ func processMessage(message amqp.Delivery) error {
|
||||||
// update record based on payload
|
// update record based on payload
|
||||||
err = sToBeUpdated.updateExternalIC(payload)
|
err = sToBeUpdated.updateExternalIC(payload)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
log.Println("INFO: ignoring message, payload neither contains status nor properties", message)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,15 +306,15 @@ func createExternalIC(payload ICUpdate, ICUUID string) error {
|
||||||
|
|
||||||
var newICReq AddICRequest
|
var newICReq AddICRequest
|
||||||
newICReq.InfrastructureComponent.UUID = ICUUID
|
newICReq.InfrastructureComponent.UUID = ICUUID
|
||||||
if payload.Status.Name == nil ||
|
if payload.Properties.Name == nil ||
|
||||||
payload.Status.Category == nil ||
|
payload.Properties.Category == nil ||
|
||||||
payload.Status.Type == nil {
|
payload.Properties.Type == nil {
|
||||||
// cannot create new IC because required information (name, type, and/or category missing)
|
// cannot create new IC because required information (name, type, and/or category missing)
|
||||||
return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
|
return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
|
||||||
}
|
}
|
||||||
newICReq.InfrastructureComponent.Name = *payload.Status.Name
|
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||||
newICReq.InfrastructureComponent.Category = *payload.Status.Category
|
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||||
newICReq.InfrastructureComponent.Type = *payload.Status.Type
|
newICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||||
|
|
||||||
// add optional params
|
// add optional params
|
||||||
if payload.Status.State != nil {
|
if payload.Status.State != nil {
|
||||||
|
@ -315,17 +328,17 @@ func createExternalIC(payload ICUpdate, ICUUID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if payload.Status.WS_url != nil {
|
if payload.Properties.WS_url != nil {
|
||||||
newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
|
newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
|
||||||
}
|
}
|
||||||
if payload.Status.API_url != nil {
|
if payload.Properties.API_url != nil {
|
||||||
newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
|
newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
|
||||||
}
|
}
|
||||||
if payload.Status.Location != nil {
|
if payload.Properties.Location != nil {
|
||||||
newICReq.InfrastructureComponent.Location = *payload.Status.Location
|
newICReq.InfrastructureComponent.Location = *payload.Properties.Location
|
||||||
}
|
}
|
||||||
if payload.Status.Description != nil {
|
if payload.Properties.Description != nil {
|
||||||
newICReq.InfrastructureComponent.Description = *payload.Status.Description
|
newICReq.InfrastructureComponent.Description = *payload.Properties.Description
|
||||||
}
|
}
|
||||||
if payload.Status.Uptime != nil {
|
if payload.Status.Uptime != nil {
|
||||||
newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
|
newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
|
||||||
|
@ -335,8 +348,15 @@ func createExternalIC(payload ICUpdate, ICUUID string) error {
|
||||||
// set managed externally to true because this IC is created via AMQP
|
// set managed externally to true because this IC is created via AMQP
|
||||||
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
|
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
|
||||||
|
|
||||||
|
// set raw status update if IC
|
||||||
|
payloadRaw, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err)
|
||||||
|
}
|
||||||
|
newICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw}
|
||||||
|
|
||||||
// Validate the new IC
|
// Validate the new IC
|
||||||
err := newICReq.validate()
|
err = newICReq.validate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
|
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -375,35 +395,45 @@ func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if payload.Status.Type != nil {
|
|
||||||
updatedICReq.InfrastructureComponent.Type = *payload.Status.Type
|
|
||||||
}
|
|
||||||
if payload.Status.Category != nil {
|
|
||||||
updatedICReq.InfrastructureComponent.Category = *payload.Status.Category
|
|
||||||
}
|
|
||||||
if payload.Status.Name != nil {
|
|
||||||
updatedICReq.InfrastructureComponent.Name = *payload.Status.Name
|
|
||||||
}
|
|
||||||
if payload.Status.WS_url != nil {
|
|
||||||
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
|
|
||||||
}
|
|
||||||
if payload.Status.API_url != nil {
|
|
||||||
updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
|
|
||||||
}
|
|
||||||
if payload.Status.Location != nil {
|
|
||||||
//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
|
|
||||||
updatedICReq.InfrastructureComponent.Location = *payload.Status.Location
|
|
||||||
}
|
|
||||||
if payload.Status.Description != nil {
|
|
||||||
updatedICReq.InfrastructureComponent.Description = *payload.Status.Description
|
|
||||||
}
|
|
||||||
if payload.Status.Uptime != nil {
|
if payload.Status.Uptime != nil {
|
||||||
updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
|
updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if payload.Properties.Type != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||||
|
}
|
||||||
|
if payload.Properties.Category != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||||
|
}
|
||||||
|
if payload.Properties.Name != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||||
|
}
|
||||||
|
if payload.Properties.WS_url != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
|
||||||
|
}
|
||||||
|
if payload.Properties.API_url != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
|
||||||
|
}
|
||||||
|
if payload.Properties.Location != nil {
|
||||||
|
//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
|
||||||
|
updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location
|
||||||
|
}
|
||||||
|
if payload.Properties.Description != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description
|
||||||
|
}
|
||||||
|
|
||||||
|
// set raw status update if IC
|
||||||
|
payloadRaw, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err)
|
||||||
|
}
|
||||||
|
updatedICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw}
|
||||||
|
|
||||||
// TODO add JSON start parameter scheme
|
// TODO add JSON start parameter scheme
|
||||||
|
|
||||||
// Validate the updated IC
|
// Validate the updated IC
|
||||||
err := updatedICReq.validate()
|
err = updatedICReq.validate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
|
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ type validNewIC struct {
|
||||||
Location string `form:"Location" validate:"omitempty"`
|
Location string `form:"Location" validate:"omitempty"`
|
||||||
Description string `form:"Description" validate:"omitempty"`
|
Description string `form:"Description" validate:"omitempty"`
|
||||||
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
||||||
|
StatusUpdateRaw postgres.Jsonb `form:"StatusUpdateRaw" validate:"omitempty"`
|
||||||
ManagedExternally *bool `form:"ManagedExternally" validate:"required"`
|
ManagedExternally *bool `form:"ManagedExternally" validate:"required"`
|
||||||
Uptime float64 `form:"Uptime" validate:"omitempty"`
|
Uptime float64 `form:"Uptime" validate:"omitempty"`
|
||||||
}
|
}
|
||||||
|
@ -59,6 +60,7 @@ type validUpdatedIC struct {
|
||||||
Location string `form:"Location" validate:"omitempty"`
|
Location string `form:"Location" validate:"omitempty"`
|
||||||
Description string `form:"Description" validate:"omitempty"`
|
Description string `form:"Description" validate:"omitempty"`
|
||||||
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
|
||||||
|
StatusUpdateRaw postgres.Jsonb `form:"StatusUpdateRaw" validate:"omitempty"`
|
||||||
Uptime float64 `form:"Uptime" validate:"omitempty"`
|
Uptime float64 `form:"Uptime" validate:"omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,6 +137,7 @@ func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent,
|
||||||
s.Location = r.InfrastructureComponent.Location
|
s.Location = r.InfrastructureComponent.Location
|
||||||
s.Description = r.InfrastructureComponent.Description
|
s.Description = r.InfrastructureComponent.Description
|
||||||
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
|
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
|
||||||
|
s.StatusUpdateRaw = r.InfrastructureComponent.StatusUpdateRaw
|
||||||
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
|
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
|
||||||
s.Uptime = -1.0 // no uptime available
|
s.Uptime = -1.0 // no uptime available
|
||||||
if r.InfrastructureComponent.State != "" {
|
if r.InfrastructureComponent.State != "" {
|
||||||
|
@ -181,12 +184,19 @@ func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent) Infrastructur
|
||||||
var emptyJson postgres.Jsonb
|
var emptyJson postgres.Jsonb
|
||||||
// Serialize empty json and params
|
// Serialize empty json and params
|
||||||
emptyJson_ser, _ := json.Marshal(emptyJson)
|
emptyJson_ser, _ := json.Marshal(emptyJson)
|
||||||
startParams_ser, _ := json.Marshal(r.InfrastructureComponent.StartParameterScheme)
|
|
||||||
opts := jsondiff.DefaultConsoleOptions()
|
opts := jsondiff.DefaultConsoleOptions()
|
||||||
|
|
||||||
|
startParams_ser, _ := json.Marshal(r.InfrastructureComponent.StartParameterScheme)
|
||||||
diff, _ := jsondiff.Compare(emptyJson_ser, startParams_ser, &opts)
|
diff, _ := jsondiff.Compare(emptyJson_ser, startParams_ser, &opts)
|
||||||
if diff.String() != "FullMatch" {
|
if diff.String() != "FullMatch" {
|
||||||
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
|
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
|
||||||
}
|
}
|
||||||
|
|
||||||
|
statusUpdateRaw_ser, _ := json.Marshal(r.InfrastructureComponent.StatusUpdateRaw)
|
||||||
|
diff, _ = jsondiff.Compare(emptyJson_ser, statusUpdateRaw_ser, &opts)
|
||||||
|
if diff.String() != "FullMatch" {
|
||||||
|
s.StatusUpdateRaw = r.InfrastructureComponent.StatusUpdateRaw
|
||||||
|
}
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue