mirror of
https://git.rwth-aachen.de/acs/public/villas/web-backend-go/
synced 2025-03-30 00:00:12 +01:00
AMQP: Create unknown ICs that are received via AMQP automatically
This commit is contained in:
parent
f638ae59ed
commit
95159f54ec
4 changed files with 80 additions and 28 deletions
|
@ -25,8 +25,11 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
|
||||
infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jinzhu/gorm"
|
||||
"github.com/jinzhu/gorm/dialects/postgres"
|
||||
"github.com/streadway/amqp"
|
||||
"log"
|
||||
"time"
|
||||
|
@ -48,11 +51,15 @@ type Action struct {
|
|||
Results struct{} `json:"results"`
|
||||
}
|
||||
|
||||
type Update struct {
|
||||
State string `json:"state"`
|
||||
type ICUpdate struct {
|
||||
State *string `json:"state"`
|
||||
//Uptime float64 `json:"uptime"`
|
||||
Properties struct {
|
||||
UUID string `json:"uuid"`
|
||||
UUID string `json:"uuid"`
|
||||
Name *string `json:"name"`
|
||||
Category *string `json:"category"`
|
||||
Type *string `json:"type"`
|
||||
Location *string `json:"location"`
|
||||
} `json:"properties"`
|
||||
}
|
||||
|
||||
|
@ -123,7 +130,7 @@ func ConnectAMQP(uri string) error {
|
|||
// fmt.Println("AMQP: Unable to ack message:", err)
|
||||
//}
|
||||
|
||||
var payload Update
|
||||
var payload ICUpdate
|
||||
err := json.Unmarshal(message.Body, &payload)
|
||||
if err != nil {
|
||||
log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err)
|
||||
|
@ -139,24 +146,69 @@ func ConnectAMQP(uri string) error {
|
|||
|
||||
var sToBeUpdated database.InfrastructureComponent
|
||||
err = db.Find(&sToBeUpdated, "UUID = ?", ICUUID).Error
|
||||
if err != nil {
|
||||
log.Println("AMQP: Unable to find IC with UUID: ", ICUUID, " DB error message: ", err)
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// create new record
|
||||
var newICReq infrastructure_component.AddICRequest
|
||||
newICReq.InfrastructureComponent.UUID = payload.Properties.UUID
|
||||
if payload.Properties.Name == nil ||
|
||||
payload.Properties.Category == nil ||
|
||||
payload.Properties.Type == nil {
|
||||
// cannot create new IC because required information (name, type, and/or category missing)
|
||||
log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
|
||||
continue
|
||||
}
|
||||
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||
newICReq.InfrastructureComponent.Category = *payload.Properties.Type
|
||||
|
||||
// add optional params
|
||||
if payload.State != nil {
|
||||
newICReq.InfrastructureComponent.State = *payload.State
|
||||
} else {
|
||||
newICReq.InfrastructureComponent.State = "unknown"
|
||||
}
|
||||
if payload.Properties.Location != nil {
|
||||
newICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
|
||||
}
|
||||
|
||||
// Validate the new IC
|
||||
if err = newICReq.Validate(); err != nil {
|
||||
log.Println("AMQP: Validation of new IC failed:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Create the new IC
|
||||
newIC := newICReq.CreateIC()
|
||||
|
||||
// save IC
|
||||
err = newIC.Save()
|
||||
if err != nil {
|
||||
log.Println("AMQP: Saving new IC to DB failed:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Println("AMQP: Created IC ", newIC.Name)
|
||||
|
||||
} else if err != nil {
|
||||
log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err)
|
||||
continue
|
||||
}
|
||||
//var stateUpdateAt = message.Timestamp.UTC()
|
||||
err = db.Model(&sToBeUpdated).Updates(map[string]interface{}{
|
||||
//"Host": gjson.Get(content, "host"),
|
||||
//"Type": gjson.Get(content, "model"),
|
||||
//"Uptime": math.Round(payload.Uptime),
|
||||
"State": payload.State,
|
||||
//"StateUpdateAt": stateUpdateAt.Format(time.RFC1123),
|
||||
//"RawProperties": gjson.Get(content, "properties"),
|
||||
}).Error
|
||||
if err != nil {
|
||||
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
|
||||
} else {
|
||||
//var stateUpdateAt = message.Timestamp.UTC()
|
||||
err = db.Model(&sToBeUpdated).Updates(map[string]interface{}{
|
||||
//"Host": gjson.Get(content, "host"),
|
||||
//"Type": gjson.Get(content, "model"),
|
||||
//"Uptime": math.Round(payload.Uptime),
|
||||
"State": *payload.State,
|
||||
//"StateUpdateAt": stateUpdateAt.Format(time.RFC1123),
|
||||
//"RawProperties": gjson.Get(content, "properties"),
|
||||
}).Error
|
||||
if err != nil {
|
||||
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
|
||||
}
|
||||
|
||||
log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
|
||||
}
|
||||
|
||||
log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -73,7 +73,7 @@ func getICs(c *gin.Context) {
|
|||
// @Failure 404 {object} docs.ResponseError "Not found"
|
||||
// @Failure 422 {object} docs.ResponseError "Unprocessable entity"
|
||||
// @Failure 500 {object} docs.ResponseError "Internal server error"
|
||||
// @Param inputIC body infrastructure_component.addICRequest true "Infrastructure Component to be added"
|
||||
// @Param inputIC body infrastructure_component.AddICRequest true "Infrastructure Component to be added"
|
||||
// @Router /ic [post]
|
||||
// @Security Bearer
|
||||
func addIC(c *gin.Context) {
|
||||
|
@ -83,7 +83,7 @@ func addIC(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
var req addICRequest
|
||||
var req AddICRequest
|
||||
err := c.BindJSON(&req)
|
||||
if err != nil {
|
||||
helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error())
|
||||
|
@ -91,16 +91,16 @@ func addIC(c *gin.Context) {
|
|||
}
|
||||
|
||||
// Validate the request
|
||||
if err = req.validate(); err != nil {
|
||||
if err = req.Validate(); err != nil {
|
||||
helper.UnprocessableEntityError(c, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Create the new IC from the request
|
||||
newIC := req.createIC()
|
||||
newIC := req.CreateIC()
|
||||
|
||||
// Save new IC to DB
|
||||
err = newIC.save()
|
||||
err = newIC.Save()
|
||||
if !helper.DBError(c, err) {
|
||||
c.JSON(http.StatusOK, gin.H{"ic": newIC.InfrastructureComponent})
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ type InfrastructureComponent struct {
|
|||
database.InfrastructureComponent
|
||||
}
|
||||
|
||||
func (s *InfrastructureComponent) save() error {
|
||||
func (s *InfrastructureComponent) Save() error {
|
||||
db := database.GetDB()
|
||||
err := db.Create(s).Error
|
||||
return err
|
||||
|
|
|
@ -53,7 +53,7 @@ type validUpdatedIC struct {
|
|||
State string `form:"State" validate:"omitempty"`
|
||||
}
|
||||
|
||||
type addICRequest struct {
|
||||
type AddICRequest struct {
|
||||
InfrastructureComponent validNewIC `json:"ic"`
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ type updateICRequest struct {
|
|||
InfrastructureComponent validUpdatedIC `json:"ic"`
|
||||
}
|
||||
|
||||
func (r *addICRequest) validate() error {
|
||||
func (r *AddICRequest) Validate() error {
|
||||
validate = validator.New()
|
||||
errs := validate.Struct(r)
|
||||
return errs
|
||||
|
@ -73,7 +73,7 @@ func (r *validUpdatedIC) validate() error {
|
|||
return errs
|
||||
}
|
||||
|
||||
func (r *addICRequest) createIC() InfrastructureComponent {
|
||||
func (r *AddICRequest) CreateIC() InfrastructureComponent {
|
||||
var s InfrastructureComponent
|
||||
|
||||
s.UUID = r.InfrastructureComponent.UUID
|
||||
|
|
Loading…
Add table
Reference in a new issue