continue integration of new AMQP functionality #31 #41 #42

This commit is contained in:
Sonja Happ 2020-10-21 17:16:15 +02:00
parent f9cef090d4
commit b07cd23a70
7 changed files with 252 additions and 80 deletions

View file

@ -103,7 +103,7 @@ var ICA = database.InfrastructureComponent{
Description: "This is a test description",
//StateUpdateAt: time.Now().Format(time.RFC1123),
StartParameterScheme: postgres.Jsonb{propertiesA},
ManagedExternally: true,
ManagedExternally: false,
}
var ICB = database.InfrastructureComponent{
@ -119,7 +119,7 @@ var ICB = database.InfrastructureComponent{
Description: "A signal generator for testing purposes",
//StateUpdateAt: time.Now().Format(time.RFC1123),
StartParameterScheme: postgres.Jsonb{propertiesB},
ManagedExternally: false,
ManagedExternally: true,
}
// Scenarios

View file

@ -121,9 +121,37 @@ func (m *ComponentConfiguration) delete() error {
return err
}
var ic infrastructure_component.InfrastructureComponent
err = ic.ByID(m.ICID)
if err != nil {
return err
}
// remove association between ComponentConfiguration and Scenario
// ComponentConfiguration itself is not deleted from DB, it remains as "dangling"
err = db.Model(&so).Association("ComponentConfigurations").Delete(m).Error
if err != nil {
return err
}
return err
// remove association between Infrastructure component and config
err = db.Model(&ic).Association("ComponentConfigurations").Delete(m).Error
if err != nil {
return err
}
// delete component configuration
err = db.Delete(m).Error
if err != nil {
return err
}
// if IC has state gone and there is no component configuration associated with it: delete IC
no_configs := db.Model(ic).Association("ComponentConfigurations").Count()
if no_configs == 0 && ic.State == "gone" {
err = db.Delete(ic).Error
return err
}
return nil
}

View file

@ -86,8 +86,11 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) {
StartParameterScheme: helper.ICA.StartParameterScheme,
ManagedExternally: &helper.ICA.ManagedExternally,
}
_, resp, _ := helper.TestEndpoint(router, token,
code, resp, err := helper.TestEndpoint(router, token,
"/api/ic", "POST", helper.KeyModels{"ic": newICA})
if code != 200 || err != nil {
fmt.Println("Adding IC returned code", code, err, resp)
}
// Read newIC's ID from the response
newICID, _ := helper.GetResponseID(resp)
@ -103,10 +106,13 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) {
Location: helper.ICB.Location,
Description: helper.ICB.Description,
StartParameterScheme: helper.ICB.StartParameterScheme,
ManagedExternally: &helper.ICB.ManagedExternally,
ManagedExternally: &helper.ICA.ManagedExternally,
}
_, resp, _ = helper.TestEndpoint(router, token,
code, resp, err = helper.TestEndpoint(router, token,
"/api/ic", "POST", helper.KeyModels{"ic": newICB})
if code != 200 || err != nil {
fmt.Println("Adding IC returned code", code, err, resp)
}
// authenticate as normal user
token, _ = helper.AuthenticateForTest(router,
@ -118,8 +124,11 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) {
Running: helper.ScenarioA.Running,
StartParameters: helper.ScenarioA.StartParameters,
}
_, resp, _ = helper.TestEndpoint(router, token,
code, resp, err = helper.TestEndpoint(router, token,
"/api/scenarios", "POST", helper.KeyModels{"scenario": newScenario})
if code != 200 || err != nil {
fmt.Println("Adding Scenario returned code", code, err, resp)
}
// Read newScenario's ID from the response
newScenarioID, _ := helper.GetResponseID(resp)

View file

@ -41,12 +41,18 @@ type AMQPclient struct {
}
type Action struct {
Act string `json:"action"`
When float32 `json:"when"`
Parameters struct{} `json:"parameters"`
UUID *string `json:"uuid"`
//Model struct{} `json:"model"`
//Results struct{} `json:"results"`
Act string `json:"action"`
When int64 `json:"when"`
Properties struct {
UUID *string `json:"uuid"`
Name *string `json:"name"`
Category *string `json:"category"`
Type *string `json:"type"`
Location *string `json:"location"`
WS_url *string `json:"ws_url"`
API_url *string `json:"api_url"`
Description *string `json:"description"`
} `json:"properties"`
}
type ICUpdate struct {
@ -139,7 +145,7 @@ func ConnectAMQP(uri string) error {
return nil
}
func SendActionAMQP(action Action) error {
func sendActionAMQP(action Action) error {
payload, err := json.Marshal(action)
if err != nil {
@ -155,6 +161,20 @@ func SendActionAMQP(action Action) error {
Body: payload,
}
// set message headers
var headers map[string]interface{}
headers = make(map[string]interface{}) // empty map
if action.Properties.UUID != nil {
headers["uuid"] = *action.Properties.UUID
}
if action.Properties.Type != nil {
headers["type"] = *action.Properties.Type
}
if action.Properties.Category != nil {
headers["category"] = *action.Properties.Category
}
msg.Headers = headers
err = CheckConnection()
if err != nil {
return err
@ -175,9 +195,9 @@ func PingAMQP() error {
var a Action
a.Act = "ping"
*a.UUID = ""
*a.Properties.UUID = ""
err := SendActionAMQP(a)
err := sendActionAMQP(a)
return err
}

View file

@ -22,12 +22,12 @@
package infrastructure_component
import (
"fmt"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/helper"
"github.com/gin-gonic/gin"
"log"
"net/http"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
)
func RegisterICEndpoints(r *gin.RouterGroup) {
@ -96,22 +96,28 @@ func addIC(c *gin.Context) {
}
// Validate the request
if err = req.Validate(); err != nil {
if err = req.validate(); err != nil {
helper.UnprocessableEntityError(c, err.Error())
return
}
// TODO add case distinction here for externally managed IC
// Create the new IC from the request
newIC := req.CreateIC()
// Save new IC to DB
err = newIC.Save()
if !helper.DBError(c, err) {
c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent})
newIC, err := req.createIC(false)
if err != nil {
helper.InternalServerError(c, "Unable to send create action: "+err.Error())
return
}
if !newIC.ManagedExternally {
// Save new IC to DB if not managed externally
err = newIC.Save()
if helper.DBError(c, err) {
return
}
}
c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent})
}
// updateIC godoc
@ -144,15 +150,20 @@ func updateIC(c *gin.Context) {
}
// Validate the request
if err = req.Validate(); err != nil {
if err = req.validate(); err != nil {
helper.UnprocessableEntityError(c, err.Error())
return
}
// TODO add case distinction here for externally managed IC
// Create the updatedIC from oldIC
updatedIC := req.UpdatedIC(oldIC)
updatedIC, err := req.updatedIC(oldIC, false)
if err != nil {
c.JSON(http.StatusForbidden, gin.H{
"success": false,
"message": fmt.Sprintf("%v", err),
})
return
}
// Finally update the IC in the DB
err = oldIC.Update(updatedIC)
@ -205,14 +216,16 @@ func deleteIC(c *gin.Context) {
return
}
// TODO add case distinction here for externally managed IC
// Delete the IC
err := s.delete()
if !helper.DBError(c, err) {
c.JSON(http.StatusOK, gin.H{"ic": s.InfrastructureComponent})
err := s.delete(false)
if helper.DBError(c, err) {
return
} else if err != nil {
helper.InternalServerError(c, "Unable to send delete action: "+err.Error())
return
}
c.JSON(http.StatusOK, gin.H{"ic": s.InfrastructureComponent})
}
// getConfigsOfIC godoc
@ -278,9 +291,25 @@ func sendActionToIC(c *gin.Context) {
/*if action.When == 0 {
action.When = float32(now.Unix())
}*/
action.UUID = new(string)
*action.UUID = s.UUID
err = SendActionAMQP(action)
// make sure that the important properties are set correctly so that the message can be identified by the receiver
if action.Properties.UUID == nil {
action.Properties.UUID = new(string)
*action.Properties.UUID = s.UUID
}
if action.Properties.Type == nil {
action.Properties.Type = new(string)
*action.Properties.Type = s.Type
}
if action.Properties.Category == nil {
action.Properties.Category = new(string)
*action.Properties.Category = s.Category
}
if action.Properties.Name == nil {
action.Properties.Name = new(string)
*action.Properties.Name = s.Name
}
err = sendActionAMQP(action)
if err != nil {
helper.InternalServerError(c, "Unable to send actions to IC: "+err.Error())
return

View file

@ -23,8 +23,9 @@ package infrastructure_component
import (
"fmt"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"log"
"time"
)
type InfrastructureComponent struct {
@ -57,7 +58,18 @@ func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) erro
return err
}
func (s *InfrastructureComponent) delete() error {
func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error {
if s.ManagedExternally && !receivedViaAMQP {
var action Action
action.Act = "delete"
action.When = time.Now().Unix()
action.Properties.UUID = new(string)
*action.Properties.UUID = s.UUID
err := sendActionAMQP(action)
return err
}
db := database.GetDB()
no_configs := db.Model(s).Association("ComponentConfigurations").Count()
@ -118,13 +130,16 @@ func createNewICviaAMQP(payload ICUpdate) error {
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
// Validate the new IC
err := newICReq.Validate()
err := newICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
}
// Create the new IC
newIC := newICReq.CreateIC()
newIC, err := newICReq.createIC(true)
if err != nil {
return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
}
// save IC
err = newIC.Save()
@ -139,8 +154,17 @@ func (s *InfrastructureComponent) updateICviaAMQP(payload ICUpdate) error {
var updatedICReq UpdateICRequest
if payload.State != nil {
updatedICReq.InfrastructureComponent.State = *payload.State
// TODO check if state is "gone" and attempt to remove IC from DB if it still exists
// TODO if state is different from "gone", continue to update the IC
if *payload.State == "gone" {
// remove IC from DB
err := s.delete(true)
if err != nil {
// if component could not be deleted there are still configurations using it in the DB
// continue with the update to save the new state of the component and get back to the deletion later
log.Println("Could not delete IC because there is a config using it, deletion postponed")
}
}
}
if payload.Properties.Type != nil {
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
@ -170,13 +194,16 @@ func (s *InfrastructureComponent) updateICviaAMQP(payload ICUpdate) error {
updatedICReq.InfrastructureComponent.ManagedExternally = newTrue()
// Validate the updated IC
err := updatedICReq.Validate()
err := updatedICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
}
// Create the updated IC from old IC
updatedIC := updatedICReq.UpdatedIC(*s)
updatedIC, err := updatedICReq.updatedIC(*s, true)
if err != nil {
return fmt.Errorf("AMQP: Unable to update IC %v : %v", s.Name, err)
}
// Finally update the IC in the DB
err = s.Update(updatedIC)

View file

@ -23,6 +23,8 @@ package infrastructure_component
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/jinzhu/gorm/dialects/postgres"
"github.com/nsf/jsondiff"
"gopkg.in/go-playground/validator.v9"
@ -32,7 +34,7 @@ import (
var validate *validator.Validate
type validNewIC struct {
UUID string `form:"UUID" validate:"required"`
UUID string `form:"UUID" validate:"omitempty"`
WebsocketURL string `form:"WebsocketURL" validate:"omitempty"`
APIURL string `form:"APIURL" validate:"omitempty"`
Type string `form:"Type" validate:"required"`
@ -67,46 +69,103 @@ type UpdateICRequest struct {
InfrastructureComponent validUpdatedIC `json:"ic"`
}
func (r *AddICRequest) Validate() error {
func (r *AddICRequest) validate() error {
validate = validator.New()
errs := validate.Struct(r)
return errs
}
func (r *UpdateICRequest) Validate() error {
validate = validator.New()
errs := validate.Struct(r)
return errs
}
func (r *AddICRequest) CreateIC() InfrastructureComponent {
var s InfrastructureComponent
s.UUID = r.InfrastructureComponent.UUID
s.WebsocketURL = r.InfrastructureComponent.WebsocketURL
s.APIURL = r.InfrastructureComponent.APIURL
s.Type = r.InfrastructureComponent.Type
s.Name = r.InfrastructureComponent.Name
s.Category = r.InfrastructureComponent.Category
s.Location = r.InfrastructureComponent.Location
s.Description = r.InfrastructureComponent.Description
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
if r.InfrastructureComponent.State != "" {
s.State = r.InfrastructureComponent.State
} else {
s.State = "unknown"
if errs != nil {
return errs
}
// set last update to creation time of IC
s.StateUpdateAt = time.Now().Format(time.RFC1123)
return s
// check if uuid is valid
_, errs = uuid.Parse(r.InfrastructureComponent.UUID)
return errs
}
func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) InfrastructureComponent {
func (r *UpdateICRequest) validate() error {
validate = validator.New()
errs := validate.Struct(r)
return errs
}
func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent, error) {
var s InfrastructureComponent
var err error
err = nil
// case distinction for externally managed IC
if *r.InfrastructureComponent.ManagedExternally && !receivedViaAMQP {
var action Action
action.Act = "create"
action.When = time.Now().Unix()
action.Properties.Type = new(string)
action.Properties.Name = new(string)
action.Properties.Category = new(string)
*action.Properties.Type = r.InfrastructureComponent.Type
*action.Properties.Name = r.InfrastructureComponent.Name
*action.Properties.Category = r.InfrastructureComponent.Category
// set optional properties
if r.InfrastructureComponent.Description != "" {
action.Properties.Description = new(string)
*action.Properties.Description = r.InfrastructureComponent.Description
}
if r.InfrastructureComponent.Location != "" {
action.Properties.Location = new(string)
*action.Properties.Location = r.InfrastructureComponent.Location
}
if r.InfrastructureComponent.APIURL != "" {
action.Properties.API_url = new(string)
*action.Properties.API_url = r.InfrastructureComponent.APIURL
}
if r.InfrastructureComponent.WebsocketURL != "" {
action.Properties.WS_url = new(string)
*action.Properties.WS_url = r.InfrastructureComponent.WebsocketURL
}
if r.InfrastructureComponent.UUID != "" {
action.Properties.UUID = new(string)
*action.Properties.UUID = r.InfrastructureComponent.UUID
}
err = sendActionAMQP(action)
// s remains empty
} else {
s.UUID = r.InfrastructureComponent.UUID
s.WebsocketURL = r.InfrastructureComponent.WebsocketURL
s.APIURL = r.InfrastructureComponent.APIURL
s.Type = r.InfrastructureComponent.Type
s.Name = r.InfrastructureComponent.Name
s.Category = r.InfrastructureComponent.Category
s.Location = r.InfrastructureComponent.Location
s.Description = r.InfrastructureComponent.Description
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
if r.InfrastructureComponent.State != "" {
s.State = r.InfrastructureComponent.State
} else {
s.State = "unknown"
}
// set last update to creation time of IC
s.StateUpdateAt = time.Now().Format(time.RFC1123)
}
return s, err
}
func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent, receivedViaAMQP bool) (InfrastructureComponent, error) {
// Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s`
s := oldIC
if s.ManagedExternally && !receivedViaAMQP {
// externally managed IC cannot be updated via API, only via AMQP
return s, fmt.Errorf("cannot update externally managed IC %v", s.Name)
}
if r.InfrastructureComponent.UUID != "" {
s.UUID = r.InfrastructureComponent.UUID
}
@ -161,5 +220,5 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
}
return s
return s, nil
}