feat: Drop unnecessary listing for the sake of watch reinitialization #616

Open · wants to merge 1 commit into master
33 changes: 17 additions & 16 deletions pkg/cache/cluster.go
@@ -191,7 +191,7 @@ type clusterCache struct {
 	// namespacedResources is a simple map which indicates a groupKind is namespaced
 	namespacedResources map[schema.GroupKind]bool
 
-	// maximum time we allow watches to run before relisting the group/kind and restarting the watch
+	// maximum time we allow watches to run before restarting them
 	watchResyncTimeout time.Duration
 	// sync retry timeout for cluster when sync error happens
 	clusterSyncRetryTimeout time.Duration
@@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
 }
 
 func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
+	timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
 	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
 		defer func() {
 			if r := recover(); r != nil {
@@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				options.TimeoutSeconds = &timeoutSeconds
 				res, err := resClient.Watch(ctx, options)
 				if errors.IsNotFound(err) {
 					c.stopWatching(api.GroupKind, ns)
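The two added lines above replace the client-side resync timer with a server-side watch timeout: `timeoutSeconds` is derived from `watchResyncTimeout`, and passing it as `ListOptions.TimeoutSeconds` asks the kube-apiserver to close the watch stream after roughly that many seconds, after which the retry watcher reconnects from the last seen resourceVersion instead of forcing a relist. Below is a minimal, self-contained sketch of that mechanism using the dynamic client directly; the in-cluster config, the `default` namespace, and the core Pods resource are illustrative assumptions, not part of this PR.

```go
package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	pods := schema.GroupVersionResource{Version: "v1", Resource: "pods"}

	// Ask the API server to close the watch stream after roughly five minutes,
	// mirroring how the PR derives timeoutSeconds from watchResyncTimeout.
	timeoutSeconds := int64((5 * time.Minute).Seconds())

	w, err := client.Resource(pods).Namespace("default").Watch(context.TODO(), metav1.ListOptions{
		TimeoutSeconds: &timeoutSeconds,
	})
	if err != nil {
		panic(err)
	}
	defer w.Stop()

	// When the server-side timeout elapses the result channel is closed; a wrapper
	// like RetryWatcher then re-establishes the watch from the last observed RV.
	for event := range w.ResultChan() {
		fmt.Printf("%s %v\n", event.Type, event.Object.GetObjectKind().GroupVersionKind())
	}
}
```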
@@ -633,30 +635,21 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			return err
 		}
 
-		defer func() {
-			w.Stop()
-			resourceVersion = ""
-		}()
-
-		var watchResyncTimeoutCh <-chan time.Time
-		if c.watchResyncTimeout > 0 {
-			shouldResync := time.NewTimer(c.watchResyncTimeout)
-			defer shouldResync.Stop()
-			watchResyncTimeoutCh = shouldResync.C
-		}
+		defer w.Stop()
 
 		for {
 			select {
 			// stop watching when parent context got cancelled
 			case <-ctx.Done():
 				return nil
 
-			// re-synchronize API state and restart watch periodically
-			case <-watchResyncTimeoutCh:
-				return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
-
 			// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
 			case <-w.Done():
+				// The underlying retry watcher has stopped, possibly due to specifying an RV in
+				// the watch request that is stale (error code 410). This forces us to relist
+				// objects from the kube-apiserver to get a fresher RV and we invoke that relist
+				// by resetting the locally stored RV.
+				resourceVersion = ""
Review thread on the resourceVersion reset above:

Member: Maybe see if we can get away without this? If we need it, we'll probably want to add a comment explaining exactly why we needed it.

Author: We can't drop it. I put an appropriate comment in the code to explain the rationale behind resetting resourceVersion. This is the place where the retry watcher exits early and we are forced to relist in order to get a fresher RV from the server: https://github.com/kubernetes/kubernetes/blob/0fc167103189a4d462c3cc7a17b360c3d998f4bf/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go#L208-L211

Follow-up: +1, this has to remain here. This should generally be rare, so a relist from etcd is fine in those situations.
return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host)

case event, ok := <-w.ResultChan():
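Taken together, this hunk makes a relist happen only when the locally held resourceVersion is cleared: the function retried by kube.RetryUntilSucceed lists again only when it has no RV to resume from, and the timer-driven periodic relist is gone. The sketch below shows that fallback pattern in isolation; it is not the clusterCache code, and the `listLatestResourceVersion` helper and the surrounding retry loop are illustrative stand-ins.

```go
package sketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
	watchutil "k8s.io/client-go/tools/watch"
)

// listLatestResourceVersion is a hypothetical helper: it lists the resource once and
// returns the collection resourceVersion that a subsequent watch can resume from.
func listLatestResourceVersion(ctx context.Context, res dynamic.ResourceInterface) (string, error) {
	list, err := res.List(ctx, metav1.ListOptions{})
	if err != nil {
		return "", err
	}
	return list.GetResourceVersion(), nil
}

// watchWithRelistFallback keeps a watch running and relists only when the stored
// resourceVersion had to be cleared, e.g. after the retry watcher gave up on a stale RV.
func watchWithRelistFallback(ctx context.Context, res dynamic.ResourceInterface) error {
	resourceVersion := ""
	for ctx.Err() == nil {
		if resourceVersion == "" {
			rv, err := listLatestResourceVersion(ctx, res)
			if err != nil {
				time.Sleep(time.Second) // crude backoff, enough for a sketch
				continue
			}
			resourceVersion = rv
		}

		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return res.Watch(ctx, options)
			},
		})
		if err != nil {
			// NewRetryWatcher rejects an empty or "0" RV; force a relist and try again.
			resourceVersion = ""
			continue
		}

	events:
		for {
			select {
			case <-ctx.Done():
				w.Stop()
				return nil
			case <-w.Done():
				// The retry watcher stopped on its own, typically because the RV it was
				// resuming from is no longer available (410 Gone). Clearing the stored RV
				// makes the next loop iteration relist to obtain a fresh one.
				resourceVersion = ""
				break events
			case event, ok := <-w.ResultChan():
				if !ok {
					break events
				}
				if obj, ok := event.Object.(metav1.Object); ok {
					// Every decoded event advances the RV we would resume from.
					resourceVersion = obj.GetResourceVersion()
				}
				fmt.Println("received event:", event.Type)
			}
		}
		w.Stop()
	}
	return nil
}
```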
@@ -666,8 +659,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 				obj, ok := event.Object.(*unstructured.Unstructured)
 				if !ok {
+					// We failed to cast the object received in the watch event to something
+					// that contains a resource version field. Because of that, we don't know
+					// from what RV we should reinitialize the watch connection, so in order to
+					// avoid any inconsistencies due to accidental skipping of a potential RV,
+					// we reset the locally stored RV to forcefully invoke the list API call to
+					// get it from the kube-apiserver.
+					resourceVersion = ""
Review thread on the resourceVersion reset above:

Member: I'd like to see if we can get away without resetting this and using whatever resourceVersion we already had. If we find we get into some nasty loop, we can always re-add it.

Author: We can't drop resetting the locally held RV here either; I put the rationale for it in a code comment here as well.
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
}
resourceVersion = obj.GetResourceVersion()

c.processEvent(event.Type, obj)
if kube.IsCRD(obj) {
Expand Down
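This last hunk is what makes restarting without a list safe in the common case: every event that decodes into `*unstructured.Unstructured` advances the locally stored RV, while an event that cannot be decoded leaves no trustworthy resume point, so the RV is cleared and the next watch setup lists first. A tiny illustrative helper (not part of the PR) that captures just that decision:

```go
package sketch

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
)

// resumePointAfter returns the resourceVersion a watch should resume from after
// handling one event. An empty string signals that the caller must relist, because
// the event's object did not decode into something carrying a resourceVersion.
func resumePointAfter(event watch.Event) (string, error) {
	obj, ok := event.Object.(*unstructured.Unstructured)
	if !ok {
		// Watch error events, for example, carry a *metav1.Status rather than the
		// resource itself, so there is no RV here to resume from safely.
		return "", fmt.Errorf("failed to convert to *unstructured.Unstructured: %v", event.Object)
	}
	// A successfully decoded event carries the RV at which it was produced.
	return obj.GetResourceVersion(), nil
}
```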