Skip to content

Commit

Permalink
Implemented (not optimal) discovery stopping #140
Browse files Browse the repository at this point in the history
  • Loading branch information
yyyar committed Jun 9, 2019
1 parent bce940a commit e04dd98
Showing 1 changed file with 65 additions and 8 deletions.
73 changes: 65 additions & 8 deletions src/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ type Discovery struct {
* Channel where to push newly discovered backends
*/
out chan ([]core.Backend)

/**
* Channel for stopping discovery
*/
stop chan bool
}

/**
Expand All @@ -91,53 +96,105 @@ func (this *Discovery) Start() {
log := logging.For("discovery")

this.out = make(chan []core.Backend)
this.stop = make(chan bool)

// Prepare interval
interval, err := time.ParseDuration(this.cfg.Interval)
if err != nil {
log.Fatal(err)
}

// TODO: rewrite with channels for stop
go func() {
for {
backends, err := this.fetch(this.cfg)

select {
case <-this.stop:
log.Info("Stopping discovery ", this.cfg)
return
default:
}

if err != nil {
log.Error(this.cfg.Kind, " error ", err, " retrying in ", this.opts.RetryWaitDuration.String())

log.Info("Applying failpolicy ", this.cfg.Failpolicy)

if this.cfg.Failpolicy == "setempty" {
this.backends = &[]core.Backend{}
this.out <- *this.backends
if !this.send() {
log.Info("Stopping discovery ", this.cfg)
return
}
}

if !this.wait(this.opts.RetryWaitDuration) {
log.Info("Stopping discovery ", this.cfg)
return
}

time.Sleep(this.opts.RetryWaitDuration)
continue
}

// cache
this.backends = backends

// out
this.out <- *this.backends
if !this.send() {
log.Info("Stopping discovery ", this.cfg)
return
}

// exit gorouting if no cacheTtl
// used for static discovery
if interval == 0 {
return
}

time.Sleep(interval)
if !this.wait(interval) {
log.Info("Stopping discovery ", this.cfg)
return
}
}
}()
}

func (this *Discovery) send() bool {
// out if not stopped
select {
case <-this.stop:
return false
default:
this.out <- *this.backends
return true
}
}

/**
* wait waits for interval or stop
* returns true if waiting was successfull
* return false if waiting was interrupted with stop
*/
func (this *Discovery) wait(interval time.Duration) bool {

t := time.NewTimer(interval)

select {
case <-t.C:
return true

case <-this.stop:
if !t.Stop() {
<-t.C
}
return false
}

}

/**
* Stop discovery
*/
func (this *Discovery) Stop() {
// TODO: Add stopping function
this.stop <- true
}

/**
Expand Down

0 comments on commit e04dd98

Please sign in to comment.