Skip to content

Commit

Permalink
Back off upon errored kubernetes api requests
Browse files Browse the repository at this point in the history
fixes #1009
  • Loading branch information
rndstr committed Jun 5, 2017
1 parent 06bb515 commit 6a2dd58
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/runtime"
)

const maxRetryDelay = 5 * time.Minute

// Client keeps track of running kubernetes pods and services
type Client interface {
Stop()
Expand Down Expand Up @@ -51,19 +53,45 @@ type client struct {
replicationControllerStore *cache.StoreToReplicationControllerLister
nodeStore *cache.StoreToNodeLister

podWatchesMutex sync.Mutex
podWatches []func(Event, Pod)
podWatchesMutex sync.Mutex
podWatches []func(Event, Pod)
}

// runReflectorUntil is equivalent to cache.Reflector.RunUntil, but it also logs
// errors, which cache.Reflector.RunUntil simply ignores
// errors (which cache.Reflector.RunUntil simply ignores) and backs off exponentially
// on errors.
func runReflectorUntil(r *cache.Reflector, resyncPeriod time.Duration, stopCh <-chan struct{}) {
loggingListAndWatch := func() {
if err := r.ListAndWatch(stopCh); err != nil {
select {
case <-stopCh:
return
default:
}

var err error
wait := resyncPeriod
for {
func() {
defer runtime.HandleCrash()
err = r.ListAndWatch(stopCh)
}()

if err != nil {
log.Errorf("Kubernetes reflector: %v", err)
wait *= 2
if wait > maxRetryDelay {
wait = maxRetryDelay
}
} else {
wait = resyncPeriod
}


select {
case <-stopCh:
return
case <-time.After(wait):
}
}
go wait.Until(loggingListAndWatch, resyncPeriod, stopCh)
}

// ClientConfig establishes the configuration for the kubernetes client
Expand Down Expand Up @@ -169,7 +197,7 @@ func (c *client) setupStore(kclient cache.Getter, resource string, itemType inte
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.resyncPeriod, c.quit)
go runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.resyncPeriod, c.quit)
return store
}

Expand Down

0 comments on commit 6a2dd58

Please sign in to comment.