mirror of
https://git.rwth-aachen.de/acs/public/villas/web-backend-go/
synced 2025-03-30 00:00:12 +01:00
AMQP: Set update time, improve creation and update of ICs via AMQP
This commit is contained in:
parent
b48976095f
commit
91bd51cba1
4 changed files with 56 additions and 28 deletions
|
@ -24,7 +24,6 @@ package amqp
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
|
|
||||||
infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component"
|
infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
@ -53,7 +52,6 @@ type Action struct {
|
||||||
|
|
||||||
type ICUpdate struct {
|
type ICUpdate struct {
|
||||||
State *string `json:"state"`
|
State *string `json:"state"`
|
||||||
//Uptime float64 `json:"uptime"`
|
|
||||||
Properties struct {
|
Properties struct {
|
||||||
UUID string `json:"uuid"`
|
UUID string `json:"uuid"`
|
||||||
Name *string `json:"name"`
|
Name *string `json:"name"`
|
||||||
|
@ -122,8 +120,6 @@ func ConnectAMQP(uri string) error {
|
||||||
|
|
||||||
// consuming queue
|
// consuming queue
|
||||||
go func() {
|
go func() {
|
||||||
|
|
||||||
db := database.GetDB()
|
|
||||||
for message := range client.replies {
|
for message := range client.replies {
|
||||||
//err = message.Ack(false)
|
//err = message.Ack(false)
|
||||||
//if err != nil {
|
//if err != nil {
|
||||||
|
@ -142,10 +138,12 @@ func ConnectAMQP(uri string) error {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
|
log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
|
||||||
|
continue
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
var sToBeUpdated database.InfrastructureComponent
|
var sToBeUpdated infrastructure_component.InfrastructureComponent
|
||||||
err = db.Find(&sToBeUpdated, "UUID = ?", ICUUID).Error
|
err = sToBeUpdated.ByUUID(ICUUID)
|
||||||
|
|
||||||
if err == gorm.ErrRecordNotFound {
|
if err == gorm.ErrRecordNotFound {
|
||||||
// create new record
|
// create new record
|
||||||
var newICReq infrastructure_component.AddICRequest
|
var newICReq infrastructure_component.AddICRequest
|
||||||
|
@ -193,20 +191,41 @@ func ConnectAMQP(uri string) error {
|
||||||
log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err)
|
log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
//var stateUpdateAt = message.Timestamp.UTC()
|
|
||||||
err = db.Model(&sToBeUpdated).Updates(map[string]interface{}{
|
var updatedICReq infrastructure_component.UpdateICRequest
|
||||||
//"Host": gjson.Get(content, "host"),
|
if payload.State != nil {
|
||||||
//"Type": gjson.Get(content, "model"),
|
updatedICReq.InfrastructureComponent.State = *payload.State
|
||||||
//"Uptime": math.Round(payload.Uptime),
|
}
|
||||||
"State": *payload.State,
|
if payload.Properties.Type != nil {
|
||||||
//"StateUpdateAt": stateUpdateAt.Format(time.RFC1123),
|
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
|
||||||
//"RawProperties": gjson.Get(content, "properties"),
|
}
|
||||||
}).Error
|
if payload.Properties.Category != nil {
|
||||||
if err != nil {
|
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
|
||||||
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
|
}
|
||||||
|
if payload.Properties.Name != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
|
||||||
|
}
|
||||||
|
if payload.Properties.Location != nil {
|
||||||
|
updatedICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
|
// Validate the updated IC
|
||||||
|
if err = updatedICReq.Validate(); err != nil {
|
||||||
|
log.Println("AMQP: Validation of updated IC failed:", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the updated IC from old IC
|
||||||
|
updatedIC := updatedICReq.UpdatedIC(sToBeUpdated)
|
||||||
|
|
||||||
|
// Finally update the IC in the DB
|
||||||
|
err = sToBeUpdated.Update(updatedIC)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
//log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ func addIC(c *gin.Context) {
|
||||||
// @Failure 404 {object} docs.ResponseError "Not found"
|
// @Failure 404 {object} docs.ResponseError "Not found"
|
||||||
// @Failure 422 {object} docs.ResponseError "Unprocessable entity"
|
// @Failure 422 {object} docs.ResponseError "Unprocessable entity"
|
||||||
// @Failure 500 {object} docs.ResponseError "Internal server error"
|
// @Failure 500 {object} docs.ResponseError "Internal server error"
|
||||||
// @Param inputIC body infrastructure_component.updateICRequest true "InfrastructureComponent to be updated"
|
// @Param inputIC body infrastructure_component.UpdateICRequest true "InfrastructureComponent to be updated"
|
||||||
// @Param ICID path int true "InfrastructureComponent ID"
|
// @Param ICID path int true "InfrastructureComponent ID"
|
||||||
// @Router /ic/{ICID} [put]
|
// @Router /ic/{ICID} [put]
|
||||||
// @Security Bearer
|
// @Security Bearer
|
||||||
|
@ -129,7 +129,7 @@ func updateIC(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var req updateICRequest
|
var req UpdateICRequest
|
||||||
err := c.BindJSON(&req)
|
err := c.BindJSON(&req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error())
|
helper.BadRequestError(c, "Error binding form data to JSON: "+err.Error())
|
||||||
|
@ -137,16 +137,16 @@ func updateIC(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate the request
|
// Validate the request
|
||||||
if err = req.InfrastructureComponent.validate(); err != nil {
|
if err = req.Validate(); err != nil {
|
||||||
helper.UnprocessableEntityError(c, err.Error())
|
helper.UnprocessableEntityError(c, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the updatedIC from oldIC
|
// Create the updatedIC from oldIC
|
||||||
updatedIC := req.updatedIC(oldIC)
|
updatedIC := req.UpdatedIC(oldIC)
|
||||||
|
|
||||||
// Finally update the IC in the DB
|
// Finally update the IC in the DB
|
||||||
err = oldIC.update(updatedIC)
|
err = oldIC.Update(updatedIC)
|
||||||
if !helper.DBError(c, err) {
|
if !helper.DBError(c, err) {
|
||||||
c.JSON(http.StatusOK, gin.H{"ic": updatedIC.InfrastructureComponent})
|
c.JSON(http.StatusOK, gin.H{"ic": updatedIC.InfrastructureComponent})
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,13 @@ func (s *InfrastructureComponent) ByID(id uint) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfrastructureComponent) update(updatedIC InfrastructureComponent) error {
|
func (s *InfrastructureComponent) ByUUID(uuid string) error {
|
||||||
|
db := database.GetDB()
|
||||||
|
err := db.Find(s, "UUID = ?", uuid).Error
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) error {
|
||||||
|
|
||||||
db := database.GetDB()
|
db := database.GetDB()
|
||||||
err := db.Model(s).Updates(updatedIC).Error
|
err := db.Model(s).Updates(updatedIC).Error
|
||||||
|
|
|
@ -57,7 +57,7 @@ type AddICRequest struct {
|
||||||
InfrastructureComponent validNewIC `json:"ic"`
|
InfrastructureComponent validNewIC `json:"ic"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type updateICRequest struct {
|
type UpdateICRequest struct {
|
||||||
InfrastructureComponent validUpdatedIC `json:"ic"`
|
InfrastructureComponent validUpdatedIC `json:"ic"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ func (r *AddICRequest) Validate() error {
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *validUpdatedIC) validate() error {
|
func (r *UpdateICRequest) Validate() error {
|
||||||
validate = validator.New()
|
validate = validator.New()
|
||||||
errs := validate.Struct(r)
|
errs := validate.Struct(r)
|
||||||
return errs
|
return errs
|
||||||
|
@ -94,7 +94,7 @@ func (r *AddICRequest) CreateIC() InfrastructureComponent {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *updateICRequest) updatedIC(oldIC InfrastructureComponent) InfrastructureComponent {
|
func (r *UpdateICRequest) UpdatedIC(oldIC InfrastructureComponent) InfrastructureComponent {
|
||||||
// Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s`
|
// Use the old InfrastructureComponent as a basis for the updated InfrastructureComponent `s`
|
||||||
s := oldIC
|
s := oldIC
|
||||||
|
|
||||||
|
@ -126,6 +126,9 @@ func (r *updateICRequest) updatedIC(oldIC InfrastructureComponent) Infrastructur
|
||||||
s.State = r.InfrastructureComponent.State
|
s.State = r.InfrastructureComponent.State
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set last update time
|
||||||
|
s.StateUpdateAt = time.Now().Format(time.RFC1123)
|
||||||
|
|
||||||
// only update props if not empty
|
// only update props if not empty
|
||||||
var emptyJson postgres.Jsonb
|
var emptyJson postgres.Jsonb
|
||||||
// Serialize empty json and params
|
// Serialize empty json and params
|
||||||
|
|
Loading…
Add table
Reference in a new issue