- remove ack of message in AMQP client

- add a command line parameter to activate AMQP client
This commit is contained in:
Sonja Happ 2019-07-18 10:35:02 +02:00
parent ef07fdb4ec
commit c565c04ba9
4 changed files with 33 additions and 24 deletions

View file

@ -85,13 +85,14 @@ func ConnectAMQP(uri string) error {
// consuming queue // consuming queue
go func() { go func() {
for message := range client.replies { for message := range client.replies {
err = message.Ack(false) //err = message.Ack(false)
if err != nil { //if err != nil {
fmt.Println("AMQP: Unable to ack message:", err) // fmt.Println("AMQP: Unable to ack message:", err)
} //}
content := string(message.Body) content := string(message.Body)
// any action message sent by the VILLAScontroller should be ignored by the web backend
if strings.Contains(content, "action") { if strings.Contains(content, "action") {
continue continue
} }

View file

@ -13,6 +13,7 @@ var DB_HOST string
var DB_NAME string var DB_NAME string
var DB_DUMMY string var DB_DUMMY string
var DB_SSLMODE string var DB_SSLMODE string
var WITH_AMQP bool
var DBpool *gorm.DB var DBpool *gorm.DB
@ -22,11 +23,13 @@ func init() {
flag.StringVar(&DB_NAME, "dbname", "villasdb", "Name of the database to use (default is villasdb)") flag.StringVar(&DB_NAME, "dbname", "villasdb", "Name of the database to use (default is villasdb)")
flag.StringVar(&DB_DUMMY, "dbdummy", "testvillasdb", "Name of the test database to use (default is testvillasdb)") flag.StringVar(&DB_DUMMY, "dbdummy", "testvillasdb", "Name of the test database to use (default is testvillasdb)")
flag.StringVar(&DB_SSLMODE, "dbsslmode", "disable", "SSL mode of DB (default is disable)") // TODO: change default for production flag.StringVar(&DB_SSLMODE, "dbsslmode", "disable", "SSL mode of DB (default is disable)") // TODO: change default for production
flag.BoolVar(&WITH_AMQP, "amqp", false, "If AMQP client for simulators shall be enabled, set this option to true (default is false)")
flag.Parse() flag.Parse()
fmt.Println("DB_HOST has value ", DB_HOST) fmt.Println("DB_HOST has value ", DB_HOST)
fmt.Println("DB_NAME has value ", DB_NAME) fmt.Println("DB_NAME has value ", DB_NAME)
fmt.Println("DB_DUMMY has value ", DB_DUMMY) fmt.Println("DB_DUMMY has value ", DB_DUMMY)
fmt.Println("DB_SSLMODE has value ", DB_SSLMODE) fmt.Println("DB_SSLMODE has value ", DB_SSLMODE)
fmt.Println("WITH_AMQP has value ", WITH_AMQP)
} }
// Initialize connection to the database // Initialize connection to the database

View file

@ -18,7 +18,10 @@ func RegisterSimulatorEndpoints(r *gin.RouterGroup) {
r.GET("/:simulatorID", getSimulator) r.GET("/:simulatorID", getSimulator)
r.DELETE("/:simulatorID", deleteSimulator) r.DELETE("/:simulatorID", deleteSimulator)
r.GET("/:simulatorID/models", getModelsOfSimulator) r.GET("/:simulatorID/models", getModelsOfSimulator)
r.POST("/:simulatorID/action", sendActionToSimulator) // register action endpoint only if AMQP client is used
if common.WITH_AMQP == true {
r.POST("/:simulatorID/action", sendActionToSimulator)
}
} }
// getSimulators godoc // getSimulators godoc
@ -280,7 +283,7 @@ func getModelsOfSimulator(c *gin.Context) {
} }
// sendActionToSimulator godoc // sendActionToSimulator godoc
// @Summary Send an action to simulator // @Summary Send an action to simulator (only available if backend server is started with -amqp parameter)
// @ID sendActionToSimulator // @ID sendActionToSimulator
// @Tags simulators // @Tags simulators
// @Produce json // @Produce json

View file

@ -65,27 +65,29 @@ func main() {
r.GET("swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) r.GET("swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
err := common.ConnectAMQP("amqp://localhost") if common.WITH_AMQP == true {
if err != nil { fmt.Println("Starting AMQP client")
panic(err) err := common.ConnectAMQP("amqp://localhost")
} if err != nil {
panic(err)
// Periodically call the Ping function to check which simulators are still there
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ticker.C:
err = common.PingAMQP()
if err != nil {
fmt.Println("AMQP Error: ", err.Error())
}
}
} }
}() // Periodically call the Ping function to check which simulators are still there
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ticker.C:
err = common.PingAMQP()
if err != nil {
fmt.Println("AMQP Error: ", err.Error())
}
}
}
}()
}
// server at port 4000 to match frontend's redirect path // server at port 4000 to match frontend's redirect path
r.Run(":4000") r.Run(":4000")
} }