Skip to content

Commit

Permalink
Implement fallback for consistent reads from cache
Browse files Browse the repository at this point in the history
Kubernetes-commit: 35962561e44425fe5e23f19aeccba9269fab3a56
  • Loading branch information
serathius authored and k8s-publishing-bot committed Jul 30, 2024
1 parent e749b34 commit dbb9b30
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 1 deletion.
18 changes: 17 additions & 1 deletion pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
preparedKey += "/"
}
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
if consistentRead {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
Expand Down Expand Up @@ -887,9 +888,24 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}

objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
success = "false"
}
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
return err
}
if consistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
// store pointer of eligible objects,
// Why not directly put object in the items of listObj?
Expand Down
136 changes: 136 additions & 0 deletions pkg/storage/cacher/cacher_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
goruntime "runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand All @@ -45,10 +46,13 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
k8smetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -288,6 +292,138 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
}
}

func TestConsistentReadFallback(t *testing.T) {
tcs := []struct {
name string
consistentReadsEnabled bool
watchCacheRV string
storageRV string
fallbackError bool

expectError bool
expectRV string
expectBlock bool
expectRequestsToStorage int
expectMetric string
}{
{
name: "Success",
consistentReadsEnabled: true,
watchCacheRV: "42",
storageRV: "42",
expectRV: "42",
expectRequestsToStorage: 1,
expectMetric: `
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
# TYPE apiserver_watch_cache_consistent_read_total counter
apiserver_watch_cache_consistent_read_total{fallback="false", resource="pods", success="true"} 1
`,
},
{
name: "Fallback",
consistentReadsEnabled: true,
watchCacheRV: "2",
storageRV: "42",
expectRV: "42",
expectBlock: true,
expectRequestsToStorage: 2,
expectMetric: `
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
# TYPE apiserver_watch_cache_consistent_read_total counter
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="true"} 1
`,
},
{
name: "Fallback Failure",
consistentReadsEnabled: true,
watchCacheRV: "2",
storageRV: "42",
fallbackError: true,
expectError: true,
expectBlock: true,
expectRequestsToStorage: 2,
expectMetric: `
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
# TYPE apiserver_watch_cache_consistent_read_total counter
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="false"} 1
`,
},
{
name: "Disabled",
watchCacheRV: "2",
storageRV: "42",
expectRV: "42",
expectRequestsToStorage: 1,
expectMetric: ``,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.consistentReadsEnabled)
if tc.consistentReadsEnabled {
forceRequestWatchProgressSupport(t)
}

registry := k8smetrics.NewKubeRegistry()
metrics.ConsistentReadTotal.Reset()
if err := registry.Register(metrics.ConsistentReadTotal); err != nil {
t.Errorf("unexpected error: %v", err)
}
backingStorage := &dummyStorage{}
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ResourceVersion = tc.watchCacheRV
return nil
}
// TODO: Use fake clock for this test to reduce execution time.
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()

if fmt.Sprintf("%d", cacher.watchCache.resourceVersion) != tc.watchCacheRV {
t.Fatalf("Expected watch cache RV to equal watchCacheRV, got: %d, want: %s", cacher.watchCache.resourceVersion, tc.watchCacheRV)
}
requestToStorageCount := 0
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
requestToStorageCount += 1
podList := listObj.(*example.PodList)
if key == cacher.resourcePrefix {
podList.ResourceVersion = tc.storageRV
return nil
}
if tc.fallbackError {
return errDummy
}
podList.ResourceVersion = tc.storageRV
return nil
}
result := &example.PodList{}
start := cacher.clock.Now()
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
duration := cacher.clock.Since(start)
if (err != nil) != tc.expectError {
t.Fatalf("Unexpected error err: %v", err)
}
if result.ResourceVersion != tc.expectRV {
t.Fatalf("Unexpected List response RV, got: %q, want: %q", result.ResourceVersion, tc.expectRV)
}
if requestToStorageCount != tc.expectRequestsToStorage {
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", requestToStorageCount, tc.expectRequestsToStorage)
}
blocked := duration >= blockTimeout
if blocked != tc.expectBlock {
t.Fatalf("Unexpected block, got: %v, want: %v", blocked, tc.expectBlock)
}

if err := testutil.GatherAndCompare(registry, strings.NewReader(tc.expectMetric), "apiserver_watch_cache_consistent_read_total"); err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}

func TestGetListNonRecursiveCacheBypass(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
backingStorage := &dummyStorage{}
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/cacher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ var (
StabilityLevel: compbasemetrics.ALPHA,
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3},
}, []string{"resource"})

ConsistentReadTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "consistent_read_total",
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
)

var registerMetrics sync.Once
Expand All @@ -188,6 +197,7 @@ func Register() {
legacyregistry.MustRegister(WatchCacheCapacity)
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
})
}

Expand Down

0 comments on commit dbb9b30

Please sign in to comment.