Skip to content

Commit

Permalink
feat: add callbacks on the event when discovery completed
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Selivanov committed May 11, 2023
1 parent 1d90652 commit b296555
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions services_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
type IServicesPool interface {
// Start run service pool discovering
// and healthchecks loops
Start(healthchecks bool, callback func(srv service.IService) error)
Start(healthchecks bool, onNewDiscCallback func(srv service.IService) error, onDiscCompletedCallback func())

// DiscoverServices discover all visible active
// services via service-discovery
DiscoverServices(callback func(srv service.IService) error) error
DiscoverServices(onNewDiscCallback func(srv service.IService) error, onDiscCompletedCallback func()) error

// NextService returns next active service
// to take a connection
Expand Down Expand Up @@ -88,8 +88,8 @@ func NewServicesPool(opts *ServicesPoolsOpts) IServicesPool {

// Start run service pool discovering
// and healthchecks loops
func (p *ServicesPool) Start(healthchecks bool, callback func(srv service.IService) error) {
go p.discoverServicesLoop(callback)
func (p *ServicesPool) Start(healthchecks bool, onNewDiscCallback func(srv service.IService) error, onDiscCompletedCallback func()) {
go p.discoverServicesLoop(onNewDiscCallback, onDiscCompletedCallback)

if healthchecks {
go p.list.HealthChecksLoop()
Expand All @@ -98,7 +98,7 @@ func (p *ServicesPool) Start(healthchecks bool, callback func(srv service.IServi

// DiscoverServices discover all visible active
// services via service-discovery
func (p *ServicesPool) DiscoverServices(callback func(srv service.IService) error) error {
func (p *ServicesPool) DiscoverServices(onNewDiscCallback func(srv service.IService) error, onDiscCompletedCallback func()) error {
newServices, err := p.discovery.Discover(p.name)
if err != nil {
return fmt.Errorf("error discovering %s active: %w", p.name, err)
Expand All @@ -123,12 +123,14 @@ func (p *ServicesPool) DiscoverServices(callback func(srv service.IService) erro

p.list.Add(mutatedService)

if callback != nil {
if err := callback(mutatedService); err != nil {
if onNewDiscCallback != nil {
if err := onNewDiscCallback(mutatedService); err != nil {
logger.Log().Warn(fmt.Sprintf("callback on new discovered service: %s", err))
}
}
}

onDiscCompletedCallback()
return nil
}

Expand Down Expand Up @@ -158,7 +160,7 @@ func (p *ServicesPool) Close() {

// discoverServicesLoop spawn discovery for
// services periodically
func (p *ServicesPool) discoverServicesLoop(callback func(srv service.IService) error) {
func (p *ServicesPool) discoverServicesLoop(onNewDiscCallback func(srv service.IService) error, onDiscCompletedCallback func()) {
logger.Log().Info("start discovery loop")

onceShuffled := false
Expand All @@ -168,7 +170,7 @@ func (p *ServicesPool) discoverServicesLoop(callback func(srv service.IService)
logger.Log().Warn("Stop discovery loop")
return
default:
if err := p.DiscoverServices(callback); err != nil {
if err := p.DiscoverServices(onNewDiscCallback, onDiscCompletedCallback); err != nil {
logger.Log().Warn(fmt.Errorf("error discovery services: %w", err).Error())
}

Expand Down

0 comments on commit b296555

Please sign in to comment.