Skip to content

Commit

Permalink
Fix amq router race condition (#59)
Browse files Browse the repository at this point in the history
Add retries when starting AMQP server to deal with the race
condition when AMQP router starts later than cloud-event-proxy
service, for example when a node is rebooted.

Signed-off-by: Jack Ding <jackding@gmail.com>
  • Loading branch information
jzding authored Dec 2, 2022
1 parent e226a93 commit 6d25e1b
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions v1/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,58 @@
package amqp

import (
"context"
"sync"
"time"

"github.com/Azure/go-amqp"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/errorhandler"
amqp1 "github.com/redhat-cne/sdk-go/pkg/protocol/amqp"
log "github.com/sirupsen/logrus"
)

var (
instance *AMQP
once sync.Once
instance *AMQP
retryTimeout = 500 * time.Millisecond
cancelTimeout = 30 * time.Second
)

//AMQP exposes amqp api methods
// AMQP exposes amqp api methods
type AMQP struct {
Router *amqp1.Router
}

//GetAMQPInstance get event instance
// GetAMQPInstance get event instance
func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) (*AMQP, error) {
once.Do(func() {
router, err := amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh)
if err == nil {
instance = &AMQP{
Router: router,
}
ctx, cancel := context.WithTimeout(context.Background(), cancelTimeout)
defer cancel()
var router *amqp1.Router
var err error
for {
select {
case <-ctx.Done():
return nil, errorhandler.AMQPConnectionError{}
default:
}
})
router, err = amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh)
if err != nil {
log.Info("retrying connecting to amqp.")
time.Sleep(retryTimeout)
continue
}

instance = &AMQP{
Router: router,
}
break
}
if instance == nil || instance.Router == nil {
return nil, errorhandler.AMQPConnectionError{Desc: "amqp connection error"}
return nil, errorhandler.AMQPConnectionError{}
}

if instance.Router.Client == nil {
client, err := instance.Router.NewClient(amqpHost, []amqp.ConnOption{})
if err != nil {
Expand All @@ -58,12 +77,12 @@ func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut c
return instance, nil
}

//Start start amqp processors
// Start start amqp processors
func (a *AMQP) Start(wg *sync.WaitGroup) {
go instance.Router.QDRRouter(wg)
}

//NewSender - create new sender independent of the framework
// NewSender - create new sender independent of the framework
func NewSender(hostName string, port int, address string) (*amqp1.Protocol, error) {
return amqp1.NewSender(hostName, port, address)
}
Expand All @@ -73,7 +92,7 @@ func NewReceiver(hostName string, port int, address string) (*amqp1.Protocol, er
return amqp1.NewReceiver(hostName, port, address)
}

//DeleteSender send publisher address information on a channel to delete its sender object
// DeleteSender send publisher address information on a channel to delete its sender object
func DeleteSender(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR to this address
inChan <- &channel.DataChan{
Expand All @@ -83,7 +102,7 @@ func DeleteSender(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateSender send publisher address information on a channel to create it's sender object
// CreateSender send publisher address information on a channel to create it's sender object
func CreateSender(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR to this address
inChan <- &channel.DataChan{
Expand All @@ -93,7 +112,7 @@ func CreateSender(inChan chan<- *channel.DataChan, address string) {
}
}

//DeleteListener send subscription address information on a channel to delete its listener object
// DeleteListener send subscription address information on a channel to delete its listener object
func DeleteListener(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR listener to this address
inChan <- &channel.DataChan{
Expand All @@ -103,7 +122,7 @@ func DeleteListener(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateListener send subscription address information on a channel to create its listener object
// CreateListener send subscription address information on a channel to create its listener object
func CreateListener(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR listener to this address
inChan <- &channel.DataChan{
Expand All @@ -113,7 +132,7 @@ func CreateListener(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateNewStatusListener send status address information on a channel to create its listener object
// CreateNewStatusListener send status address information on a channel to create its listener object
func CreateNewStatusListener(inChan chan<- *channel.DataChan, address string,
onReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error,
processEventFn func(e interface{}) error) {
Expand Down

0 comments on commit 6d25e1b

Please sign in to comment.