Skip to content

Commit

Permalink
Merge pull request #27 from gateway-fm/fix/call-srv-close-when-remove…
Browse files Browse the repository at this point in the history
…d-from-list

Fix/call srv close when removed from list
  • Loading branch information
dmitriyselivanov authored Jun 26, 2023
2 parents 409e723 + 67104a3 commit 64332bd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
6 changes: 6 additions & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type IService interface {
NodeName() string

Tags() map[string]struct{}

Close() error
}

// TODO split address field to host and port
Expand Down Expand Up @@ -83,6 +85,10 @@ func (n *BaseService) Tags() map[string]struct{} {
return n.tags
}

func (n *BaseService) Close() error {
return nil
}

// generateServiceID create BaseService unique id by
// hashing given address string
func generateServiceID(addr string) string {
Expand Down
8 changes: 8 additions & 0 deletions services_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func (l *ServicesList) RemoveFromHealthyByIndex(i int) {
l.mu.Lock()
defer l.mu.Unlock()

if err := l.healthy[i].Close(); err != nil {
logger.Log().Warn(fmt.Errorf("unexpected error during service Close(): %w", err).Error())
}

l.healthy = append(l.healthy[:i], l.healthy[i+1:]...)
}

Expand All @@ -329,6 +333,10 @@ func (l *ServicesList) RemoveFromJail(srv service.IService) {
defer l.mu.Unlock()
l.mu.Lock()

if err := srv.Close(); err != nil {
logger.Log().Warn(fmt.Errorf("unexpected error during service Close(): %w", err).Error())
}

delete(l.jail, srv.ID())
}

Expand Down
40 changes: 31 additions & 9 deletions services_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type IServicesPool interface {
SetOnDiscRemoveCallback(f ServiceCallback)

SetOnDiscCompletedCallback(f func())

SetMutationNeededCallback(f ServiceCallbackB)
}

// ServicesPool holds information about reachable
Expand All @@ -62,6 +64,8 @@ type ServicesPool struct {
onDiscRemoveCallback ServiceCallback

onDiscCompletedCallback func()

mutationNeededCallback ServiceCallbackB
}

// ServicesPoolsOpts is options that needs
Expand All @@ -79,6 +83,7 @@ type ServicesPoolsOpts struct {

type ServiceCallbackE func(srv service.IService) error
type ServiceCallback func(srv service.IService)
type ServiceCallbackB func(srv service.IService) bool

// NewServicesPool create new Services Pool
// based on given params
Expand Down Expand Up @@ -162,19 +167,28 @@ func (p *ServicesPool) DiscoverServices() error {
continue
}

mutatedService, err := p.MutationFnc(newService)
if err != nil {
logger.Log().Warn(fmt.Sprintf("mutate new discovered service: %s", err))
continue
}
// if service doesn't exist in pool or if the callback returns true --
// then we do a mutation.
// otherwise we prefer not to mutate srv to prevent spawning unnecessary goroutines
isServiceExists := p.list.IsServiceExists(newService)
weNeedToMutate := !isServiceExists || (p.mutationNeededCallback != nil && p.mutationNeededCallback(newService))
var mutatedService service.IService

if weNeedToMutate {
mutatedService, err = p.MutationFnc(newService)
if err != nil {
logger.Log().Warn(fmt.Sprintf("mutate new discovered service: %s", err))
continue
}

if p.onNewDiscCallback != nil {
if err := p.onNewDiscCallback(mutatedService); err != nil {
logger.Log().Warn(fmt.Sprintf("callback on new discovered service: %s", err))
if p.onNewDiscCallback != nil {
if err := p.onNewDiscCallback(mutatedService); err != nil {
logger.Log().Warn(fmt.Sprintf("callback on new discovered service: %s", err))
}
}
}

if p.list.IsServiceExists(newService) {
if isServiceExists {
continue
}
p.list.Add(mutatedService)
Expand Down Expand Up @@ -230,6 +244,14 @@ func (p *ServicesPool) SetOnDiscRemoveCallback(f ServiceCallback) {
p.onDiscRemoveCallback = f
}

func (p *ServicesPool) SetMutationNeededCallback(f ServiceCallbackB) {
if p == nil {
return
}

p.mutationNeededCallback = f
}

// discoverServicesLoop spawn discovery for
// services periodically
func (p *ServicesPool) discoverServicesLoop() {
Expand Down

0 comments on commit 64332bd

Please sign in to comment.