Skip to content

Commit

Permalink
Retry reaching the server if there are transient errors
Browse files Browse the repository at this point in the history
Signed-off-by: Chetan Banavikalmutt <chetanrns1997@gmail.com>
  • Loading branch information
chetan-rns committed May 9, 2024
1 parent c129232 commit b3e1c67
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
34 changes: 24 additions & 10 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type WeightedSemaphore interface {
Release(n int64)
}

type ListRetryFunc func(err error) bool
type RetryFunc func(err error) bool

// NewClusterCache creates new instance of cluster cache
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
Expand Down Expand Up @@ -176,9 +176,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
log: log,
listRetryLimit: 1,
listRetryUseBackoff: false,
listRetryFunc: ListRetryFuncNever,
listRetryFunc: RetryFuncNever,
connectionStatus: ConnectionStatusUnknown,
watchFails: newWatchFailures(),
clusterStatusRetryFunc: RetryFuncNever,
clusterConnectionInterval: defaultClusterConnectionInterval,
}
for i := range opts {
Expand Down Expand Up @@ -208,6 +209,8 @@ type clusterCache struct {
// watchFails is used to keep track of the failures while watching resources.
watchFails *watchFailures

clusterStatusRetryFunc RetryFunc

apisMeta map[schema.GroupKind]*apiMeta
serverVersion string
apiResources []kube.APIResourceInfo
Expand All @@ -228,7 +231,7 @@ type clusterCache struct {
// retry options for list operations
listRetryLimit int32
listRetryUseBackoff bool
listRetryFunc ListRetryFunc
listRetryFunc RetryFunc

// lock is a rw lock which protects the fields of clusterInfo
lock sync.RWMutex
Expand Down Expand Up @@ -264,13 +267,13 @@ type clusterCacheSync struct {
resyncTimeout time.Duration
}

// ListRetryFuncNever never retries on errors
func ListRetryFuncNever(err error) bool {
// RetryFuncNever never retries on errors
func RetryFuncNever(err error) bool {
return false
}

// ListRetryFuncAlways always retries on errors
func ListRetryFuncAlways(err error) bool {
// RetryFuncAlways always retries on errors
func RetryFuncAlways(err error) bool {
return true
}

Expand Down Expand Up @@ -1248,6 +1251,10 @@ func (c *clusterCache) StartClusterConnectionStatusMonitoring(ctx context.Contex
}

func (c *clusterCache) clusterConnectionService(ctx context.Context) {
if c.clusterConnectionInterval <= 0 {
return
}

ticker := time.NewTicker(c.clusterConnectionInterval)
defer ticker.Stop()

Expand All @@ -1268,16 +1275,23 @@ func (c *clusterCache) clusterConnectionService(ctx context.Context) {
}

if watchErrors > 0 || watchesRecovered {
c.log.V(1).Info("verifying cluster connection", "watches", watchErrors)

_, err := c.kubectl.GetServerVersion(c.config)
c.log.V(1).Info("verifying cluster connection", "server", c.config.Host)
// Retry fetching the server version to avoid invalidating the cache due to transient errors.
err := retry.OnError(retry.DefaultBackoff, c.clusterStatusRetryFunc, func() error {
_, err := c.kubectl.GetServerVersion(c.config)
if err != nil && c.clusterStatusRetryFunc(err) {
c.log.V(1).Info("Error while fetching server version", "error", err.Error())
}
return err
})
if err != nil {
c.updateConnectionStatus(ConnectionStatusFailed)
} else {
c.updateConnectionStatus(ConnectionStatusSuccessful)
}
}
case <-ctx.Done():
c.log.V(1).Info("Stopping cluster connection status monitoring", "server", c.config.Host)
ticker.Stop()
return
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/cache/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func SetTracer(tracer tracing.Tracer) UpdateSettingsFunc {
}

// SetRetryOptions sets cluster list retry options
func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc) UpdateSettingsFunc {
func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc RetryFunc) UpdateSettingsFunc {
return func(cache *clusterCache) {
// Max retries must be at least one
if maxRetries < 1 {
Expand Down Expand Up @@ -177,3 +177,10 @@ func SetClusterConnectionInterval(interval time.Duration) UpdateSettingsFunc {
cache.clusterConnectionInterval = interval
}
}

// SetClusterStatusRetryFunc sets the retry function for monitoring the cluster connection status.
func SetClusterStatusRetryFunc(retryFunc RetryFunc) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.clusterStatusRetryFunc = retryFunc
}
}

0 comments on commit b3e1c67

Please sign in to comment.