Skip to content

Commit

Permalink
Moved watch logic to the right place
Browse files Browse the repository at this point in the history
  • Loading branch information
pierresouchay committed Feb 6, 2018
1 parent ba75e2f commit 5f87db9
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions watch/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (v *View) DataAndLastIndex() (interface{}, uint64) {
func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
var retries int
for {
start := time.Now()
doneCh := make(chan struct{}, 1)
successCh := make(chan struct{}, 1)
fetchErrCh := make(chan error, 1)
Expand All @@ -151,13 +150,7 @@ func (v *View) poll(viewCh chan<- *View, errCh chan<- error) {
if v.once {
return
}
elapsed := time.Since(start)
if v.rateLimitFunc != nil {
wait, sleep := v.rateLimitFunc(elapsed)
if wait {
<-time.After(sleep)
}
}

case <-successCh:
// We successfully received a non-error response from the server. This
// does not mean we have data (that's dataCh's job), but rather this
Expand Down Expand Up @@ -221,6 +214,7 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
return
default:
}
start := time.Now()

data, rm, err := v.dependency.Fetch(v.clients, &dep.QueryOptions{
AllowStale: allowStale,
Expand Down Expand Up @@ -262,6 +256,15 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
allowStale = true
}

if v.rateLimitFunc != nil {
elapsed := time.Since(start)
wait, sleep := v.rateLimitFunc(elapsed)
if wait {
log.Printf("[TRACE] (view) %s %s elapsed, sleeping for %s", v.dependency, elapsed, sleep)
<-time.After(sleep)
}
}

if rm.LastIndex == v.lastIndex {
log.Printf("[TRACE] (view) %s no new data (index was the same)", v.dependency)
continue
Expand Down

0 comments on commit 5f87db9

Please sign in to comment.