diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 33110c7cc4..49c47ef22b 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -16,9 +16,11 @@ import ( clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/runtime" ) +const maxRetryDelay = 5 * time.Minute + // Client keeps track of running kubernetes pods and services type Client interface { Stop() @@ -51,19 +53,45 @@ type client struct { replicationControllerStore *cache.StoreToReplicationControllerLister nodeStore *cache.StoreToNodeLister - podWatchesMutex sync.Mutex - podWatches []func(Event, Pod) + podWatchesMutex sync.Mutex + podWatches []func(Event, Pod) } // runReflectorUntil is equivalent to cache.Reflector.RunUntil, but it also logs -// errors, which cache.Reflector.RunUntil simply ignores +// errors (which cache.Reflector.RunUntil simply ignores) and backs off exponentially +// on errors. func runReflectorUntil(r *cache.Reflector, resyncPeriod time.Duration, stopCh <-chan struct{}) { - loggingListAndWatch := func() { - if err := r.ListAndWatch(stopCh); err != nil { + select { + case <-stopCh: + return + default: + } + + var err error + wait := resyncPeriod + for { + func() { + defer runtime.HandleCrash() + err = r.ListAndWatch(stopCh) + }() + + if err != nil { log.Errorf("Kubernetes reflector: %v", err) + wait *= 2 + if wait > maxRetryDelay { + wait = maxRetryDelay + } + } else { + wait = resyncPeriod + } + + + select { + case <-stopCh: + return + case <-time.After(wait): } } - go wait.Until(loggingListAndWatch, resyncPeriod, stopCh) } // ClientConfig establishes the configuration for the kubernetes client @@ -169,7 +197,7 @@ func (c *client) setupStore(kclient cache.Getter, resource string, itemType inte if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } - runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.resyncPeriod, c.quit) + go runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), c.resyncPeriod, c.quit) return store }