Skip to content

Commit

Permalink
Serve watch without resourceVersion from cache and introduce a WatchF…
Browse files Browse the repository at this point in the history
…romStorageWithoutResourceVersion feature gate to allow serving watch from storage.

Kubernetes-commit: 0130072b053f85fb736c24d34552208cdd1bccfe
  • Loading branch information
serathius authored and k8s-publishing-bot committed Mar 14, 2024
1 parent 7661bd2 commit 13a815b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 17 deletions.
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ const (
// Enables support for watch bookmark events.
WatchBookmark featuregate.Feature = "WatchBookmark"

// owner: @serathius
// beta: 1.30
// Enables watches without resourceVersion to be served from storage.
// Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue.
WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion"

// owner: @vinaykul
// kep: http://kep.k8s.io/1287
// alpha: v1.27
Expand Down Expand Up @@ -349,6 +355,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},

WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},

InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},

WatchList: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
opts.SendInitialEvents = nil
}
// TODO: we should eventually get rid of this legacy case
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
Expand Down Expand Up @@ -1282,12 +1282,14 @@ func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion
//
// if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
// if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
//
// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks
func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
if len(opts.ResourceVersion) != 0 {
return parsedWatchResourceVersion, nil
}
// legacy case
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return 0, nil
}
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
return rv, err
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/storage/cacher/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
}

func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
t.Run("WatchFromStorageWithoutResourceVersion=true", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
})
t.Run("WatchFromStorageWithoutResourceVersion=false", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
})
}

func TestWatchSemanticInitialEventsExtended(t *testing.T) {
Expand Down
44 changes: 33 additions & 11 deletions pkg/storage/cacher/cacher_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,6 @@ func TestWatchCacheBypass(t *testing.T) {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
Expand All @@ -348,12 +346,32 @@ func TestWatchCacheBypass(t *testing.T) {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}

// With unset RV, check if cacher is bypassed.
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != errDummy {
t.Errorf("Watch with unset RV should bypass cacher: %v", err)
if err != nil {
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
}

defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if err != nil {
t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err)
}

// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.injectError(errDummy)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: storage.Everything,
})
if !errors.Is(err, errDummy) {
t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err)
}
}

Expand Down Expand Up @@ -2032,9 +2050,11 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
// | Unset | true/false | nil/true/false |
// +-----------------+---------------------+-----------------------+
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, ""),
expectedWatchResourceVersion: 100,
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
opts: listOptions(true, nil, ""),
// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
expectedWatchResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
Expand All @@ -2047,9 +2067,11 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
expectedWatchResourceVersion: 100,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, ""),
expectedWatchResourceVersion: 100,
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
opts: listOptions(false, nil, ""),
// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
expectedWatchResourceVersion: 0,
},
{
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy",
Expand Down

0 comments on commit 13a815b

Please sign in to comment.