diff --git a/amqp/amqp_endpoints.go b/amqp/amqp_endpoints.go deleted file mode 100644 index 8082859..0000000 --- a/amqp/amqp_endpoints.go +++ /dev/null @@ -1,85 +0,0 @@ -/** AMQP package, endpoints. -* -* @author Sonja Happ -* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC -* @license GNU General Public License (version 3) -* -* VILLASweb-backend-go -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* any later version. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with this program. If not, see . -*********************************************************************************/ -package amqp - -import ( - "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" - "github.com/gin-gonic/gin" - "log" - "net/http" -) - -func RegisterAMQPEndpoint(r *gin.RouterGroup) { - r.POST("/:ICID/action", sendActionToIC) -} - -// sendActionToIC godoc -// @Summary Send an action to IC (only available if backend server is started with -amqp parameter) -// @ID sendActionToIC -// @Tags AMQP -// @Produce json -// @Param inputAction query string true "Action for IC" -// @Success 200 {object} docs.ResponseError "Action sent successfully" -// @Failure 400 {object} docs.ResponseError "Bad request" -// @Failure 404 {object} docs.ResponseError "Not found" -// @Failure 422 {object} docs.ResponseError "Unprocessable entity" -// @Failure 500 {object} docs.ResponseError "Internal server error" -// @Param ICID path int true "InfrastructureComponent ID" -// @Router /ic/{ICID}/action [post] -// @Security Bearer -func sendActionToIC(c *gin.Context) { - - ok, s := infrastructure_component.CheckPermissions(c, database.ModelInfrastructureComponentAction, database.Update, true) - if !ok { - return - } - - var actions []Action - err := c.BindJSON(&actions) - if err != nil { - helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error()) - return - } - - //now := time.Now() - log.Println("AMQP: Will attempt to send the following actions:", actions) - - for _, action := range actions { - /*if action.When == 0 { - action.When = float32(now.Unix()) - }*/ - action.UUID = new(string) - *action.UUID = s.UUID - err = SendActionAMQP(action) - if err != nil { - helper.InternalServerError(c, "Unable to send actions to IC: "+err.Error()) - return - } - } - - c.JSON(http.StatusOK, gin.H{ - "success": true, - "message": "OK.", - }) -} diff --git a/amqp/amqpclient.go b/amqp/amqpclient.go deleted file mode 100644 index d8398ac..0000000 --- a/amqp/amqpclient.go +++ /dev/null @@ -1,350 +0,0 @@ -/** AMQP package, client. -* -* @author Sonja Happ -* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC -* @license GNU General Public License (version 3) -* -* VILLASweb-backend-go -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* any later version. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with this program. If not, see . -*********************************************************************************/ -package amqp - -import ( - "encoding/json" - "fmt" - "log" - "time" - - infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" - "github.com/gin-gonic/gin" - "github.com/google/uuid" - "github.com/jinzhu/gorm" - "github.com/jinzhu/gorm/dialects/postgres" - "github.com/streadway/amqp" -) - -const VILLAS_EXCHANGE = "villas" - -type AMQPclient struct { - connection *amqp.Connection - channel *amqp.Channel - replies <-chan amqp.Delivery -} - -type Action struct { - Act string `json:"action"` - When float32 `json:"when"` - Parameters struct{} `json:"parameters"` - UUID *string `json:"uuid"` - //Model struct{} `json:"model"` - //Results struct{} `json:"results"` -} - -type ICUpdate struct { - State *string `json:"state"` - Properties struct { - UUID string `json:"uuid"` - Name *string `json:"name"` - Category *string `json:"category"` - Type *string `json:"type"` - Location *string `json:"location"` - WS_url *string `json:"ws_url"` - API_url *string `json:"api_url"` - } `json:"properties"` -} - -var client AMQPclient - -func ConnectAMQP(uri string) error { - - var err error - - // connect to broker - client.connection, err = amqp.Dial(uri) - if err != nil { - return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err) - } - - // create channel - client.channel, err = client.connection.Channel() - if err != nil { - return fmt.Errorf("AMQP: failed to open a channel, error: %v", err) - } - // declare exchange - err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE, - "headers", - true, - false, - false, - false, - nil) - if err != nil { - return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err) - } - - // add a queue for the ICs - ICQueue, err := client.channel.QueueDeclare("infrastructure_components", - true, - false, - false, - false, - nil) - if err != nil { - return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) - } - - err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) - if err != nil { - return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) - } - - // consume deliveries - client.replies, err = client.channel.Consume(ICQueue.Name, - "", - true, - false, - false, - false, - nil) - if err != nil { - return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err) - } - - // consuming queue - go func() { - for { - for message := range client.replies { - processMessage(message) - } - time.Sleep(2) // sleep for 2 sek - } - }() - - log.Printf("AMQP: Waiting for messages... ") - - return nil -} - -func SendActionAMQP(action Action) error { - - payload, err := json.Marshal(action) - if err != nil { - return err - } - - msg := amqp.Publishing{ - DeliveryMode: 2, - Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", - Priority: 0, - Body: payload, - } - - err = CheckConnection() - if err != nil { - return err - } - - log.Println("AMQP: Sending message", string(msg.Body)) - err = client.channel.Publish(VILLAS_EXCHANGE, - "", - false, - false, - msg) - return err - -} - -func PingAMQP() error { - log.Println("AMQP: sending ping command to all ICs") - - var a Action - a.Act = "ping" - *a.UUID = "" - - err := SendActionAMQP(a) - return err -} - -func CheckConnection() error { - - if client.connection != nil { - if client.connection.IsClosed() { - return fmt.Errorf("connection to broker is closed") - } - } else { - return fmt.Errorf("connection is nil") - } - - return nil -} - -func StartAMQP(AMQPurl string, api *gin.RouterGroup) error { - if AMQPurl != "" { - log.Println("Starting AMQP client") - - err := ConnectAMQP(AMQPurl) - if err != nil { - return err - } - - // register IC action endpoint only if AMQP client is used - RegisterAMQPEndpoint(api.Group("/ic")) - - // Periodically call the Ping function to check which ICs are still there - ticker := time.NewTicker(10 * time.Second) - go func() { - - for { - select { - case <-ticker.C: - //TODO Add a useful regular event here - /* - err = PingAMQP() - if err != nil { - log.Println("AMQP Error: ", err.Error()) - } - */ - } - } - - }() - - log.Printf("Connected AMQP client to %s", AMQPurl) - } - - return nil -} - -func processMessage(message amqp.Delivery) { - - log.Println("Processing AMQP message: ", string(message.Body)) - - var payload ICUpdate - err := json.Unmarshal(message.Body, &payload) - if err != nil { - log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err) - return - } - - ICUUID := payload.Properties.UUID - _, err = uuid.Parse(ICUUID) - - if err != nil { - log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) - return - } - - var sToBeUpdated infrastructure_component.InfrastructureComponent - err = sToBeUpdated.ByUUID(ICUUID) - - if err == gorm.ErrRecordNotFound { - // create new record - var newICReq infrastructure_component.AddICRequest - newICReq.InfrastructureComponent.UUID = payload.Properties.UUID - if payload.Properties.Name == nil || - payload.Properties.Category == nil || - payload.Properties.Type == nil { - // cannot create new IC because required information (name, type, and/or category missing) - log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") - return - } - newICReq.InfrastructureComponent.Name = *payload.Properties.Name - newICReq.InfrastructureComponent.Category = *payload.Properties.Category - newICReq.InfrastructureComponent.Type = *payload.Properties.Type - - // add optional params - if payload.State != nil { - newICReq.InfrastructureComponent.State = *payload.State - } else { - newICReq.InfrastructureComponent.State = "unknown" - } - if payload.Properties.WS_url != nil { - newICReq.InfrastructureComponent.Host = *payload.Properties.WS_url - } - if payload.Properties.API_url != nil { - newICReq.InfrastructureComponent.APIHost = *payload.Properties.API_url - } - if payload.Properties.Location != nil { - newICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)} - } - - // Validate the new IC - err = newICReq.Validate() - if err != nil { - log.Println("AMQP: Validation of new IC failed:", err) - return - } - - // Create the new IC - newIC := newICReq.CreateIC() - - // save IC - err = newIC.Save() - if err != nil { - log.Println("AMQP: Saving new IC to DB failed:", err) - return - } - - log.Println("AMQP: Created IC ", newIC.Name) - - } else if err != nil { - log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err) - return - } else { - - var updatedICReq infrastructure_component.UpdateICRequest - if payload.State != nil { - updatedICReq.InfrastructureComponent.State = *payload.State - } - if payload.Properties.Type != nil { - updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type - } - if payload.Properties.Category != nil { - updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category - } - if payload.Properties.Name != nil { - updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name - } - if payload.Properties.WS_url != nil { - updatedICReq.InfrastructureComponent.Host = *payload.Properties.WS_url - } - if payload.Properties.API_url != nil { - updatedICReq.InfrastructureComponent.APIHost = *payload.Properties.API_url - } - if payload.Properties.Location != nil { - updatedICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)} - } - - // Validate the updated IC - if err = updatedICReq.Validate(); err != nil { - log.Println("AMQP: Validation of updated IC failed:", err) - return - } - - // Create the updated IC from old IC - updatedIC := updatedICReq.UpdatedIC(sToBeUpdated) - - // Finally update the IC in the DB - err = sToBeUpdated.Update(updatedIC) - if err != nil { - log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err) - return - } - - log.Println("AMQP: Updated IC ", sToBeUpdated.Name) - } -} diff --git a/configuration/config.go b/configuration/config.go index 50e28d6..5a8d28a 100644 --- a/configuration/config.go +++ b/configuration/config.go @@ -23,6 +23,7 @@ package configuration import ( "flag" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" "log" "os" @@ -51,9 +52,9 @@ func InitConfig() error { port = flag.String("port", "4000", "Port of the backend (default is 4000)") baseHost = flag.String("base-host", "localhost:4000", "The host at which the backend is hosted (default: localhost)") basePath = flag.String("base-path", "/api/v2", "The path at which the API routes are located (default /api/v2)") - adminUser = flag.String("admin-user", "", "Initial admin username") - adminPass = flag.String("admin-pass", "", "Initial admin password") - adminMail = flag.String("admin-mail", "", "Initial admin mail address") + adminUser = flag.String("admin-user", helper.User0.Username, "Initial admin username") + adminPass = flag.String("admin-pass", helper.StrPassword0, "Initial admin password") + adminMail = flag.String("admin-mail", helper.User0.Mail, "Initial admin mail address") ) flag.Parse() diff --git a/database/models.go b/database/models.go index 8d1f36a..787a7c3 100644 --- a/database/models.go +++ b/database/models.go @@ -120,24 +120,28 @@ type InfrastructureComponent struct { UUID string `json:"uuid" gorm:"not null"` // Name of the IC Name string `json:"name" gorm:"default:''"` - // Host if the IC - Host string `json:"host" gorm:"default:''"` - // Host of API for IC - APIHost string `json:"apihost" gorm:"default:''"` + // WebsocketURL if the IC + WebsocketURL string `json:"websocketurl" gorm:"default:''"` + // API URL of API for IC + APIURL string `json:"apiurl" gorm:"default:''"` // Category of IC (simulator, gateway, database, etc.) Category string `json:"category" gorm:"default:''"` // Type of IC (RTDS, VILLASnode, RTDS, etc.) Type string `json:"type" gorm:"default:''"` // Uptime of the IC - Uptime int `json:"uptime" gorm:"default:0"` + Uptime float64 `json:"uptime" gorm:"default:0"` // State of the IC State string `json:"state" gorm:"default:''"` // Time of last state update StateUpdateAt string `json:"stateUpdateAt" gorm:"default:''"` - // Properties of IC as JSON string - Properties postgres.Jsonb `json:"properties"` - // Raw properties of IC as JSON string - RawProperties postgres.Jsonb `json:"rawProperties"` + // Location of the IC + Location string `json:"location" gorm:"default:''"` + // Description of the IC + Description string `json:"description" gorm:"default:''"` + // JSON scheme of start parameters for IC + StartParameterScheme postgres.Jsonb `json:"startparameterscheme"` + // Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller) + ManagedExternally bool `json:"managedexternally" gorm:"default:false"` // ComponentConfigurations in which the IC is used ComponentConfigurations []ComponentConfiguration `json:"-" gorm:"foreignkey:ICID"` } diff --git a/doc/api/docs.go b/doc/api/docs.go index a32cd60..a381b6b 100644 --- a/doc/api/docs.go +++ b/doc/api/docs.go @@ -1,6 +1,6 @@ // GENERATED BY THE COMMAND ABOVE; DO NOT EDIT // This file was generated by swaggo/swag at -// 2020-09-25 16:13:15.130920598 +0200 CEST m=+0.092357808 +// 2020-11-11 16:32:47.799676915 +0100 CET m=+0.126448240 package docs @@ -1078,7 +1078,7 @@ var doc = `{ "required": true, "schema": { "type": "object", - "$ref": "#/definitions/infrastructure_component.addICRequest" + "$ref": "#/definitions/infrastructure_component.AddICRequest" } } ], @@ -1198,7 +1198,7 @@ var doc = `{ "required": true, "schema": { "type": "object", - "$ref": "#/definitions/infrastructure_component.updateICRequest" + "$ref": "#/definitions/infrastructure_component.UpdateICRequest" } }, { @@ -1310,7 +1310,7 @@ var doc = `{ "application/json" ], "tags": [ - "AMQP" + "infrastructure-components" ], "summary": "Send an action to IC (only available if backend server is started with -amqp parameter)", "operationId": "sendActionToIC", @@ -2977,31 +2977,35 @@ var doc = `{ "database.InfrastructureComponent": { "type": "object", "properties": { - "apihost": { - "description": "Host of API for IC", + "apiurl": { + "description": "API URL of API for IC", "type": "string" }, "category": { "description": "Category of IC (simulator, gateway, database, etc.)", "type": "string" }, - "host": { - "description": "Host if the IC", + "description": { + "description": "Description of the IC", "type": "string" }, "id": { "type": "integer" }, + "location": { + "description": "Location of the IC", + "type": "string" + }, + "managedexternally": { + "description": "Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller)", + "type": "boolean" + }, "name": { "description": "Name of the IC", "type": "string" }, - "properties": { - "description": "Properties of IC as JSON string", - "type": "string" - }, - "rawProperties": { - "description": "Raw properties of IC as JSON string", + "startparameterscheme": { + "description": "JSON scheme of start parameters for IC", "type": "string" }, "state": { @@ -3018,11 +3022,15 @@ var doc = `{ }, "uptime": { "description": "Uptime of the IC", - "type": "integer" + "type": "number" }, "uuid": { "description": "UUID of the IC", "type": "string" + }, + "websocketurl": { + "description": "WebsocketURL if the IC", + "type": "string" } } }, @@ -3351,7 +3359,7 @@ var doc = `{ } } }, - "infrastructure_component.addICRequest": { + "infrastructure_component.AddICRequest": { "type": "object", "properties": { "ic": { @@ -3360,7 +3368,7 @@ var doc = `{ } } }, - "infrastructure_component.updateICRequest": { + "infrastructure_component.UpdateICRequest": { "type": "object", "properties": { "ic": { @@ -3373,24 +3381,30 @@ var doc = `{ "type": "object", "required": [ "Category", + "ManagedExternally", "Name", - "Type", - "UUID" + "Type" ], "properties": { - "APIHost": { + "APIURL": { "type": "string" }, "Category": { "type": "string" }, - "Host": { + "Description": { "type": "string" }, + "Location": { + "type": "string" + }, + "ManagedExternally": { + "type": "boolean" + }, "Name": { "type": "string" }, - "Properties": { + "StartParameterScheme": { "type": "string" }, "State": { @@ -3401,25 +3415,34 @@ var doc = `{ }, "UUID": { "type": "string" + }, + "Uptime": { + "type": "number" + }, + "WebsocketURL": { + "type": "string" } } }, "infrastructure_component.validUpdatedIC": { "type": "object", "properties": { - "APIHost": { + "APIURL": { "type": "string" }, "Category": { "type": "string" }, - "Host": { + "Description": { + "type": "string" + }, + "Location": { "type": "string" }, "Name": { "type": "string" }, - "Properties": { + "StartParameterScheme": { "type": "string" }, "State": { @@ -3430,6 +3453,12 @@ var doc = `{ }, "UUID": { "type": "string" + }, + "Uptime": { + "type": "number" + }, + "WebsocketURL": { + "type": "string" } } }, @@ -3765,7 +3794,7 @@ type swaggerInfo struct { var SwaggerInfo = swaggerInfo{ Version: "2.0", Host: "", - BasePath: "http://localhost:4000/api/v2/", + BasePath: "/api/v2", Schemes: []string{}, Title: "VILLASweb Backend API", Description: "This is the [VILLASweb Backend](https://git.rwth-aachen.de/acs/public/villas/web-backend-go) API v2.0.\nThis documentation is auto-generated based on the API documentation in the code. The tool [swag](https://github.com/swaggo/swag) is used to auto-generate API docs for the [gin-gonic](https://github.com/gin-gonic/gin) framework.\nAuthentication: Use the authenticate endpoint below to obtain a token for your user account, copy the token into to the value field of the dialog showing up for the green Authorize button below and confirm with Done.", diff --git a/doc/api/swagger.json b/doc/api/swagger.json index 20834e5..f992ef0 100644 --- a/doc/api/swagger.json +++ b/doc/api/swagger.json @@ -13,7 +13,7 @@ }, "version": "2.0" }, - "basePath": "http://localhost:4000/api/v2/", + "basePath": "/api/v2", "paths": { "/authenticate": { "post": { @@ -1061,7 +1061,7 @@ "required": true, "schema": { "type": "object", - "$ref": "#/definitions/infrastructure_component.addICRequest" + "$ref": "#/definitions/infrastructure_component.AddICRequest" } } ], @@ -1181,7 +1181,7 @@ "required": true, "schema": { "type": "object", - "$ref": "#/definitions/infrastructure_component.updateICRequest" + "$ref": "#/definitions/infrastructure_component.UpdateICRequest" } }, { @@ -1293,7 +1293,7 @@ "application/json" ], "tags": [ - "AMQP" + "infrastructure-components" ], "summary": "Send an action to IC (only available if backend server is started with -amqp parameter)", "operationId": "sendActionToIC", @@ -2960,31 +2960,35 @@ "database.InfrastructureComponent": { "type": "object", "properties": { - "apihost": { - "description": "Host of API for IC", + "apiurl": { + "description": "API URL of API for IC", "type": "string" }, "category": { "description": "Category of IC (simulator, gateway, database, etc.)", "type": "string" }, - "host": { - "description": "Host if the IC", + "description": { + "description": "Description of the IC", "type": "string" }, "id": { "type": "integer" }, + "location": { + "description": "Location of the IC", + "type": "string" + }, + "managedexternally": { + "description": "Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller)", + "type": "boolean" + }, "name": { "description": "Name of the IC", "type": "string" }, - "properties": { - "description": "Properties of IC as JSON string", - "type": "string" - }, - "rawProperties": { - "description": "Raw properties of IC as JSON string", + "startparameterscheme": { + "description": "JSON scheme of start parameters for IC", "type": "string" }, "state": { @@ -3001,11 +3005,15 @@ }, "uptime": { "description": "Uptime of the IC", - "type": "integer" + "type": "number" }, "uuid": { "description": "UUID of the IC", "type": "string" + }, + "websocketurl": { + "description": "WebsocketURL if the IC", + "type": "string" } } }, @@ -3334,7 +3342,7 @@ } } }, - "infrastructure_component.addICRequest": { + "infrastructure_component.AddICRequest": { "type": "object", "properties": { "ic": { @@ -3343,7 +3351,7 @@ } } }, - "infrastructure_component.updateICRequest": { + "infrastructure_component.UpdateICRequest": { "type": "object", "properties": { "ic": { @@ -3356,24 +3364,30 @@ "type": "object", "required": [ "Category", + "ManagedExternally", "Name", - "Type", - "UUID" + "Type" ], "properties": { - "APIHost": { + "APIURL": { "type": "string" }, "Category": { "type": "string" }, - "Host": { + "Description": { "type": "string" }, + "Location": { + "type": "string" + }, + "ManagedExternally": { + "type": "boolean" + }, "Name": { "type": "string" }, - "Properties": { + "StartParameterScheme": { "type": "string" }, "State": { @@ -3384,25 +3398,34 @@ }, "UUID": { "type": "string" + }, + "Uptime": { + "type": "number" + }, + "WebsocketURL": { + "type": "string" } } }, "infrastructure_component.validUpdatedIC": { "type": "object", "properties": { - "APIHost": { + "APIURL": { "type": "string" }, "Category": { "type": "string" }, - "Host": { + "Description": { + "type": "string" + }, + "Location": { "type": "string" }, "Name": { "type": "string" }, - "Properties": { + "StartParameterScheme": { "type": "string" }, "State": { @@ -3413,6 +3436,12 @@ }, "UUID": { "type": "string" + }, + "Uptime": { + "type": "number" + }, + "WebsocketURL": { + "type": "string" } } }, diff --git a/doc/api/swagger.yaml b/doc/api/swagger.yaml index 8668a64..7cb52c3 100644 --- a/doc/api/swagger.yaml +++ b/doc/api/swagger.yaml @@ -1,4 +1,4 @@ -basePath: http://localhost:4000/api/v2/ +basePath: /api/v2 definitions: component_configuration.addConfigRequest: properties: @@ -146,25 +146,28 @@ definitions: type: object database.InfrastructureComponent: properties: - apihost: - description: Host of API for IC + apiurl: + description: API URL of API for IC type: string category: description: Category of IC (simulator, gateway, database, etc.) type: string - host: - description: Host if the IC + description: + description: Description of the IC type: string id: type: integer + location: + description: Location of the IC + type: string + managedexternally: + description: Boolean indicating if IC is managed externally (via AMQP/ VILLAScontroller) + type: boolean name: description: Name of the IC type: string - properties: - description: Properties of IC as JSON string - type: string - rawProperties: - description: Raw properties of IC as JSON string + startparameterscheme: + description: JSON scheme of start parameters for IC type: string state: description: State of the IC @@ -177,10 +180,13 @@ definitions: type: string uptime: description: Uptime of the IC - type: integer + type: number uuid: description: UUID of the IC type: string + websocketurl: + description: WebsocketURL if the IC + type: string type: object database.Scenario: properties: @@ -404,13 +410,13 @@ definitions: $ref: '#/definitions/database.Widget' type: array type: object - infrastructure_component.addICRequest: + infrastructure_component.AddICRequest: properties: ic: $ref: '#/definitions/infrastructure_component.validNewIC' type: object type: object - infrastructure_component.updateICRequest: + infrastructure_component.UpdateICRequest: properties: ic: $ref: '#/definitions/infrastructure_component.validUpdatedIC' @@ -418,15 +424,19 @@ definitions: type: object infrastructure_component.validNewIC: properties: - APIHost: + APIURL: type: string Category: type: string - Host: + Description: type: string + Location: + type: string + ManagedExternally: + type: boolean Name: type: string - Properties: + StartParameterScheme: type: string State: type: string @@ -434,23 +444,29 @@ definitions: type: string UUID: type: string + Uptime: + type: number + WebsocketURL: + type: string required: - Category + - ManagedExternally - Name - Type - - UUID type: object infrastructure_component.validUpdatedIC: properties: - APIHost: + APIURL: type: string Category: type: string - Host: + Description: + type: string + Location: type: string Name: type: string - Properties: + StartParameterScheme: type: string State: type: string @@ -458,6 +474,10 @@ definitions: type: string UUID: type: string + Uptime: + type: number + WebsocketURL: + type: string type: object scenario.addScenarioRequest: properties: @@ -1357,7 +1377,7 @@ paths: name: inputIC required: true schema: - $ref: '#/definitions/infrastructure_component.addICRequest' + $ref: '#/definitions/infrastructure_component.AddICRequest' type: object produces: - application/json @@ -1470,7 +1490,7 @@ paths: name: inputIC required: true schema: - $ref: '#/definitions/infrastructure_component.updateICRequest' + $ref: '#/definitions/infrastructure_component.UpdateICRequest' type: object - description: InfrastructureComponent ID in: path @@ -1547,7 +1567,7 @@ paths: summary: Send an action to IC (only available if backend server is started with -amqp parameter) tags: - - AMQP + - infrastructure-components /ic/{ICID}/configs: get: operationId: getConfigsOfIC diff --git a/helper/test_data.go b/helper/test_data.go index c04d757..6d0120f 100644 --- a/helper/test_data.go +++ b/helper/test_data.go @@ -92,34 +92,38 @@ var NewUserC = UserRequest{ // Infrastructure components -var propertiesA = json.RawMessage(`{"location" : "ACSlab"}`) -var propertiesB = json.RawMessage(`{"location" : "ACSlab"}`) +var propertiesA = json.RawMessage(`{"prop1" : "a nice prop"}`) +var propertiesB = json.RawMessage(`{"prop1" : "not so nice"}`) var ICA = database.InfrastructureComponent{ - UUID: "4854af30-325f-44a5-ad59-b67b2597de68", - Host: "xxx.yyy.zzz.aaa", - Type: "DPsim", - Category: "Simulator", - Name: "Test DPsim Simulator", - Uptime: 0, - State: "running", - StateUpdateAt: time.Now().Format(time.RFC1123), - Properties: postgres.Jsonb{propertiesA}, - RawProperties: postgres.Jsonb{propertiesA}, + UUID: "7be0322d-354e-431e-84bd-ae4c9633138b", + WebsocketURL: "https://villas-new.k8s.eonerc.rwth-aachen.de/ws/ws_sig", + APIURL: "https://villas-new.k8s.eonerc.rwth-aachen.de/ws/api/v2", + Type: "villas-node", + Category: "gateway", + Name: "ACS Demo Signals", + Uptime: -1.0, + State: "idle", + Location: "k8s", + Description: "A signal generator for testing purposes", + //StateUpdateAt: time.Now().Format(time.RFC1123), + StartParameterScheme: postgres.Jsonb{propertiesA}, + ManagedExternally: false, } var ICB = database.InfrastructureComponent{ - UUID: "7be0322d-354e-431e-84bd-ae4c9633138b", - Host: "https://villas-new.k8s.eonerc.rwth-aachen.de/ws/ws_sig", - APIHost: "https://villas-new.k8s.eonerc.rwth-aachen.de/ws/api", - Type: "VILLASnode Signal Generator", - Category: "Signal Generator", - Name: "ACS Demo Signals", - Uptime: 0, - State: "idle", - StateUpdateAt: time.Now().Format(time.RFC1123), - Properties: postgres.Jsonb{propertiesB}, - RawProperties: postgres.Jsonb{propertiesB}, + UUID: "4854af30-325f-44a5-ad59-b67b2597de68", + WebsocketURL: "xxx.yyy.zzz.aaa", + Type: "dpsim", + Category: "simulator", + Name: "Test DPsim Simulator", + Uptime: -1.0, + State: "running", + Location: "ACS Laboratory", + Description: "This is a test description", + //StateUpdateAt: time.Now().Format(time.RFC1123), + StartParameterScheme: postgres.Jsonb{propertiesB}, + ManagedExternally: true, } // Scenarios @@ -350,20 +354,28 @@ func DBAddAdminUser(cfg *config.Config) error { if len(users) == 0 { fmt.Println("No admin user found in DB, adding default admin user.") + mode, err := cfg.String("mode") + name, err := cfg.String("admin.user") - if err != nil || name == "" { + if (err != nil || name == "") && mode != "test" { name = "admin" + } else if mode == "test" { + name = User0.Username } pw, err := cfg.String("admin.pass") - if err != nil || pw == "" { + if (err != nil || pw == "") && mode != "test" { pw = generatePassword(16) fmt.Printf(" Generated admin password: %s\n", pw) + } else if mode == "test" { + pw = StrPassword0 } mail, err := cfg.String("admin.mail") - if err == nil || mail == "" { + if (err == nil || mail == "") && mode != "test" { mail = "admin@example.com" + } else if mode == "test" { + mail = User0.Mail } pwEnc, _ := bcrypt.GenerateFromPassword([]byte(pw), bcryptCost) diff --git a/routes/component-configuration/config_methods.go b/routes/component-configuration/config_methods.go index c48473f..767b76c 100644 --- a/routes/component-configuration/config_methods.go +++ b/routes/component-configuration/config_methods.go @@ -23,7 +23,6 @@ package component_configuration import ( "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario" ) @@ -61,8 +60,11 @@ func (m *ComponentConfiguration) addToScenario() error { } // associate IC with component configuration - var ic infrastructure_component.InfrastructureComponent - err = ic.ByID(m.ICID) + var ic database.InfrastructureComponent + err = db.Find(&ic, m.ICID).Error + if err != nil { + return err + } err = db.Model(&ic).Association("ComponentConfigurations").Append(m).Error if err != nil { return err @@ -80,23 +82,24 @@ func (m *ComponentConfiguration) Update(modifiedConfig ComponentConfiguration) e // check if IC has been updated if m.ICID != modifiedConfig.ICID { // update IC - var s infrastructure_component.InfrastructureComponent - var s_old infrastructure_component.InfrastructureComponent - err := s.ByID(modifiedConfig.ICID) + var ic database.InfrastructureComponent + var ic_old database.InfrastructureComponent + err := db.Find(&ic, modifiedConfig.ICID).Error if err != nil { return err } - err = s_old.ByID(m.ICID) + err = db.Find(&ic_old, m.ICID).Error if err != nil { return err } + // remove component configuration from old IC - err = db.Model(&s_old).Association("ComponentConfigurations").Delete(m).Error + err = db.Model(&ic_old).Association("ComponentConfigurations").Delete(m).Error if err != nil { return err } // add component configuration to new IC - err = db.Model(&s).Association("ComponentConfigurations").Append(m).Error + err = db.Model(&ic).Association("ComponentConfigurations").Append(m).Error if err != nil { return err } @@ -121,9 +124,37 @@ func (m *ComponentConfiguration) delete() error { return err } + var ic database.InfrastructureComponent + err = db.Find(&ic, m.ICID).Error + if err != nil { + return err + } + // remove association between ComponentConfiguration and Scenario // ComponentConfiguration itself is not deleted from DB, it remains as "dangling" err = db.Model(&so).Association("ComponentConfigurations").Delete(m).Error + if err != nil { + return err + } - return err + // remove association between Infrastructure component and config + err = db.Model(&ic).Association("ComponentConfigurations").Delete(m).Error + if err != nil { + return err + } + + // delete component configuration + err = db.Delete(m).Error + if err != nil { + return err + } + + // if IC has state gone and there is no component configuration associated with it: delete IC + no_configs := db.Model(ic).Association("ComponentConfigurations").Count() + if no_configs == 0 && ic.State == "gone" { + err = db.Delete(ic).Error + return err + } + + return nil } diff --git a/routes/component-configuration/config_test.go b/routes/component-configuration/config_test.go index e283f99..579f13c 100644 --- a/routes/component-configuration/config_test.go +++ b/routes/component-configuration/config_test.go @@ -49,13 +49,16 @@ type ConfigRequest struct { } type ICRequest struct { - UUID string `json:"uuid,omitempty"` - Host string `json:"host,omitempty"` - Type string `json:"type,omitempty"` - Name string `json:"name,omitempty"` - Category string `json:"category,omitempty"` - State string `json:"state,omitempty"` - Properties postgres.Jsonb `json:"properties,omitempty"` + UUID string `json:"uuid,omitempty"` + WebsocketURL string `json:"websocketurl,omitempty"` + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + Category string `json:"category,omitempty"` + State string `json:"state,omitempty"` + Location string `json:"location,omitempty"` + Description string `json:"description,omitempty"` + StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` + ManagedExternally *bool `json:"managedexternally,omitempty"` } type ScenarioRequest struct { @@ -72,32 +75,44 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { // POST $newICA newICA := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } - _, resp, _ := helper.TestEndpoint(router, token, + code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) + if code != 200 || err != nil { + fmt.Println("Adding IC returned code", code, err, resp) + } // Read newIC's ID from the response newICID, _ := helper.GetResponseID(resp) // POST a second IC to change to that IC during testing newICB := ICRequest{ - UUID: helper.ICB.UUID, - Host: helper.ICB.Host, - Type: helper.ICB.Type, - Name: helper.ICB.Name, - Category: helper.ICB.Category, - State: helper.ICB.State, - Properties: helper.ICB.Properties, + UUID: helper.ICB.UUID, + WebsocketURL: helper.ICB.WebsocketURL, + Type: helper.ICB.Type, + Name: helper.ICB.Name, + Category: helper.ICB.Category, + State: helper.ICB.State, + Location: helper.ICB.Location, + Description: helper.ICB.Description, + StartParameterScheme: helper.ICB.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } - _, resp, _ = helper.TestEndpoint(router, token, + code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICB}) + if code != 200 || err != nil { + fmt.Println("Adding IC returned code", code, err, resp) + } // authenticate as normal user token, _ = helper.AuthenticateForTest(router, @@ -109,8 +124,11 @@ func addScenarioAndIC() (scenarioID uint, ICID uint) { Running: helper.ScenarioA.Running, StartParameters: helper.ScenarioA.StartParameters, } - _, resp, _ = helper.TestEndpoint(router, token, + code, resp, err = helper.TestEndpoint(router, token, "/api/scenarios", "POST", helper.KeyModels{"scenario": newScenario}) + if code != 200 || err != nil { + fmt.Println("Adding Scenario returned code", code, err, resp) + } // Read newScenario's ID from the response newScenarioID, _ := helper.GetResponseID(resp) diff --git a/routes/healthz/healthz_endpoint.go b/routes/healthz/healthz_endpoint.go index 5d2276c..e9928ab 100644 --- a/routes/healthz/healthz_endpoint.go +++ b/routes/healthz/healthz_endpoint.go @@ -22,10 +22,10 @@ package healthz import ( - "git.rwth-aachen.de/acs/public/villas/web-backend-go/amqp" "git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" "log" "net/http" @@ -68,7 +68,7 @@ func getHealth(c *gin.Context) { } if len(url) != 0 { - err = amqp.CheckConnection() + err = infrastructure_component.CheckConnection() if err != nil { log.Println(err.Error()) c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/routes/healthz/healthz_test.go b/routes/healthz/healthz_test.go index 24e4a61..6bdada4 100644 --- a/routes/healthz/healthz_test.go +++ b/routes/healthz/healthz_test.go @@ -22,10 +22,10 @@ package healthz import ( - "git.rwth-aachen.de/acs/public/villas/web-backend-go/amqp" "git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" "github.com/stretchr/testify/assert" "log" @@ -78,7 +78,7 @@ func TestHealthz(t *testing.T) { amqpURI := "amqp://" + user + ":" + pass + "@" + host log.Println("AMQP URI is", amqpURI) - err = amqp.ConnectAMQP(amqpURI) + err = infrastructure_component.ConnectAMQP(amqpURI) assert.NoError(t, err) // test healthz endpoint for connected DB and AMQP client diff --git a/routes/infrastructure-component/ic_amqpclient.go b/routes/infrastructure-component/ic_amqpclient.go new file mode 100644 index 0000000..1adf68e --- /dev/null +++ b/routes/infrastructure-component/ic_amqpclient.go @@ -0,0 +1,431 @@ +/** AMQP package, client. +* +* @author Sonja Happ +* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC +* @license GNU General Public License (version 3) +* +* VILLASweb-backend-go +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program. If not, see . +*********************************************************************************/ +package infrastructure_component + +import ( + "encoding/json" + "fmt" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/jinzhu/gorm" + "github.com/streadway/amqp" + "log" + "time" +) + +const VILLAS_EXCHANGE = "villas" + +type AMQPclient struct { + connection *amqp.Connection + channel *amqp.Channel + replies <-chan amqp.Delivery +} + +type Action struct { + Act string `json:"action"` + When int64 `json:"when"` + Properties struct { + UUID *string `json:"uuid"` + Name *string `json:"name"` + Category *string `json:"category"` + Type *string `json:"type"` + Location *string `json:"location"` + WS_url *string `json:"ws_url"` + API_url *string `json:"api_url"` + Description *string `json:"description"` + } `json:"properties"` +} + +type ICStatus struct { + UUID string `json:"uuid"` + State *string `json:"state"` + Name *string `json:"name"` + Category *string `json:"category"` + Type *string `json:"type"` + Location *string `json:"location"` + WS_url *string `json:"ws_url"` + API_url *string `json:"api_url"` + Description *string `json:"description"` + Uptime *float64 `json:"uptime"` // TODO check if data type of uptime is float64 or int +} + +type ICUpdate struct { + Status *ICStatus `json:"status"` + // TODO add JSON start parameter scheme +} + +var client AMQPclient + +func ConnectAMQP(uri string) error { + + var err error + + // connect to broker + client.connection, err = amqp.Dial(uri) + if err != nil { + return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err) + } + + // create channel + client.channel, err = client.connection.Channel() + if err != nil { + return fmt.Errorf("AMQP: failed to open a channel, error: %v", err) + } + // declare exchange + err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE, + "headers", + true, + false, + false, + false, + nil) + if err != nil { + return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err) + } + + // add a queue for the ICs + ICQueue, err := client.channel.QueueDeclare("infrastructure_components", + true, + false, + false, + false, + nil) + if err != nil { + return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err) + } + + err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil) + if err != nil { + return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err) + } + + // consume deliveries + client.replies, err = client.channel.Consume(ICQueue.Name, + "", + true, + false, + false, + false, + nil) + if err != nil { + return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err) + } + + // consuming queue + go func() { + for { + for message := range client.replies { + err = processMessage(message) + if err != nil { + log.Println(err.Error()) + } + } + time.Sleep(2) // sleep for 2 sek + } + }() + + log.Printf(" AMQP: Waiting for messages... ") + + return nil +} + +func sendActionAMQP(action Action) error { + + payload, err := json.Marshal(action) + if err != nil { + return err + } + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + // set message headers + var headers map[string]interface{} + headers = make(map[string]interface{}) // empty map + if action.Properties.UUID != nil { + headers["uuid"] = *action.Properties.UUID + } + if action.Properties.Type != nil { + headers["type"] = *action.Properties.Type + } + if action.Properties.Category != nil { + headers["category"] = *action.Properties.Category + } + msg.Headers = headers + + err = CheckConnection() + if err != nil { + return err + } + + //log.Println("AMQP: Sending message", string(msg.Body)) + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + return err + +} + +//func PingAMQP() error { +// log.Println("AMQP: sending ping command to all ICs") +// +// var a Action +// a.Act = "ping" +// *a.Properties.UUID = "" +// +// err := sendActionAMQP(a) +// return err +//} + +func CheckConnection() error { + + if client.connection != nil { + if client.connection.IsClosed() { + return fmt.Errorf("connection to broker is closed") + } + } else { + return fmt.Errorf("connection is nil") + } + + return nil +} + +func StartAMQP(AMQPurl string, api *gin.RouterGroup) error { + if AMQPurl != "" { + log.Println("Starting AMQP client") + + err := ConnectAMQP(AMQPurl) + if err != nil { + return err + } + + // register IC action endpoint only if AMQP client is used + RegisterAMQPEndpoint(api.Group("/ic")) + + // Periodically call the Ping function to check which ICs are still there + ticker := time.NewTicker(10 * time.Second) + go func() { + + for { + select { + case <-ticker.C: + //TODO Add a useful regular event here + /* + err = PingAMQP() + if err != nil { + log.Println("AMQP Error: ", err.Error()) + } + */ + } + } + + }() + + log.Printf("Connected AMQP client to %s", AMQPurl) + } + + return nil +} + +func processMessage(message amqp.Delivery) error { + + var payload ICUpdate + err := json.Unmarshal(message.Body, &payload) + if err != nil { + return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err) + } + + if payload.Status != nil { + //log.Println("Processing AMQP message: ", string(message.Body)) + // if a message contains a "state" field, it is an update for an IC + ICUUID := payload.Status.UUID + _, err = uuid.Parse(ICUUID) + + if err != nil { + return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body)) + } + var sToBeUpdated InfrastructureComponent + err = sToBeUpdated.byUUID(ICUUID) + + if err == gorm.ErrRecordNotFound { + // create new record + err = createExternalIC(payload) + } else if err != nil { + // database error + err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err) + } else { + // update record based on payload + err = sToBeUpdated.updateExternalIC(payload) + } + } + return err +} + +func createExternalIC(payload ICUpdate) error { + + var newICReq AddICRequest + newICReq.InfrastructureComponent.UUID = payload.Status.UUID + if payload.Status.Name == nil || + payload.Status.Category == nil || + payload.Status.Type == nil { + // cannot create new IC because required information (name, type, and/or category missing) + return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category") + } + newICReq.InfrastructureComponent.Name = *payload.Status.Name + newICReq.InfrastructureComponent.Category = *payload.Status.Category + newICReq.InfrastructureComponent.Type = *payload.Status.Type + + // add optional params + if payload.Status.State != nil { + newICReq.InfrastructureComponent.State = *payload.Status.State + } else { + newICReq.InfrastructureComponent.State = "unknown" + } + if newICReq.InfrastructureComponent.State == "gone" { + // Check if state is "gone" and abort creation of IC in this case + log.Println("AMQP: Aborting creation of IC with state gone") + return nil + } + + if payload.Status.WS_url != nil { + newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url + } + if payload.Status.API_url != nil { + newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url + } + if payload.Status.Location != nil { + newICReq.InfrastructureComponent.Location = *payload.Status.Location + } + if payload.Status.Description != nil { + newICReq.InfrastructureComponent.Description = *payload.Status.Description + } + if payload.Status.Uptime != nil { + newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime + } + // TODO add JSON start parameter scheme + + // set managed externally to true because this IC is created via AMQP + newICReq.InfrastructureComponent.ManagedExternally = newTrue() + + // Validate the new IC + err := newICReq.validate() + if err != nil { + return fmt.Errorf("AMQP: Validation of new IC failed: %v", err) + } + + // Create the new IC + newIC, err := newICReq.createIC(true) + if err != nil { + return fmt.Errorf("AMQP: Creating new IC failed: %v", err) + } + + // save IC + err = newIC.save() + if err != nil { + return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err) + } + + log.Println("AMQP: Created IC with UUID ", newIC.UUID) + return nil +} + +func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error { + + var updatedICReq UpdateICRequest + if payload.Status.State != nil { + updatedICReq.InfrastructureComponent.State = *payload.Status.State + + if *payload.Status.State == "gone" { + // remove IC from DB + log.Println("AMQP: Deleting IC with state gone") + err := s.delete(true) + if err != nil { + // if component could not be deleted there are still configurations using it in the DB + // continue with the update to save the new state of the component and get back to the deletion later + log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)") + } + + } + } + if payload.Status.Type != nil { + updatedICReq.InfrastructureComponent.Type = *payload.Status.Type + } + if payload.Status.Category != nil { + updatedICReq.InfrastructureComponent.Category = *payload.Status.Category + } + if payload.Status.Name != nil { + updatedICReq.InfrastructureComponent.Name = *payload.Status.Name + } + if payload.Status.WS_url != nil { + updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url + } + if payload.Status.API_url != nil { + updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url + } + if payload.Status.Location != nil { + //postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)} + updatedICReq.InfrastructureComponent.Location = *payload.Status.Location + } + if payload.Status.Description != nil { + updatedICReq.InfrastructureComponent.Description = *payload.Status.Description + } + if payload.Status.Uptime != nil { + updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime + } + // TODO add JSON start parameter scheme + + // Validate the updated IC + err := updatedICReq.validate() + if err != nil { + return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err) + } + + // Create the updated IC from old IC + updatedIC := updatedICReq.updatedIC(*s) + + // Finally update the IC in the DB + err = s.update(updatedIC) + if err != nil { + return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err) + } + + log.Println("AMQP: Updated IC with UUID ", s.UUID) + return err +} + +func newTrue() *bool { + b := true + return &b +} + +func newFalse() *bool { + b := false + return &b +} diff --git a/routes/infrastructure-component/ic_endpoints.go b/routes/infrastructure-component/ic_endpoints.go index afc3b32..2b9f5b8 100644 --- a/routes/infrastructure-component/ic_endpoints.go +++ b/routes/infrastructure-component/ic_endpoints.go @@ -22,11 +22,11 @@ package infrastructure_component import ( + "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" "github.com/gin-gonic/gin" + "log" "net/http" - - "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" ) func RegisterICEndpoints(r *gin.RouterGroup) { @@ -38,6 +38,10 @@ func RegisterICEndpoints(r *gin.RouterGroup) { r.GET("/:ICID/configs", getConfigsOfIC) } +func RegisterAMQPEndpoint(r *gin.RouterGroup) { + r.POST("/:ICID/action", sendActionToIC) +} + // getICs godoc // @Summary Get all infrastructure components // @ID getICs @@ -91,20 +95,28 @@ func addIC(c *gin.Context) { } // Validate the request - if err = req.Validate(); err != nil { + if err = req.validate(); err != nil { helper.UnprocessableEntityError(c, err.Error()) return } // Create the new IC from the request - newIC := req.CreateIC() - - // Save new IC to DB - err = newIC.Save() - if !helper.DBError(c, err) { - c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent}) + newIC, err := req.createIC(false) + if err != nil { + helper.InternalServerError(c, "Unable to send create action: "+err.Error()) + return } + if !(newIC.ManagedExternally) { + // Save new IC to DB if not managed externally + err = newIC.save() + + if helper.DBError(c, err) { + return + } + } + + c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent}) } // updateIC godoc @@ -129,6 +141,11 @@ func updateIC(c *gin.Context) { return } + if oldIC.ManagedExternally { + helper.ForbiddenError(c, "Cannot update externally managed component via API") + return + } + var req UpdateICRequest err := c.BindJSON(&req) if err != nil { @@ -137,16 +154,16 @@ func updateIC(c *gin.Context) { } // Validate the request - if err = req.Validate(); err != nil { + if err = req.validate(); err != nil { helper.UnprocessableEntityError(c, err.Error()) return } // Create the updatedIC from oldIC - updatedIC := req.UpdatedIC(oldIC) + updatedIC := req.updatedIC(oldIC) // Finally update the IC in the DB - err = oldIC.Update(updatedIC) + err = oldIC.update(updatedIC) if !helper.DBError(c, err) { c.JSON(http.StatusOK, gin.H{"ic": updatedIC.InfrastructureComponent}) } @@ -197,11 +214,15 @@ func deleteIC(c *gin.Context) { } // Delete the IC - err := s.delete() - if !helper.DBError(c, err) { - c.JSON(http.StatusOK, gin.H{"ic": s.InfrastructureComponent}) + err := s.delete(false) + if helper.DBError(c, err) { + return + } else if err != nil { + helper.InternalServerError(c, "Unable to send delete action: "+err.Error()) + return } + c.JSON(http.StatusOK, gin.H{"ic": s.InfrastructureComponent}) } // getConfigsOfIC godoc @@ -231,3 +252,69 @@ func getConfigsOfIC(c *gin.Context) { } } + +// sendActionToIC godoc +// @Summary Send an action to IC (only available if backend server is started with -amqp parameter) +// @ID sendActionToIC +// @Tags infrastructure-components +// @Produce json +// @Param inputAction query string true "Action for IC" +// @Success 200 {object} docs.ResponseError "Action sent successfully" +// @Failure 400 {object} docs.ResponseError "Bad request" +// @Failure 404 {object} docs.ResponseError "Not found" +// @Failure 422 {object} docs.ResponseError "Unprocessable entity" +// @Failure 500 {object} docs.ResponseError "Internal server error" +// @Param ICID path int true "InfrastructureComponent ID" +// @Router /ic/{ICID}/action [post] +// @Security Bearer +func sendActionToIC(c *gin.Context) { + + ok, s := CheckPermissions(c, database.ModelInfrastructureComponentAction, database.Update, true) + if !ok { + return + } + + var actions []Action + err := c.BindJSON(&actions) + if err != nil { + helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error()) + return + } + + //now := time.Now() + log.Println("AMQP: Sending actions:", actions) + + for _, action := range actions { + /*if action.When == 0 { + action.When = float32(now.Unix()) + }*/ + // make sure that the important properties are set correctly so that the message can be identified by the receiver + if action.Properties.UUID == nil { + action.Properties.UUID = new(string) + *action.Properties.UUID = s.UUID + } + if action.Properties.Type == nil { + action.Properties.Type = new(string) + *action.Properties.Type = s.Type + } + if action.Properties.Category == nil { + action.Properties.Category = new(string) + *action.Properties.Category = s.Category + } + if action.Properties.Name == nil { + action.Properties.Name = new(string) + *action.Properties.Name = s.Name + } + + err = sendActionAMQP(action) + if err != nil { + helper.InternalServerError(c, "Unable to send actions to IC: "+err.Error()) + return + } + } + + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "OK.", + }) +} diff --git a/routes/infrastructure-component/ic_methods.go b/routes/infrastructure-component/ic_methods.go index 62a2449..b8d06c1 100644 --- a/routes/infrastructure-component/ic_methods.go +++ b/routes/infrastructure-component/ic_methods.go @@ -23,33 +23,34 @@ package infrastructure_component import ( "fmt" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" + "log" + "time" ) type InfrastructureComponent struct { database.InfrastructureComponent } -func (s *InfrastructureComponent) Save() error { +func (s *InfrastructureComponent) save() error { db := database.GetDB() err := db.Create(s).Error return err } -func (s *InfrastructureComponent) ByID(id uint) error { +func (s *InfrastructureComponent) byID(id uint) error { db := database.GetDB() err := db.Find(s, id).Error return err } -func (s *InfrastructureComponent) ByUUID(uuid string) error { +func (s *InfrastructureComponent) byUUID(uuid string) error { db := database.GetDB() err := db.Find(s, "UUID = ?", uuid).Error return err } -func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) error { +func (s *InfrastructureComponent) update(updatedIC InfrastructureComponent) error { db := database.GetDB() err := db.Model(s).Updates(updatedIC).Error @@ -57,7 +58,19 @@ func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) erro return err } -func (s *InfrastructureComponent) delete() error { +func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error { + if s.ManagedExternally && !receivedViaAMQP { + var action Action + action.Act = "delete" + action.When = time.Now().Unix() + action.Properties.UUID = new(string) + *action.Properties.UUID = s.UUID + + log.Println("AMQP: Sending request to delete IC with UUID", s.UUID) + err := sendActionAMQP(action) + return err + } + db := database.GetDB() no_configs := db.Model(s).Association("ComponentConfigurations").Count() diff --git a/routes/infrastructure-component/ic_middleware.go b/routes/infrastructure-component/ic_middleware.go index a5c7d5a..3125d5b 100644 --- a/routes/infrastructure-component/ic_middleware.go +++ b/routes/infrastructure-component/ic_middleware.go @@ -45,7 +45,7 @@ func CheckPermissions(c *gin.Context, modeltype database.ModelName, operation da return false, s } - err = s.ByID(uint(ICID)) + err = s.byID(uint(ICID)) if helper.DBError(c, err) { return false, s } diff --git a/routes/infrastructure-component/ic_test.go b/routes/infrastructure-component/ic_test.go index 178fb9b..21a064f 100644 --- a/routes/infrastructure-component/ic_test.go +++ b/routes/infrastructure-component/ic_test.go @@ -22,12 +22,17 @@ package infrastructure_component import ( + "encoding/json" "fmt" "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" + component_configuration "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/component-configuration" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario" "github.com/jinzhu/gorm/dialects/postgres" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" "os" "testing" + "time" "github.com/gin-gonic/gin" @@ -37,16 +42,50 @@ import ( ) var router *gin.Engine +var api *gin.RouterGroup +var waitingTime time.Duration = 2 type ICRequest struct { - UUID string `json:"uuid,omitempty"` - Host string `json:"host,omitempty"` - APIHost string `json:"apihost,omitempty"` - Type string `json:"type,omitempty"` - Name string `json:"name,omitempty"` - Category string `json:"category,omitempty"` - State string `json:"state,omitempty"` - Properties postgres.Jsonb `json:"properties,omitempty"` + UUID string `json:"uuid,omitempty"` + WebsocketURL string `json:"websocketurl,omitempty"` + APIURL string `json:"apiurl,omitempty"` + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + Category string `json:"category,omitempty"` + State string `json:"state,omitempty"` + Location string `json:"location,omitempty"` + Description string `json:"description,omitempty"` + StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` + ManagedExternally *bool `json:"managedexternally"` +} + +type ScenarioRequest struct { + Name string `json:"name,omitempty"` + Running bool `json:"running,omitempty"` + StartParameters postgres.Jsonb `json:"startParameters,omitempty"` +} + +type ConfigRequest struct { + Name string `json:"name,omitempty"` + ScenarioID uint `json:"scenarioID,omitempty"` + ICID uint `json:"icID,omitempty"` + StartParameters postgres.Jsonb `json:"startParameters,omitempty"` + FileIDs []int64 `json:"fileIDs,omitempty"` +} + +type ICAction struct { + Act string `json:"action,omitempty"` + When int64 `json:"when,omitempty"` + Properties struct { + UUID *string `json:"uuid,omitempty"` + Name *string `json:"name,omitempty"` + Category *string `json:"category,omitempty"` + Type *string `json:"type,omitempty"` + Location *string `json:"location,omitempty"` + WS_url *string `json:"ws_url,omitempty"` + API_url *string `json:"api_url,omitempty"` + Description *string `json:"description,omitempty"` + } `json:"properties,omitempty"` } func TestMain(m *testing.M) { @@ -62,11 +101,16 @@ func TestMain(m *testing.M) { defer database.DBpool.Close() router = gin.Default() - api := router.Group("/api") + api = router.Group("/api") user.RegisterAuthenticate(api.Group("/authenticate")) api.Use(user.Authentication(true)) RegisterICEndpoints(api.Group("/ic")) + // component configuration endpoints required to associate an IC with a component config + component_configuration.RegisterComponentConfigurationEndpoints(api.Group("/configs")) + // scenario endpoints required here to first add a scenario to the DB + // that can be associated with a new component configuration + scenario.RegisterScenarioEndpoints(api.Group("/scenarios")) os.Exit(m.Run()) } @@ -76,6 +120,22 @@ func TestAddICAsAdmin(t *testing.T) { database.MigrateModels() assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + // check AMQP connection + err := CheckConnection() + assert.Errorf(t, err, "connection is nil") + + // connect AMQP client + // Make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set + host, err := configuration.GolbalConfig.String("amqp.host") + user, err := configuration.GolbalConfig.String("amqp.user") + pass, err := configuration.GolbalConfig.String("amqp.pass") + amqpURI := "amqp://" + user + ":" + pass + "@" + host + + // AMQP Connection startup is tested here + // Not repeated in other tests because it is only needed once + err = StartAMQP(amqpURI, api) + assert.NoError(t, err) + // authenticate as admin token, err := helper.AuthenticateForTest(router, "/api/authenticate", "POST", helper.AdminCredentials) @@ -100,13 +160,17 @@ func TestAddICAsAdmin(t *testing.T) { // test POST ic/ $newIC newIC := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + APIURL: helper.ICB.APIURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -138,6 +202,30 @@ func TestAddICAsAdmin(t *testing.T) { fmt.Sprintf("/api/ic/%v", newICID+1), "GET", nil) assert.NoError(t, err) assert.Equalf(t, 404, code, "Response body: \n%v\n", resp) + + newExternalIC := ICRequest{ + UUID: helper.ICB.UUID, + WebsocketURL: helper.ICB.WebsocketURL, + APIURL: helper.ICB.APIURL, + Type: helper.ICB.Type, + Name: helper.ICB.Name, + Category: helper.ICB.Category, + State: helper.ICB.State, + Location: helper.ICB.Location, + Description: helper.ICB.Description, + StartParameterScheme: helper.ICB.StartParameterScheme, + ManagedExternally: newTrue(), + } + + // test creation of external IC (should lead to emission of AMQP message to VILLAS) + code, resp, err = helper.TestEndpoint(router, token, + "/api/ic", "POST", helper.KeyModels{"ic": newExternalIC}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare POST's response with the newExternalIC + err = helper.CompareResponse(resp, helper.KeyModels{"ic": newExternalIC}) + assert.NoError(t, err) } func TestAddICAsUser(t *testing.T) { @@ -152,13 +240,16 @@ func TestAddICAsUser(t *testing.T) { // test POST ic/ $newIC newIC := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } // This should fail with unprocessable entity 422 error code @@ -181,13 +272,16 @@ func TestUpdateICAsAdmin(t *testing.T) { // test POST ic/ $newIC newIC := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -210,7 +304,7 @@ func TestUpdateICAsAdmin(t *testing.T) { assert.Equalf(t, 400, code, "Response body: \n%v\n", resp) // Test PUT IC - newIC.Host = "ThisIsMyNewHost" + newIC.WebsocketURL = "ThisIsMyNewURL" code, resp, err = helper.TestEndpoint(router, token, fmt.Sprintf("/api/ic/%v", newICID), "PUT", helper.KeyModels{"ic": newIC}) assert.NoError(t, err) @@ -231,6 +325,53 @@ func TestUpdateICAsAdmin(t *testing.T) { err = helper.CompareResponse(resp, helper.KeyModels{"ic": newIC}) assert.NoError(t, err) + // fake an IC update (create) message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICB.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + update.Status.Name = new(string) + *update.Status.Name = helper.ICB.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICB.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICB.Type + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + // Wait until externally managed IC is created (happens async) + time.Sleep(waitingTime * time.Second) + + // try to update this IC + var updatedIC ICRequest + updatedIC.Name = "a new name" + + // Should result in forbidden return code 403 + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/ic/%v", 2), "PUT", helper.KeyModels{"ic": updatedIC}) + assert.NoError(t, err) + assert.Equalf(t, 403, code, "Response body: \n%v\n", resp) } func TestUpdateICAsUser(t *testing.T) { @@ -245,13 +386,16 @@ func TestUpdateICAsUser(t *testing.T) { // test POST ic/ $newIC newIC := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -269,7 +413,7 @@ func TestUpdateICAsUser(t *testing.T) { // Test PUT IC // This should fail with unprocessable entity status code 422 - newIC.Host = "ThisIsMyNewHost" + newIC.WebsocketURL = "ThisIsMyNewURL" code, resp, err = helper.TestEndpoint(router, token, fmt.Sprintf("/api/ic/%v", newICID), "PUT", helper.KeyModels{"ic": newIC}) assert.NoError(t, err) @@ -289,13 +433,16 @@ func TestDeleteICAsAdmin(t *testing.T) { // test POST ic/ $newIC newIC := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -327,6 +474,58 @@ func TestDeleteICAsAdmin(t *testing.T) { assert.NoError(t, err) assert.Equal(t, finalNumber, initialNumber-1) + + // fake an IC update (create) message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICB.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + update.Status.Name = new(string) + *update.Status.Name = helper.ICB.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICB.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICB.Type + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + // Wait until externally managed IC is created (happens async) + time.Sleep(waitingTime * time.Second) + + // Delete the added external IC (triggers an AMQP message, but should not remove the IC from the DB) + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/ic/%v", 2), "DELETE", nil) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Again count the number of all the ICs returned + finalNumberAfterExtneralDelete, err := helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + + assert.Equal(t, finalNumber+1, finalNumberAfterExtneralDelete) + } func TestDeleteICAsUser(t *testing.T) { @@ -341,13 +540,16 @@ func TestDeleteICAsUser(t *testing.T) { // test POST ic/ $newIC newIC := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newIC}) @@ -365,7 +567,7 @@ func TestDeleteICAsUser(t *testing.T) { // Test DELETE ICs // This should fail with unprocessable entity status code 422 - newIC.Host = "ThisIsMyNewHost" + newIC.WebsocketURL = "ThisIsMyNewURL" code, resp, err = helper.TestEndpoint(router, token, fmt.Sprintf("/api/ic/%v", newICID), "DELETE", nil) assert.NoError(t, err) @@ -389,13 +591,16 @@ func TestGetAllICs(t *testing.T) { // test POST ic/ $newICA newICA := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -404,14 +609,18 @@ func TestGetAllICs(t *testing.T) { // test POST ic/ $newICB newICB := ICRequest{ - UUID: helper.ICB.UUID, - Host: helper.ICB.Host, - Type: helper.ICB.Type, - Name: helper.ICB.Name, - Category: helper.ICB.Category, - State: helper.ICB.State, - Properties: helper.ICB.Properties, + UUID: helper.ICB.UUID, + WebsocketURL: helper.ICB.WebsocketURL, + Type: helper.ICB.Type, + Name: helper.ICB.Name, + Category: helper.ICB.Category, + State: helper.ICB.State, + Location: helper.ICB.Location, + Description: helper.ICB.Description, + StartParameterScheme: helper.ICB.StartParameterScheme, + ManagedExternally: newFalse(), } + code, resp, err = helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICB}) assert.NoError(t, err) @@ -449,13 +658,16 @@ func TestGetConfigsOfIC(t *testing.T) { // test POST ic/ $newICA newICA := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), } code, resp, err := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) @@ -467,7 +679,6 @@ func TestGetConfigsOfIC(t *testing.T) { assert.NoError(t, err) // test GET ic/ID/confis - // TODO how to properly test this without using component configuration endpoints? numberOfConfigs, err := helper.LengthOfResponse(router, token, fmt.Sprintf("/api/ic/%v/configs", newICID), "GET", nil) assert.NoError(t, err) @@ -481,7 +692,6 @@ func TestGetConfigsOfIC(t *testing.T) { assert.NoError(t, err) // test GET ic/ID/configs - // TODO how to properly test this without using component configuration endpoints? numberOfConfigs, err = helper.LengthOfResponse(router, token, fmt.Sprintf("/api/ic/%v/configs", newICID), "GET", nil) assert.NoError(t, err) @@ -496,3 +706,332 @@ func TestGetConfigsOfIC(t *testing.T) { assert.NoError(t, err) assert.Equalf(t, 404, code, "Response body: \n%v\n", resp) } + +func TestSendActionToIC(t *testing.T) { + database.DropTables() + database.MigrateModels() + assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + + // authenticate as admin + token, err := helper.AuthenticateForTest(router, + "/api/authenticate", "POST", helper.AdminCredentials) + assert.NoError(t, err) + + // test POST ic/ $newICA + newICA := ICRequest{ + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: newFalse(), + } + code, resp, err := helper.TestEndpoint(router, token, + "/api/ic", "POST", helper.KeyModels{"ic": newICA}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Read newIC's ID from the response + newICID, err := helper.GetResponseID(resp) + assert.NoError(t, err) + + // create action to be sent to IC + action1 := ICAction{ + Act: "start", + When: time.Now().Unix(), + } + action1.Properties.UUID = new(string) + *action1.Properties.UUID = newICA.UUID + actions := [1]ICAction{action1} + + // Send action to IC + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/ic/%v/action", newICID), "POST", actions) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Send malformed actions array to IC (should yield bad request) + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/ic/%v/action", newICID), "POST", action1) + assert.NoError(t, err) + assert.Equalf(t, 400, code, "Response body: \n%v\n", resp) +} + +func TestCreateUpdateViaAMQPRecv(t *testing.T) { + + database.DropTables() + database.MigrateModels() + assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + + // authenticate as admin + token, err := helper.AuthenticateForTest(router, + "/api/authenticate", "POST", helper.AdminCredentials) + assert.NoError(t, err) + + // fake an IC update message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICA.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err := helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 0, number) + + // complete the (required) data of an IC + update.Status.Name = new(string) + *update.Status.Name = helper.ICA.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICA.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICA.Type + update.Status.Uptime = new(float64) + *update.Status.Uptime = -1.0 + update.Status.WS_url = new(string) + *update.Status.WS_url = helper.ICA.WebsocketURL + update.Status.API_url = new(string) + *update.Status.API_url = helper.ICA.APIURL + update.Status.Description = new(string) + *update.Status.Description = helper.ICA.Description + update.Status.Location = new(string) + *update.Status.Location = helper.ICA.Location + + payload, err = json.Marshal(update) + assert.NoError(t, err) + + msg = amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err = helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 1, number) + + // modify status update + *update.Status.Name = "This is the new name" + payload, err = json.Marshal(update) + assert.NoError(t, err) + + msg = amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + // get the length of the GET all ICs response for user + number, err = helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 1, number) + +} + +func TestDeleteICViaAMQPRecv(t *testing.T) { + + database.DropTables() + database.MigrateModels() + assert.NoError(t, helper.DBAddAdminAndUserAndGuest()) + + // authenticate as admin + token, err := helper.AuthenticateForTest(router, + "/api/authenticate", "POST", helper.AdminCredentials) + assert.NoError(t, err) + + // fake an IC update message + var update ICUpdate + update.Status = new(ICStatus) + update.Status.UUID = helper.ICA.UUID + update.Status.State = new(string) + *update.Status.State = "idle" + // complete the (required) data of an IC + update.Status.Name = new(string) + *update.Status.Name = helper.ICA.Name + update.Status.Category = new(string) + *update.Status.Category = helper.ICA.Category + update.Status.Type = new(string) + *update.Status.Type = helper.ICA.Type + update.Status.Uptime = new(float64) + *update.Status.Uptime = -1.0 + update.Status.WS_url = new(string) + *update.Status.WS_url = helper.ICA.WebsocketURL + update.Status.API_url = new(string) + *update.Status.API_url = helper.ICA.APIURL + update.Status.Description = new(string) + *update.Status.Description = helper.ICA.Description + update.Status.Location = new(string) + *update.Status.Location = helper.ICA.Location + + payload, err := json.Marshal(update) + assert.NoError(t, err) + + msg := amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + err = CheckConnection() + assert.NoError(t, err) + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err := helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 1, number) + + // add scenario + newScenario := ScenarioRequest{ + Name: helper.ScenarioA.Name, + Running: helper.ScenarioA.Running, + StartParameters: helper.ScenarioA.StartParameters, + } + + code, resp, err := helper.TestEndpoint(router, token, + "/api/scenarios", "POST", helper.KeyModels{"scenario": newScenario}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare POST's response with the newScenario + err = helper.CompareResponse(resp, helper.KeyModels{"scenario": newScenario}) + assert.NoError(t, err) + + // Read newScenario's ID from the response + newScenarioID, err := helper.GetResponseID(resp) + assert.NoError(t, err) + + // Add component config and associate with IC and scenario + newConfig := ConfigRequest{ + Name: helper.ConfigA.Name, + ScenarioID: uint(newScenarioID), + ICID: 1, + StartParameters: helper.ConfigA.StartParameters, + FileIDs: helper.ConfigA.FileIDs, + } + + code, resp, err = helper.TestEndpoint(router, token, + "/api/configs", "POST", helper.KeyModels{"config": newConfig}) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare POST's response with the newConfig + err = helper.CompareResponse(resp, helper.KeyModels{"config": newConfig}) + assert.NoError(t, err) + + // Read newConfig's ID from the response + newConfigID, err := helper.GetResponseID(resp) + assert.NoError(t, err) + + // modify status update to state "gone" + *update.Status.State = "gone" + payload, err = json.Marshal(update) + assert.NoError(t, err) + + msg = amqp.Publishing{ + DeliveryMode: 2, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Priority: 0, + Body: payload, + } + + // attempt to delete IC (should not work immediately because IC is still associated with component config) + err = client.channel.Publish(VILLAS_EXCHANGE, + "", + false, + false, + msg) + assert.NoError(t, err) + + time.Sleep(waitingTime * time.Second) + + // get the length of the GET all ICs response for user + number, err = helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 1, number) + + // Delete component config from earlier + code, resp, err = helper.TestEndpoint(router, token, + fmt.Sprintf("/api/configs/%v", newConfigID), "DELETE", nil) + assert.NoError(t, err) + assert.Equalf(t, 200, code, "Response body: \n%v\n", resp) + + // Compare DELETE's response with the newConfig + err = helper.CompareResponse(resp, helper.KeyModels{"config": newConfig}) + assert.NoError(t, err) + + // get the length of the GET all ICs response for user + number, err = helper.LengthOfResponse(router, token, + "/api/ic", "GET", nil) + assert.NoError(t, err) + assert.Equal(t, 0, number) +} diff --git a/routes/infrastructure-component/ic_validators.go b/routes/infrastructure-component/ic_validators.go index 6ecc03b..fae7369 100644 --- a/routes/infrastructure-component/ic_validators.go +++ b/routes/infrastructure-component/ic_validators.go @@ -23,34 +23,43 @@ package infrastructure_component import ( "encoding/json" + "github.com/google/uuid" "github.com/jinzhu/gorm/dialects/postgres" "github.com/nsf/jsondiff" "gopkg.in/go-playground/validator.v9" + "log" "time" ) var validate *validator.Validate type validNewIC struct { - UUID string `form:"UUID" validate:"required"` - Host string `form:"Host" validate:"omitempty"` - APIHost string `form:"APIHost" validate:"omitempty"` - Type string `form:"Type" validate:"required"` - Name string `form:"Name" validate:"required"` - Category string `form:"Category" validate:"required"` - Properties postgres.Jsonb `form:"Properties" validate:"omitempty"` - State string `form:"State" validate:"omitempty"` + UUID string `form:"UUID" validate:"omitempty"` + WebsocketURL string `form:"WebsocketURL" validate:"omitempty"` + APIURL string `form:"APIURL" validate:"omitempty"` + Type string `form:"Type" validate:"required"` + Name string `form:"Name" validate:"required"` + Category string `form:"Category" validate:"required"` + State string `form:"State" validate:"omitempty"` + Location string `form:"Location" validate:"omitempty"` + Description string `form:"Description" validate:"omitempty"` + StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` + ManagedExternally *bool `form:"ManagedExternally" validate:"required"` + Uptime float64 `form:"Uptime" validate:"omitempty"` } type validUpdatedIC struct { - UUID string `form:"UUID" validate:"omitempty"` - Host string `form:"Host" validate:"omitempty"` - APIHost string `form:"APIHost" validate:"omitempty"` - Type string `form:"Type" validate:"omitempty"` - Name string `form:"Name" validate:"omitempty"` - Category string `form:"Category" validate:"omitempty"` - Properties postgres.Jsonb `form:"Properties" validate:"omitempty"` - State string `form:"State" validate:"omitempty"` + UUID string `form:"UUID" validate:"omitempty"` + WebsocketURL string `form:"WebsocketURL" validate:"omitempty"` + APIURL string `form:"APIURL" validate:"omitempty"` + Type string `form:"Type" validate:"omitempty"` + Name string `form:"Name" validate:"omitempty"` + Category string `form:"Category" validate:"omitempty"` + State string `form:"State" validate:"omitempty"` + Location string `form:"Location" validate:"omitempty"` + Description string `form:"Description" validate:"omitempty"` + StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"` + Uptime float64 `form:"Uptime" validate:"omitempty"` } type AddICRequest struct { @@ -61,28 +70,83 @@ type UpdateICRequest struct { InfrastructureComponent validUpdatedIC `json:"ic"` } -func (r *AddICRequest) Validate() error { +func (r *AddICRequest) validate() error { + validate = validator.New() + errs := validate.Struct(r) + if errs != nil { + return errs + } + + // check if uuid is valid + _, errs = uuid.Parse(r.InfrastructureComponent.UUID) + return errs +} + +func (r *UpdateICRequest) validate() error { validate = validator.New() errs := validate.Struct(r) return errs } -func (r *UpdateICRequest) Validate() error { - validate = validator.New() - errs := validate.Struct(r) - return errs -} - -func (r *AddICRequest) CreateIC() InfrastructureComponent { +func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent, error) { var s InfrastructureComponent + var err error + err = nil + + // case distinction for externally managed IC + if *r.InfrastructureComponent.ManagedExternally && !receivedViaAMQP { + var action Action + action.Act = "create" + action.When = time.Now().Unix() + action.Properties.Type = new(string) + action.Properties.Name = new(string) + action.Properties.Category = new(string) + + *action.Properties.Type = r.InfrastructureComponent.Type + *action.Properties.Name = r.InfrastructureComponent.Name + *action.Properties.Category = r.InfrastructureComponent.Category + + // set optional properties + if r.InfrastructureComponent.Description != "" { + action.Properties.Description = new(string) + *action.Properties.Description = r.InfrastructureComponent.Description + } + + if r.InfrastructureComponent.Location != "" { + action.Properties.Location = new(string) + *action.Properties.Location = r.InfrastructureComponent.Location + } + + if r.InfrastructureComponent.APIURL != "" { + action.Properties.API_url = new(string) + *action.Properties.API_url = r.InfrastructureComponent.APIURL + } + + if r.InfrastructureComponent.WebsocketURL != "" { + action.Properties.WS_url = new(string) + *action.Properties.WS_url = r.InfrastructureComponent.WebsocketURL + } + + if r.InfrastructureComponent.UUID != "" { + action.Properties.UUID = new(string) + *action.Properties.UUID = r.InfrastructureComponent.UUID + } + + log.Println("AMQP: Sending request to create new IC") + err = sendActionAMQP(action) + } s.UUID = r.InfrastructureComponent.UUID - s.Host = r.InfrastructureComponent.Host - s.APIHost = r.InfrastructureComponent.APIHost + s.WebsocketURL = r.InfrastructureComponent.WebsocketURL + s.APIURL = r.InfrastructureComponent.APIURL s.Type = r.InfrastructureComponent.Type s.Name = r.InfrastructureComponent.Name s.Category = r.InfrastructureComponent.Category - s.Properties = r.InfrastructureComponent.Properties + s.Location = r.InfrastructureComponent.Location + s.Description = r.InfrastructureComponent.Description + s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme + s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally + s.Uptime = -1.0 // no uptime available if r.InfrastructureComponent.State != "" { s.State = r.InfrastructureComponent.State } else { @@ -91,10 +155,10 @@ func (r *AddICRequest) CreateIC() InfrastructureComponent { // set last update to creation time of IC s.StateUpdateAt = time.Now().Format(time.RFC1123) - return s + return s, err } -func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) InfrastructureComponent { +func (r *UpdateICRequest) updatedIC(oldIC InfrastructureComponent) InfrastructureComponent { // Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s` s := oldIC @@ -102,12 +166,12 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur s.UUID = r.InfrastructureComponent.UUID } - if r.InfrastructureComponent.Host != "" { - s.Host = r.InfrastructureComponent.Host + if r.InfrastructureComponent.WebsocketURL != "" { + s.WebsocketURL = r.InfrastructureComponent.WebsocketURL } - if r.InfrastructureComponent.APIHost != "" { - s.APIHost = r.InfrastructureComponent.APIHost + if r.InfrastructureComponent.APIURL != "" { + s.APIURL = r.InfrastructureComponent.APIURL } if r.InfrastructureComponent.Type != "" { @@ -126,6 +190,14 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur s.State = r.InfrastructureComponent.State } + if r.InfrastructureComponent.Location != "" { + s.Location = r.InfrastructureComponent.Location + } + + if r.InfrastructureComponent.Description != "" { + s.Description = r.InfrastructureComponent.Description + } + // set last update time s.StateUpdateAt = time.Now().Format(time.RFC1123) @@ -133,11 +205,11 @@ func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) Infrastructur var emptyJson postgres.Jsonb // Serialize empty json and params emptyJson_ser, _ := json.Marshal(emptyJson) - startParams_ser, _ := json.Marshal(r.InfrastructureComponent.Properties) + startParams_ser, _ := json.Marshal(r.InfrastructureComponent.StartParameterScheme) opts := jsondiff.DefaultConsoleOptions() diff, _ := jsondiff.Compare(emptyJson_ser, startParams_ser, &opts) if diff.String() != "FullMatch" { - s.Properties = r.InfrastructureComponent.Properties + s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme } return s diff --git a/routes/register.go b/routes/register.go index 4d49d79..3cc1078 100644 --- a/routes/register.go +++ b/routes/register.go @@ -117,9 +117,12 @@ func AddTestData(cfg *config.Config, router *gin.Engine) (*bytes.Buffer, error) if code != http.StatusOK { return resp, fmt.Errorf("error adding IC A") } - code, resp, err = helper.TestEndpoint(router, token, basePath+"/ic", "POST", helper.KeyModels{"ic": helper.ICB}) - if code != http.StatusOK { - return resp, fmt.Errorf("error adding IC B") + amqphost, err := cfg.String("amqp.host") + if err != nil && amqphost != "" { + code, resp, err = helper.TestEndpoint(router, token, basePath+"/ic", "POST", helper.KeyModels{"ic": helper.ICB}) + if code != http.StatusOK { + return resp, fmt.Errorf("error adding IC B") + } } // add scenarios @@ -155,7 +158,7 @@ func AddTestData(cfg *config.Config, router *gin.Engine) (*bytes.Buffer, error) configB := helper.ConfigB configA.ScenarioID = 1 configB.ScenarioID = 1 - configA.ICID = 2 + configA.ICID = 1 configB.ICID = 1 code, resp, err = helper.TestEndpoint(router, token, basePath+"/configs", "POST", helper.KeyModels{"config": configA}) if code != http.StatusOK { diff --git a/routes/register_test.go b/routes/register_test.go index 4f76ba9..6d9c20b 100644 --- a/routes/register_test.go +++ b/routes/register_test.go @@ -28,11 +28,13 @@ import ( "git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" + infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" "github.com/stretchr/testify/assert" ) var router *gin.Engine +var amqpURI string func TestMain(m *testing.M) { err := configuration.InitConfig() @@ -48,6 +50,15 @@ func TestMain(m *testing.M) { router = gin.Default() + // connect AMQP client (make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set via command line parameters) + host, err := configuration.GolbalConfig.String("amqp.host") + user, err := configuration.GolbalConfig.String("amqp.user") + pass, err := configuration.GolbalConfig.String("amqp.pass") + + amqpURI := "amqp://" + user + ":" + pass + "@" + host + + err = infrastructure_component.ConnectAMQP(amqpURI) + os.Exit(m.Run()) } diff --git a/routes/signal/signal_test.go b/routes/signal/signal_test.go index 9ab5ef1..045d53c 100644 --- a/routes/signal/signal_test.go +++ b/routes/signal/signal_test.go @@ -56,13 +56,16 @@ type ConfigRequest struct { } type ICRequest struct { - UUID string `json:"uuid,omitempty"` - Host string `json:"host,omitempty"` - Type string `json:"type,omitempty"` - Name string `json:"name,omitempty"` - Category string `json:"category,omitempty"` - State string `json:"state,omitempty"` - Properties postgres.Jsonb `json:"properties,omitempty"` + UUID string `json:"uuid,omitempty"` + WebsocketURL string `json:"websocketurl,omitempty"` + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + Category string `json:"category,omitempty"` + State string `json:"state,omitempty"` + Location string `json:"location,omitempty"` + Description string `json:"description,omitempty"` + StartParameterScheme postgres.Jsonb `json:"startparameterscheme,omitempty"` + ManagedExternally *bool `json:"managedexternally,omitempty"` } type ScenarioRequest struct { @@ -79,13 +82,16 @@ func addScenarioAndICAndConfig() (scenarioID uint, ICID uint, configID uint) { // POST $newICA newICA := ICRequest{ - UUID: helper.ICA.UUID, - Host: helper.ICA.Host, - Type: helper.ICA.Type, - Name: helper.ICA.Name, - Category: helper.ICA.Category, - State: helper.ICA.State, - Properties: helper.ICA.Properties, + UUID: helper.ICA.UUID, + WebsocketURL: helper.ICA.WebsocketURL, + Type: helper.ICA.Type, + Name: helper.ICA.Name, + Category: helper.ICA.Category, + State: helper.ICA.State, + Location: helper.ICA.Location, + Description: helper.ICA.Description, + StartParameterScheme: helper.ICA.StartParameterScheme, + ManagedExternally: &helper.ICA.ManagedExternally, } _, resp, _ := helper.TestEndpoint(router, token, "/api/ic", "POST", helper.KeyModels{"ic": newICA}) diff --git a/start.go b/start.go index 3b0cf5a..96b2219 100644 --- a/start.go +++ b/start.go @@ -25,12 +25,12 @@ import ( "fmt" "log" - "git.rwth-aachen.de/acs/public/villas/web-backend-go/amqp" "git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration" "git.rwth-aachen.de/acs/public/villas/web-backend-go/database" apidocs "git.rwth-aachen.de/acs/public/villas/web-backend-go/doc/api" // doc/api folder is used by Swag CLI, you have to import it "git.rwth-aachen.de/acs/public/villas/web-backend-go/helper" "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes" + "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component" "github.com/gin-gonic/gin" "github.com/zpatrick/go-config" ) @@ -96,22 +96,22 @@ func main() { apidocs.SwaggerInfo.Host = baseHost apidocs.SwaggerInfo.BasePath = basePath + //Start AMQP client + if amqphost != "" { + // create amqp URL based on username, password and host + amqpurl := "amqp://" + amqpuser + ":" + amqppass + "@" + amqphost + err = infrastructure_component.StartAMQP(amqpurl, api) + if err != nil { + panic(err) + } + } + // add data to DB (if any) err = addData(r, configuration.GolbalConfig) if err != nil { panic(err) } - //Start AMQP client - if amqphost != "" { - // create amqp URL based on username, password and host - amqpurl := "amqp://" + amqpuser + ":" + amqppass + "@" + amqphost - err = amqp.StartAMQP(amqpurl, api) - if err != nil { - panic(err) - } - } - // server at port 4000 to match frontend's redirect path r.Run(":" + port) }