Remove refetching from resourceWatcher (#14262)
The resourceWatcher is meant to be a long-lived way for a component
to receive events about a particular resource from an upstream cache.
However, a refetching mechanism caused a healthy, subscribed watcher
to be closed **every 10 minutes**, at which point the resourceWatcher
fetched all of the resource types it was watching from the upstream
cache and created a new watcher. This put unneeded load on the
upstream cache and wasted network bandwidth.
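
For illustration, the sketch below mirrors the shape of the removed mechanism; it is not the Teleport source. `watchOnce`, `eventsC`, and the short durations are invented so the example runs in milliseconds rather than minutes. The point is the `refetchC` timer, which turns an otherwise healthy watch into a scheduled teardown-and-refetch cycle.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchOnce waits for events until the context is cancelled or the refetch
// timer fires. Returning nil on the timer tick is what made the caller tear
// down a perfectly healthy watcher, refetch everything, and start over.
func watchOnce(ctx context.Context, eventsC <-chan string, refetchPeriod time.Duration) error {
	refetchC := time.After(refetchPeriod) // mirrors the refetchC timer removed by this commit
	for {
		select {
		case <-refetchC:
			return nil // "healthy" exit: the caller restarts and refetches anyway
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-eventsC:
			fmt.Println("event:", ev)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	eventsC := make(chan string)
	for {
		// Each iteration simulates closing the watcher, re-reading all watched
		// resources from the upstream cache, and opening a new watcher.
		if err := watchOnce(ctx, eventsC, 100*time.Millisecond); err != nil {
			fmt.Println("stopping:", err)
			return
		}
		fmt.Println("refetch tick: closing healthy watcher and starting a new one")
	}
}
```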

This removes the refetching behavior entirely to ensure watchers
aren't unnecessarily closed. The change should be transparent to
users of the resourceWatcher, but should noticeably reduce both
the number of init events being emitted throughout a cluster
and the number of cache reads.
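
For contrast, here is a minimal sketch of the loop shape after the change. Again, this is not the Teleport source: the doubling backoff, `watchOnce`, and the durations are assumptions; only the `MaxRetryPeriod` cap and the "Restart watch on error" behavior come from the diff below. The watcher is torn down only on a real failure or on shutdown, never on a schedule.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWatchLoop restarts the watcher only when it actually fails; a healthy
// watcher is left alone until the context is closed, so no init events or
// cache refetches happen on a fixed schedule.
func runWatchLoop(ctx context.Context, watchOnce func(context.Context) error, maxRetryPeriod time.Duration) {
	backoff := 50 * time.Millisecond
	for {
		err := watchOnce(ctx)
		if ctx.Err() != nil {
			return // context closed: stop for good
		}
		if err != nil {
			fmt.Println("restart watch on error:", err)
		}
		// Back off before reconnecting, capped at maxRetryPeriod.
		backoff *= 2
		if backoff > maxRetryPeriod {
			backoff = maxRetryPeriod
		}
		time.Sleep(backoff)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
	defer cancel()

	attempts := 0
	runWatchLoop(ctx, func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("upstream connection dropped") // simulated failure
		}
		<-ctx.Done() // healthy watcher: block until shutdown, never refetch
		return ctx.Err()
	}, 200*time.Millisecond)
	fmt.Println("watch loop finished after", attempts, "attempts")
}
```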

Fixes #14234
rosstimothy authored Jul 11, 2022
1 parent 39214dd commit dea633f
Showing 1 changed file with 2 additions and 16 deletions.
18 changes: 2 additions & 16 deletions lib/services/watcher.go
@@ -55,9 +55,6 @@ type ResourceWatcherConfig struct {
 	Log logrus.FieldLogger
 	// MaxRetryPeriod is the maximum retry period on failed watchers.
 	MaxRetryPeriod time.Duration
-	// RefetchPeriod is a period after which to explicitly refetch the resources.
-	// It is to protect against unexpected cache syncing issues.
-	RefetchPeriod time.Duration
 	// Clock is used to control time.
 	Clock clockwork.Clock
 	// Client is used to create new watchers.
@@ -80,9 +77,6 @@ func (cfg *ResourceWatcherConfig) CheckAndSetDefaults() error {
 	if cfg.MaxRetryPeriod == 0 {
 		cfg.MaxRetryPeriod = defaults.MaxWatcherBackoff
 	}
-	if cfg.RefetchPeriod == 0 {
-		cfg.RefetchPeriod = defaults.LowResPollingPeriod
-	}
 	if cfg.Clock == nil {
 		cfg.Clock = clockwork.NewRealClock()
 	}
@@ -217,10 +211,7 @@ func (p *resourceWatcher) runWatchLoop() {
 		}
 		if err != nil {
 			p.Log.Warningf("Restart watch on error: %v.", err)
-		} else {
-			p.Log.Debug("Triggering scheduled refetch.")
 		}
-
 	}
 }

@@ -236,7 +227,6 @@ func (p *resourceWatcher) watch() error {
 		return trace.Wrap(err)
 	}
 	defer watcher.Close()
-	refetchC := time.After(p.RefetchPeriod)
 
 	// before fetch, make sure watcher is synced by receiving init event,
 	// to avoid the scenario:
@@ -254,9 +244,7 @@
 	// by receiving init event first.
 	select {
 	case <-watcher.Done():
-		return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
-	case <-refetchC:
-		return nil
+		return trace.ConnectionProblem(watcher.Error(), "watcher is closed: %v", watcher.Error())
 	case <-p.ctx.Done():
 		return trace.ConnectionProblem(p.ctx.Err(), "context is closing")
 	case event := <-watcher.Events():
@@ -274,9 +262,7 @@
 	for {
 		select {
 		case <-watcher.Done():
-			return trace.ConnectionProblem(watcher.Error(), "watcher is closed")
-		case <-refetchC:
-			return nil
+			return trace.ConnectionProblem(watcher.Error(), "watcher is closed: %v", watcher.Error())
 		case <-p.ctx.Done():
 			return trace.ConnectionProblem(p.ctx.Err(), "context is closing")
 		case event := <-watcher.Events():