started refactoring the AMPQ session, WIP #74

This commit is contained in:
Sonja Happ 2021-08-27 17:20:51 +02:00
parent 8584b4ac22
commit 2dada33562
8 changed files with 365 additions and 379 deletions

277
helper/amqp_session.go Normal file
View file

@ -0,0 +1,277 @@
package helper
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
type AMQPsession struct {
connection *amqp.Connection
sendCh *amqp.Channel
recvCh *amqp.Channel
notifyConnClose chan *amqp.Error
notifySendChanClose chan *amqp.Error
notifySendConfirm chan amqp.Confirmation
notifyRecvChanClose chan *amqp.Error
notifyRecvConfirm chan amqp.Confirmation
IsReady bool
done chan bool
name string
exchange string
processMessage func(delivery amqp.Delivery)
}
const (
// When reconnecting to the server after connection failure
reconnectDelay = 5 * time.Second
// When setting up the channel after a channel exception
reInitDelay = 2 * time.Second
// When resending messages the server didn't confirm
resendDelay = 5 * time.Second
)
//var client AMQPsession
// NewAMQPSession creates a new consumer state instance, and automatically
// attempts to connect to the server.
func NewAMQPSession(name string, AMQPurl string, exchange string, processMessage func(delivery amqp.Delivery)) *AMQPsession {
if AMQPurl != "" {
log.Println("Starting AMQP client")
session := AMQPsession{
name: name,
exchange: exchange,
done: make(chan bool),
processMessage: processMessage,
}
go session.handleReconnect(AMQPurl)
return &session
}
return nil
}
// handleReconnect will wait for a connection error on
// notifyConnClose, and then continuously attempt to reconnect.
func (session *AMQPsession) handleReconnect(addr string) {
for {
session.IsReady = false
log.Println("Attempting to connect to AMQP broker ", addr)
conn, err := session.connect(addr)
if err != nil {
log.Println("Failed to connect. Retrying...")
select {
case <-session.done:
return
case <-time.After(reconnectDelay):
}
continue
}
if done := session.handleReInit(conn); done {
break
}
}
}
// connect will create a new AMQP connection
func (session *AMQPsession) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)
if err != nil {
return nil, err
}
// take a new connection to the queue, and updates the close listener to reflect this.
session.connection = conn
session.notifyConnClose = make(chan *amqp.Error)
session.connection.NotifyClose(session.notifyConnClose)
log.Println("Connected!")
return conn, nil
}
// handleReInit will wait for a channel error
// and then continuously attempt to re-initialize both channels
func (session *AMQPsession) handleReInit(conn *amqp.Connection) bool {
for {
session.IsReady = false
err := session.init(conn)
if err != nil {
log.Println("Failed to initialize channel. Retrying...")
select {
case <-session.done:
return true
case <-time.After(reInitDelay):
}
continue
}
select {
case <-session.done:
return true
case <-session.notifyConnClose:
log.Println("Connection closed. Reconnecting...")
return false
case <-session.notifySendChanClose:
log.Println("Send channel closed. Re-running init...")
case <-session.notifyRecvChanClose:
log.Println("Receive channel closed. Re-running init...")
}
}
}
// init will initialize channel & declare queue
func (session *AMQPsession) init(conn *amqp.Connection) error {
ch, err := conn.Channel()
if err != nil {
return err
}
err = ch.Confirm(false)
if err != nil {
return err
}
_, err = ch.QueueDeclare(
session.name,
false, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
return err
}
// create sendCh
sendCh, err := conn.Channel()
if err != nil {
return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err)
}
// declare exchange
err = sendCh.ExchangeDeclare(session.exchange,
"headers",
true,
false,
false,
false,
nil)
if err != nil {
return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
}
// add a queue for the ICs
ICQueue, err := sendCh.QueueDeclare("infrastructure_components",
true,
false,
false,
false,
nil)
if err != nil {
return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
}
err = sendCh.QueueBind(ICQueue.Name, "", session.exchange, false, nil)
if err != nil {
return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
}
session.sendCh = sendCh
session.notifySendChanClose = make(chan *amqp.Error)
session.notifySendConfirm = make(chan amqp.Confirmation, 1)
session.sendCh.NotifyClose(session.notifySendChanClose)
session.sendCh.NotifyPublish(session.notifySendConfirm)
// create receive channel
recvCh, err := conn.Channel()
if err != nil {
return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err)
}
session.recvCh = recvCh
session.notifyRecvChanClose = make(chan *amqp.Error)
session.notifyRecvConfirm = make(chan amqp.Confirmation, 1)
session.recvCh.NotifyClose(session.notifyRecvChanClose)
session.recvCh.NotifyPublish(session.notifyRecvConfirm)
// start deliveries
messages, err := session.recvCh.Consume(ICQueue.Name,
"",
true,
false,
false,
false,
nil)
if err != nil {
return fmt.Errorf("AMQP: failed to start deliveries: %v", err)
}
// consume deliveries
go func() {
for {
for message := range messages {
session.processMessage(message)
}
}
}()
session.IsReady = true
log.Println("AMQP channels setup! Waiting for messages...")
return nil
}
func (session *AMQPsession) CheckConnection() error {
if session.connection != nil {
if session.connection.IsClosed() {
return fmt.Errorf("connection to broker is closed")
}
} else {
return fmt.Errorf("connection is nil")
}
return nil
}
func (session *AMQPsession) Send(payload []byte, destinationUuid string) error {
message := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
}
// set message headers
message.Headers = make(map[string]interface{}) // empty map
message.Headers["uuid"] = destinationUuid // leave uuid empty if message should go to all ICs
err := session.CheckConnection()
if err != nil {
return err
}
err = session.sendCh.Publish(session.exchange,
"",
false,
false,
message)
return err
}

View file

@ -29,15 +29,20 @@ import (
"git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/helper"
infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component"
"github.com/gin-gonic/gin"
)
var session *helper.AMQPsession
func RegisterHealthzEndpoint(r *gin.RouterGroup) {
r.GET("", getHealth)
}
func SetAMQPSession(s *helper.AMQPsession) {
session = s
}
// getHealth godoc
// @Summary Get health status of backend
// @ID getHealth
@ -69,7 +74,7 @@ func getHealth(c *gin.Context) {
}
if len(url) != 0 {
err = infrastructure_component.CheckConnection()
err = session.CheckConnection()
if err != nil {
log.Println(err.Error())
c.JSON(http.StatusInternalServerError, gin.H{

View file

@ -80,8 +80,14 @@ func TestHealthz(t *testing.T) {
amqpURI := "amqp://" + user + ":" + pass + "@" + host
log.Println("AMQP URI is", amqpURI)
err = infrastructure_component.ConnectAMQP(amqpURI)
assert.NoError(t, err)
session = helper.NewAMQPSession("villas-test-session", amqpURI, "villas", infrastructure_component.ProcessMessage)
SetAMQPSession(session)
for {
if session.IsReady {
break
}
}
// test healthz endpoint for connected DB and AMQP client
code, resp, err = helper.TestEndpoint(router, "", "/api/v2/healthz", http.MethodGet, nil)

View file

@ -24,25 +24,14 @@ package infrastructure_component
import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/jinzhu/gorm"
"github.com/jinzhu/gorm/dialects/postgres"
"github.com/streadway/amqp"
"log"
"strings"
)
const VILLAS_EXCHANGE = "villas"
type AMQPclient struct {
connection *amqp.Connection
sendCh *amqp.Channel
recvCh *amqp.Channel
}
type Action struct {
Act string `json:"action"`
When int64 `json:"when"`
@ -85,205 +74,24 @@ type ICUpdate struct {
Action string `json:"action"`
}
var client AMQPclient
func ConnectAMQP(uri string) error {
var err error
// connect to broker
client.connection, err = amqp.Dial(uri)
if err != nil {
return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
}
// create sendCh
client.sendCh, err = client.connection.Channel()
if err != nil {
return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err)
}
// declare exchange
err = client.sendCh.ExchangeDeclare(VILLAS_EXCHANGE,
"headers",
true,
false,
false,
false,
nil)
if err != nil {
return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
}
// add a queue for the ICs
ICQueue, err := client.sendCh.QueueDeclare("infrastructure_components",
true,
false,
false,
false,
nil)
if err != nil {
return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
}
err = client.sendCh.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
if err != nil {
return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
}
// create receive channel
client.recvCh, err = client.connection.Channel()
if err != nil {
return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err)
}
// start deliveries
messages, err := client.recvCh.Consume(ICQueue.Name,
"",
true,
false,
false,
false,
nil)
if err != nil {
return fmt.Errorf("AMQP: failed to start deliveries: %v", err)
}
// consume deliveries
go func() {
for {
for message := range messages {
err = processMessage(message)
if err != nil {
log.Println("AMQP: Error processing message: ", err.Error())
}
}
}
}()
log.Printf(" AMQP: Waiting for messages... ")
return nil
}
func sendActionAMQP(action Action, destinationUUID string) error {
payload, err := json.Marshal(action)
if err != nil {
return err
}
msg := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
}
// set message headers
msg.Headers = make(map[string]interface{}) // empty map
msg.Headers["uuid"] = destinationUUID
err = CheckConnection()
if err != nil {
return err
}
//log.Println("AMQP: Sending message", string(msg.Body))
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
return err
}
func SendPing(uuid string) error {
var ping Action
ping.Act = "ping"
payload, err := json.Marshal(ping)
if err != nil {
return err
}
msg := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
}
// set message headers
msg.Headers = make(map[string]interface{}) // empty map
msg.Headers["uuid"] = uuid // leave uuid empty if ping should go to all ICs
err = CheckConnection()
if err != nil {
return err
}
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
return err
}
func CheckConnection() error {
if client.connection != nil {
if client.connection.IsClosed() {
return fmt.Errorf("connection to broker is closed")
}
} else {
return fmt.Errorf("connection is nil")
}
return nil
}
func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
if AMQPurl != "" {
log.Println("Starting AMQP client")
err := ConnectAMQP(AMQPurl)
if err != nil {
return err
}
// register IC action endpoint only if AMQP client is used
RegisterAMQPEndpoint(api.Group("/ic"))
log.Printf("Connected AMQP client to %s", AMQPurl)
}
return nil
}
func processMessage(message amqp.Delivery) error {
func ProcessMessage(message amqp.Delivery) {
var payload ICUpdate
err := json.Unmarshal(message.Body, &payload)
if err != nil {
return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
log.Printf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
}
if payload.Action != "" {
// if a message contains an action, it is not intended for the backend
//log.Println("AMQP: Ignoring action message ", payload)
return nil
return
}
ICUUID := payload.Properties.UUID
_, err = uuid.Parse(ICUUID)
if err != nil {
return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
log.Printf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
}
var sToBeUpdated InfrastructureComponent
@ -299,8 +107,10 @@ func processMessage(message amqp.Delivery) error {
// update record based on payload
err = sToBeUpdated.updateExternalIC(payload, message.Body)
}
if err != nil {
log.Printf(err.Error())
}
return err
}
func createExternalIC(payload ICUpdate, ICUUID string, body []byte) error {
@ -358,7 +168,11 @@ func createExternalIC(payload ICUpdate, ICUUID string, body []byte) error {
log.Println("AMQP: Created IC with UUID ", newIC.UUID)
// send ping to get full status update of this IC
err = SendPing(ICUUID)
if session != nil {
err = SendPing(ICUUID)
} else {
err = fmt.Errorf("cannot sent ping to %v because AMQP session is nil", ICUUID)
}
return err
}
@ -433,3 +247,27 @@ func newFalse() *bool {
b := false
return &b
}
func SendPing(uuid string) error {
var ping Action
ping.Act = "ping"
payload, err := json.Marshal(ping)
if err != nil {
return err
}
err = session.Send(payload, uuid)
return err
}
func sendActionAMQP(action Action, destinationUUID string) error {
payload, err := json.Marshal(action)
if err != nil {
return err
}
err = session.Send(payload, destinationUUID)
return err
}

View file

@ -37,10 +37,13 @@ func RegisterICEndpoints(r *gin.RouterGroup) {
r.GET("/:ICID", getIC)
r.DELETE("/:ICID", deleteIC)
r.GET("/:ICID/configs", getConfigsOfIC)
r.POST("/:ICID/action", sendActionToIC)
}
func RegisterAMQPEndpoint(r *gin.RouterGroup) {
r.POST("/:ICID/action", sendActionToIC)
var session *helper.AMQPsession
func SetAMQPSession(s *helper.AMQPsession) {
session = s
}
// getICs godoc

View file

@ -24,13 +24,10 @@ package infrastructure_component
import (
"encoding/json"
"fmt"
"log"
"os"
"testing"
"time"
"github.com/streadway/amqp"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/helper"
component_configuration "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/component-configuration"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario"
@ -133,12 +130,6 @@ func TestMain(m *testing.M) {
// that can be associated with a new component configuration
scenario.RegisterScenarioEndpoints(api.Group("/scenarios"))
// check AMQP connection
err = CheckConnection()
if err.Error() != "connection is nil" {
return
}
// connect AMQP client
// Make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set
host, err := configuration.GlobalConfig.String("amqp.host")
@ -148,11 +139,8 @@ func TestMain(m *testing.M) {
// AMQP Connection startup is tested here
// Not repeated in other tests because it is only needed once
err = StartAMQP(amqpURI, api)
if err != nil {
log.Println("unable to connect to AMQP")
return
}
session = helper.NewAMQPSession("villas-test-session", amqpURI, "villas", ProcessMessage)
SetAMQPSession(session)
os.Exit(m.Run())
}
@ -306,28 +294,14 @@ func TestUpdateICAsAdmin(t *testing.T) {
payload, err := json.Marshal(update)
assert.NoError(t, err)
var headers map[string]interface{}
headers = make(map[string]interface{}) // empty map
headers["uuid"] = newIC2.Manager // set uuid
//var headers map[string]interface{}
//headers = make(map[string]interface{}) // empty map
//headers["uuid"] = newIC2.Manager // set uuid
msg := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headers,
}
err = CheckConnection()
err = session.CheckConnection()
assert.NoError(t, err)
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC2.Manager)
assert.NoError(t, err)
// Wait until externally managed IC is created (happens async)
@ -430,28 +404,10 @@ func TestDeleteICAsAdmin(t *testing.T) {
payload, err := json.Marshal(update)
assert.NoError(t, err)
var headers map[string]interface{}
headers = make(map[string]interface{}) // empty map
headers["uuid"] = newIC2.UUID // set uuid
msg := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headers,
}
err = CheckConnection()
err = session.CheckConnection()
assert.NoError(t, err)
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC2.UUID)
assert.NoError(t, err)
// Wait until externally managed IC is created (happens async)
@ -662,28 +618,10 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) {
payload, err := json.Marshal(update)
assert.NoError(t, err)
var headers map[string]interface{}
headers = make(map[string]interface{}) // empty map
headers["uuid"] = newIC1.Manager // set uuid
msg := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headers,
}
err = CheckConnection()
err = session.CheckConnection()
assert.NoError(t, err)
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC1.Manager)
assert.NoError(t, err)
time.Sleep(waitingTime * time.Second)
@ -709,25 +647,7 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) {
payload, err = json.Marshal(update)
assert.NoError(t, err)
var headersA map[string]interface{}
headersA = make(map[string]interface{}) // empty map
headersA["uuid"] = newIC1.Manager
msg = amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headersA,
}
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC1.Manager)
assert.NoError(t, err)
time.Sleep(waitingTime * time.Second)
@ -743,21 +663,7 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) {
payload, err = json.Marshal(update)
assert.NoError(t, err)
msg = amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headersA,
}
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC1.Manager)
assert.NoError(t, err)
time.Sleep(waitingTime * time.Second)
@ -797,27 +703,10 @@ func TestDeleteICViaAMQPRecv(t *testing.T) {
payload, err := json.Marshal(update)
assert.NoError(t, err)
var headers map[string]interface{}
headers = make(map[string]interface{}) // empty map
headers["uuid"] = newIC1.Manager // set uuid
msg := amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headers,
}
err = CheckConnection()
err = session.CheckConnection()
assert.NoError(t, err)
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC1.Manager)
assert.NoError(t, err)
time.Sleep(waitingTime * time.Second)
@ -874,22 +763,7 @@ func TestDeleteICViaAMQPRecv(t *testing.T) {
payload, err = json.Marshal(update)
assert.NoError(t, err)
msg = amqp.Publishing{
DeliveryMode: 2,
Timestamp: time.Now(),
ContentType: "application/json",
ContentEncoding: "utf-8",
Priority: 0,
Body: payload,
Headers: headers,
}
// attempt to delete IC (should not work immediately because IC is still associated with component config)
err = client.sendCh.Publish(VILLAS_EXCHANGE,
"",
false,
false,
msg)
err = session.Send(payload, newIC1.Manager)
assert.NoError(t, err)
time.Sleep(waitingTime * time.Second)

View file

@ -26,8 +26,6 @@ import (
"os"
"testing"
infrastructure_component "git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/configuration"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"github.com/gin-gonic/gin"
@ -55,27 +53,6 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
/*
* The order of test functions is important here
* 1. Start and connect AMQP
* 2. Register endpoints
* 3. Add test data
*/
func TestStartAMQP(t *testing.T) {
// connect AMQP client
// Make sure that AMQP_HOST, AMQP_USER, AMQP_PASS are set
host, err := configuration.GlobalConfig.String("amqp.host")
user, err := configuration.GlobalConfig.String("amqp.user")
pass, err := configuration.GlobalConfig.String("amqp.pass")
amqpURI := "amqp://" + user + ":" + pass + "@" + host
// AMQP Connection startup is tested here
// Not repeated in other tests because it is only needed once
err = infrastructure_component.StartAMQP(amqpURI, api)
assert.NoError(t, err)
}
func TestRegisterEndpoints(t *testing.T) {
database.DropTables()
database.MigrateModels()

View file

@ -23,6 +23,8 @@ package main
import (
"fmt"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/helper"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/healthz"
"log"
"time"
@ -122,15 +124,19 @@ func main() {
if AMQPhost != "" {
// create amqp URL based on username, password and host
amqpurl := "amqp://" + AMQPuser + ":" + AMQPpass + "@" + AMQPhost
err = infrastructure_component.StartAMQP(amqpurl, api)
if err != nil {
log.Fatal(err)
}
session := helper.NewAMQPSession("villas-amqp-session", amqpurl, "villas", infrastructure_component.ProcessMessage)
healthz.SetAMQPSession(session) // healthz needs to know the amqp session to check the health of the backend
infrastructure_component.SetAMQPSession(session) // IC needs to know the session to send amqp messages
// send Ping to all externally managed ICs
err = infrastructure_component.SendPing("")
if err != nil {
log.Println("error sending ping action via AMQP: ", err)
for {
if session.IsReady {
err = infrastructure_component.SendPing("")
if err != nil {
log.Println("error sending ping action via AMQP: ", err.Error())
}
break
}
}
}