AMQP: move message processing into infinite loop

This commit is contained in:
Sonja Happ 2020-10-12 10:04:38 +02:00
parent 2e6b980d3d
commit bfcc8983fe

View file

@ -120,127 +120,18 @@ func ConnectAMQP(uri string) error {
return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
}
forever := make(chan bool)
// consuming queue
go func() {
for message := range client.replies {
//err = message.Ack(false)
//if err != nil {
// fmt.Println("AMQP: Unable to ack message:", err)
//}
var payload ICUpdate
err := json.Unmarshal(message.Body, &payload)
if err != nil {
log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err)
continue
}
ICUUID := payload.Properties.UUID
_, err = uuid.Parse(ICUUID)
if err != nil {
//log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
continue
} else {
var sToBeUpdated infrastructure_component.InfrastructureComponent
err = sToBeUpdated.ByUUID(ICUUID)
if err == gorm.ErrRecordNotFound {
// create new record
var newICReq infrastructure_component.AddICRequest
newICReq.InfrastructureComponent.UUID = payload.Properties.UUID
if payload.Properties.Name == nil ||
payload.Properties.Category == nil ||
payload.Properties.Type == nil {
// cannot create new IC because required information (name, type, and/or category missing)
log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
continue
}
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
newICReq.InfrastructureComponent.Type = *payload.Properties.Type
// add optional params
if payload.State != nil {
newICReq.InfrastructureComponent.State = *payload.State
} else {
newICReq.InfrastructureComponent.State = "unknown"
}
if payload.Properties.Location != nil {
newICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
}
// Validate the new IC
err = newICReq.Validate()
if err != nil {
log.Println("AMQP: Validation of new IC failed:", err)
continue
}
// Create the new IC
newIC := newICReq.CreateIC()
// save IC
err = newIC.Save()
if err != nil {
log.Println("AMQP: Saving new IC to DB failed:", err)
continue
}
log.Println("AMQP: Created IC ", newIC.Name)
} else if err != nil {
log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err)
continue
} else {
var updatedICReq infrastructure_component.UpdateICRequest
if payload.State != nil {
updatedICReq.InfrastructureComponent.State = *payload.State
}
if payload.Properties.Type != nil {
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
}
if payload.Properties.Category != nil {
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
}
if payload.Properties.Name != nil {
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
}
if payload.Properties.Location != nil {
updatedICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
}
// Validate the updated IC
if err = updatedICReq.Validate(); err != nil {
log.Println("AMQP: Validation of updated IC failed:", err)
continue
}
// Create the updated IC from old IC
updatedIC := updatedICReq.UpdatedIC(sToBeUpdated)
// Finally update the IC in the DB
err = sToBeUpdated.Update(updatedIC)
if err != nil {
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
continue
}
log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
}
for {
for message := range client.replies {
processMessage(message)
}
time.Sleep(2) // sleep for 2 sek
}
}()
log.Printf(" AMQP: Waiting for messages... ")
<-forever
return nil
}
@ -333,3 +224,109 @@ func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
return nil
}
func processMessage(message amqp.Delivery) {
var payload ICUpdate
err := json.Unmarshal(message.Body, &payload)
if err != nil {
log.Println("AMQP: Could not unmarshal message to JSON:", string(message.Body), "err: ", err)
return
}
ICUUID := payload.Properties.UUID
_, err = uuid.Parse(ICUUID)
if err != nil {
log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
return
}
var sToBeUpdated infrastructure_component.InfrastructureComponent
err = sToBeUpdated.ByUUID(ICUUID)
if err == gorm.ErrRecordNotFound {
// create new record
var newICReq infrastructure_component.AddICRequest
newICReq.InfrastructureComponent.UUID = payload.Properties.UUID
if payload.Properties.Name == nil ||
payload.Properties.Category == nil ||
payload.Properties.Type == nil {
// cannot create new IC because required information (name, type, and/or category missing)
log.Println("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
return
}
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
newICReq.InfrastructureComponent.Type = *payload.Properties.Type
// add optional params
if payload.State != nil {
newICReq.InfrastructureComponent.State = *payload.State
} else {
newICReq.InfrastructureComponent.State = "unknown"
}
if payload.Properties.Location != nil {
newICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
}
// Validate the new IC
err = newICReq.Validate()
if err != nil {
log.Println("AMQP: Validation of new IC failed:", err)
return
}
// Create the new IC
newIC := newICReq.CreateIC()
// save IC
err = newIC.Save()
if err != nil {
log.Println("AMQP: Saving new IC to DB failed:", err)
return
}
log.Println("AMQP: Created IC ", newIC.Name)
} else if err != nil {
log.Println("AMQP: Database error for IC", ICUUID, " DB error message: ", err)
return
} else {
var updatedICReq infrastructure_component.UpdateICRequest
if payload.State != nil {
updatedICReq.InfrastructureComponent.State = *payload.State
}
if payload.Properties.Type != nil {
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
}
if payload.Properties.Category != nil {
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
}
if payload.Properties.Name != nil {
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
}
if payload.Properties.Location != nil {
updatedICReq.InfrastructureComponent.Properties = postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Properties.Location + `"}`)}
}
// Validate the updated IC
if err = updatedICReq.Validate(); err != nil {
log.Println("AMQP: Validation of updated IC failed:", err)
return
}
// Create the updated IC from old IC
updatedIC := updatedICReq.UpdatedIC(sToBeUpdated)
// Finally update the IC in the DB
err = sToBeUpdated.Update(updatedIC)
if err != nil {
log.Println("AMQP: Unable to update IC", sToBeUpdated.Name, "in DB: ", err)
return
}
log.Println("AMQP: Updated IC ", sToBeUpdated.Name)
}
}