diff --git a/v2/pkg/tcr/connectionhost.go b/v2/pkg/tcr/connectionhost.go index 17d5a30..f8efe3e 100644 --- a/v2/pkg/tcr/connectionhost.go +++ b/v2/pkg/tcr/connectionhost.go @@ -55,6 +55,11 @@ func NewConnectionHost( // Connect tries to connect (or reconnect) to the provided properties of the host one time. func (ch *ConnectionHost) Connect() bool { + return ch.ConnectWithErrorHandler(nil) +} + +// ConnectWithErrorHandler tries to connect (or reconnect) to the provided properties of the host one time with an error handler. +func (ch *ConnectionHost) ConnectWithErrorHandler(errorHandler func(error)) bool { // Compare, Lock, Recompare Strategy if ch.Connection != nil && !ch.Connection.IsClosed() /* <- atomic */ { @@ -80,6 +85,9 @@ func (ch *ConnectionHost) Connect() bool { ch.tlsConfig.PEMCertLocation, ch.tlsConfig.LocalCertLocation) if err != nil { + if errorHandler != nil { + errorHandler(err) + } return false } } @@ -103,6 +111,9 @@ func (ch *ConnectionHost) Connect() bool { }) } if err != nil { + if errorHandler != nil { + errorHandler(err) + } return false } diff --git a/v2/pkg/tcr/connectionpool.go b/v2/pkg/tcr/connectionpool.go index bdf6cbf..54b3c72 100644 --- a/v2/pkg/tcr/connectionpool.go +++ b/v2/pkg/tcr/connectionpool.go @@ -23,41 +23,26 @@ type ConnectionPool struct { flaggedConnections map[uint64]bool sleepOnErrorInterval time.Duration errorHandler func(error) + unhealthyHandler func(error) } // NewConnectionPool creates hosting structure for the ConnectionPool. func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) { - - if config.Heartbeat == 0 || config.ConnectionTimeout == 0 { - return nil, errors.New("connectionpool heartbeat or connectiontimeout can't be 0") - } - - if config.MaxConnectionCount == 0 { - return nil, errors.New("connectionpool maxconnectioncount can't be 0") - } - - cp := &ConnectionPool{ - Config: *config, - uri: config.URI, - heartbeatInterval: time.Duration(config.Heartbeat) * time.Second, - connectionTimeout: time.Duration(config.ConnectionTimeout) * time.Second, - connections: queue.New(int64(config.MaxConnectionCount)), // possible overflow error - channels: make(chan *ChannelHost, config.MaxCacheChannelCount), - poolRWLock: &sync.RWMutex{}, - flaggedConnections: make(map[uint64]bool), - sleepOnErrorInterval: time.Duration(config.SleepOnErrorInterval) * time.Millisecond, - } - - if ok := cp.initializeConnections(); !ok { - return nil, errors.New("initialization failed during connection creation") - } - - return cp, nil + return NewConnectionPoolWithHandlers(config, nil, nil) } -// NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool. +// NewConnectionPoolWithErrorHandler creates hosting structure for the ConnectionPool with an error handler. func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(error)) (*ConnectionPool, error) { + return NewConnectionPoolWithHandlers(config, errorHandler, nil) +} +// NewConnectionPoolWithUnhealthyHandler creates hosting structure for the ConnectionPool with an unhealthy handler. +func NewConnectionPoolWithUnhealthyHandler(config *PoolConfig, unhealthyHandler func(error)) (*ConnectionPool, error) { + return NewConnectionPoolWithHandlers(config, nil, unhealthyHandler) +} + +// NewConnectionPoolWithHandlers creates hosting structure for the ConnectionPool with an error and/or unhealthy handler. +func NewConnectionPoolWithHandlers(config *PoolConfig, errorHandler func(error), unhealthyHandler func(error)) (*ConnectionPool, error) { if config.Heartbeat == 0 || config.ConnectionTimeout == 0 { return nil, errors.New("connectionpool heartbeat or connectiontimeout can't be 0") } @@ -77,6 +62,7 @@ func NewConnectionPoolWithErrorHandler(config *PoolConfig, errorHandler func(err flaggedConnections: make(map[uint64]bool), sleepOnErrorInterval: time.Duration(config.SleepOnErrorInterval) * time.Millisecond, errorHandler: errorHandler, + unhealthyHandler: unhealthyHandler, } if ok := cp.initializeConnections(); !ok { @@ -158,8 +144,11 @@ func (cp *ConnectionPool) verifyHealthyConnection(connHost *ConnectionHost) { healthy := true select { - case <-connHost.Errors: + case err := <-connHost.Errors: healthy = false + if cp.unhealthyHandler != nil { + cp.unhealthyHandler(err) + } default: break } @@ -167,7 +156,7 @@ func (cp *ConnectionPool) verifyHealthyConnection(connHost *ConnectionHost) { flagged := cp.isConnectionFlagged(connHost.ConnectionID) // Between these three states we do our best to determine that a connection is dead in the various lifecycles. - if flagged || !healthy || connHost.Connection.IsClosed( /* atomic */ ) { + if flagged || !healthy || connHost.Connection.IsClosed( /* atomic */) { cp.triggerConnectionRecovery(connHost) } @@ -178,7 +167,7 @@ func (cp *ConnectionPool) triggerConnectionRecovery(connHost *ConnectionHost) { // InfiniteLoop: Stay here till we reconnect. for { - ok := connHost.Connect() + ok := connHost.ConnectWithErrorHandler(cp.unhealthyHandler) if !ok { if cp.sleepOnErrorInterval > 0 { time.Sleep(cp.sleepOnErrorInterval) diff --git a/v2/pkg/tcr/rabbitservice.go b/v2/pkg/tcr/rabbitservice.go index c9aa5d8..a2dd284 100644 --- a/v2/pkg/tcr/rabbitservice.go +++ b/v2/pkg/tcr/rabbitservice.go @@ -39,11 +39,35 @@ func NewRabbitService( return nil, err } + return NewRabbitServiceWithConnectionPool(connectionPool, config, passphrase, salt, processPublishReceipts, processError) +} + +// NewRabbitServiceWithConnectionPool creates everything you need for a RabbitMQ communication service from a connection pool. +func NewRabbitServiceWithConnectionPool( + connectionPool *ConnectionPool, + config *RabbitSeasoning, + passphrase string, + salt string, + processPublishReceipts func(*PublishReceipt), + processError func(error)) (*RabbitService, error) { + publisher := NewPublisherFromConfig(config, connectionPool) - topologer := NewTopologer(connectionPool) + return NewRabbitServiceWithPublisher(publisher, config, passphrase, salt, processPublishReceipts, processError) +} + +// NewRabbitServiceWithPublisher creates everything you need for a RabbitMQ communication service from a publisher. +func NewRabbitServiceWithPublisher( + publisher *Publisher, + config *RabbitSeasoning, + passphrase string, + salt string, + processPublishReceipts func(*PublishReceipt), + processError func(error)) (*RabbitService, error) { + + topologer := NewTopologer(publisher.ConnectionPool) rs := &RabbitService{ - ConnectionPool: connectionPool, + ConnectionPool: publisher.ConnectionPool, Config: config, Publisher: publisher, Topologer: topologer, @@ -55,7 +79,7 @@ func NewRabbitService( } // Build a Map for Consumer retrieval. - err = rs.createConsumers(config.ConsumerConfigs) + err := rs.createConsumers(config.ConsumerConfigs) if err != nil { return nil, err }