Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handling for unhealthy connections via new methods/"constructors" #33

Merged
merged 6 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions v2/pkg/tcr/connectionhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func NewConnectionHost(

// Connect tries to connect (or reconnect) to the provided properties of the host one time.
func (ch *ConnectionHost) Connect() bool {
return ch.ConnectWithErrorHandler(nil)
}

// ConnectWithErrorHandler tries to connect (or reconnect) to the provided properties of the host one time with an error handler.
func (ch *ConnectionHost) ConnectWithErrorHandler(errorHandler func(error)) bool {

// Compare, Lock, Recompare Strategy
if ch.Connection != nil && !ch.Connection.IsClosed() /* <- atomic */ {
Expand All @@ -80,6 +85,9 @@ func (ch *ConnectionHost) Connect() bool {
ch.tlsConfig.PEMCertLocation,
ch.tlsConfig.LocalCertLocation)
if err != nil {
if errorHandler != nil {
errorHandler(err)
}
return false
}
}
Expand All @@ -103,6 +111,9 @@ func (ch *ConnectionHost) Connect() bool {
})
}
if err != nil {
if errorHandler != nil {
errorHandler(err)
}
return false
}

Expand Down
49 changes: 19 additions & 30 deletions v2/pkg/tcr/connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,26 @@ type ConnectionPool struct {
flaggedConnections map[uint64]bool
sleepOnErrorInterval time.Duration
errorHandler func(error)
unhealthyHandler func(error)
}

// NewConnectionPool creates hosting structure for the ConnectionPool.
func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) {

if config.Heartbeat == 0 || config.ConnectionTimeout == 0 {
return nil, errors.New("connectionpool heartbeat or connectiontimeout can't be 0")
}

if config.MaxConnectionCount == 0 {
return nil, errors.New("connectionpool maxconnectioncount can't be 0")
}

cp := &ConnectionPool{
Config: *config,
uri: config.URI,
heartbeatInterval: time.Duration(config.Heartbeat) * time.Second,
connectionTimeout: time.Duration(config.ConnectionTimeout) * time.Second,
connections: queue.New(int64(config.MaxConnectionCount)), // possible overflow error
channels: make(chan *ChannelHost, config.MaxCacheChannelCount),
poolRWLock: &sync.RWMutex{},
flaggedConnections: make(map[uint64]bool),
sleepOnErrorInterval: time.Duration(config.SleepOnErrorInterval) * time.Millisecond,
}

if ok := cp.initializeConnections(); !ok {
return nil, errors.New("initialization failed during connection creation")
}

return cp, nil
return NewConnectionPoolWithHandlers(config, nil, nil)
}

// NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool.
// NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool with an error handler.
func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error) {
return NewConnectionPoolWithHandlers(config, errorHandler, nil)
}

// NewConnectionPoolWithUnhealthyHandler creates hosting structure for the ConnectionPool with an unhealthy handler.
func NewConnectionPoolWithUnhealthyHandler(config *PoolConfig, unhealthyHandler func(error)) (*ConnectionPool, error) {
return NewConnectionPoolWithHandlers(config, nil, unhealthyHandler)
}

// NewConnectionPoolWithHandlers creates hosting structure for the ConnectionPool with an error and/or unhealthy handler.
func NewConnectionPoolWithHandlers(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error) {
if config.Heartbeat == 0 || config.ConnectionTimeout == 0 {
return nil, errors.New("connectionpool heartbeat or connectiontimeout can't be 0")
}
Expand All @@ -77,6 +62,7 @@ func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(err
flaggedConnections: make(map[uint64]bool),
sleepOnErrorInterval: time.Duration(config.SleepOnErrorInterval) * time.Millisecond,
errorHandler: errorHandler,
unhealthyHandler: unhealthyHandler,
}

if ok := cp.initializeConnections(); !ok {
Expand Down Expand Up @@ -158,16 +144,19 @@ func (cp *ConnectionPool) verifyHealthyConnection(connHost *ConnectionHost) {

healthy := true
select {
case <-connHost.Errors:
case err := <-connHost.Errors:
healthy = false
if cp.unhealthyHandler != nil {
cp.unhealthyHandler(err)
}
default:
break
}

flagged := cp.isConnectionFlagged(connHost.ConnectionID)

// Between these three states we do our best to determine that a connection is dead in the various lifecycles.
if flagged || !healthy || connHost.Connection.IsClosed( /* atomic */ ) {
if flagged || !healthy || connHost.Connection.IsClosed( /* atomic */) {
cp.triggerConnectionRecovery(connHost)
}

Expand All @@ -178,7 +167,7 @@ func (cp *ConnectionPool) triggerConnectionRecovery(connHost *ConnectionHost) {

// InfiniteLoop: Stay here till we reconnect.
for {
ok := connHost.Connect()
ok := connHost.ConnectWithErrorHandler(cp.unhealthyHandler)
if !ok {
if cp.sleepOnErrorInterval > 0 {
time.Sleep(cp.sleepOnErrorInterval)
Expand Down
30 changes: 27 additions & 3 deletions v2/pkg/tcr/rabbitservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,35 @@ func NewRabbitService(
return nil, err
}

return NewRabbitServiceWithConnectionPool(connectionPool, config, passphrase, salt, processPublishReceipts, processError)
}

// NewRabbitServiceWithConnectionPool creates everything you need for a RabbitMQ communication service from a connection pool.
func NewRabbitServiceWithConnectionPool(
connectionPool *ConnectionPool,
config *RabbitSeasoning,
passphrase string,
salt string,
processPublishReceipts func(*PublishReceipt),
processError func(error)) (*RabbitService, error) {

publisher := NewPublisherFromConfig(config, connectionPool)
topologer := NewTopologer(connectionPool)
return NewRabbitServiceWithPublisher(publisher, config, passphrase, salt, processPublishReceipts, processError)
}

// NewRabbitServiceWithPublisher creates everything you need for a RabbitMQ communication service from a publisher.
func NewRabbitServiceWithPublisher(
publisher *Publisher,
config *RabbitSeasoning,
passphrase string,
salt string,
processPublishReceipts func(*PublishReceipt),
processError func(error)) (*RabbitService, error) {

topologer := NewTopologer(publisher.ConnectionPool)

rs := &RabbitService{
ConnectionPool: connectionPool,
ConnectionPool: publisher.ConnectionPool,
Config: config,
Publisher: publisher,
Topologer: topologer,
Expand All @@ -55,7 +79,7 @@ func NewRabbitService(
}

// Build a Map for Consumer retrieval.
err = rs.createConsumers(config.ConsumerConfigs)
err := rs.createConsumers(config.ConsumerConfigs)
if err != nil {
return nil, err
}
Expand Down