From 36f560c86ef2efcc5b649a67af23c15200a48c3b Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Wed, 29 Mar 2023 16:45:00 -0400 Subject: [PATCH 1/7] Use SyncTargets cache in resource controller Signed-off-by: Nolan Brubaker --- .../workload/resource/resource_controller.go | 22 ++++++++++++------- tmc/pkg/server/controllers.go | 1 + 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/reconciler/workload/resource/resource_controller.go b/pkg/reconciler/workload/resource/resource_controller.go index 29522908d92..df069ed6b44 100644 --- a/pkg/reconciler/workload/resource/resource_controller.go +++ b/pkg/reconciler/workload/resource/resource_controller.go @@ -64,7 +64,7 @@ const ( func NewController( dynamicClusterClient kcpdynamic.ClusterInterface, ddsif *informer.DiscoveringDynamicSharedInformerFactory, - syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, + syncTargetInformer, globalSyncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, namespaceInformer kcpcorev1informers.NamespaceClusterInformer, placementInformer schedulingv1alpha1informers.PlacementClusterInformer, ) (*Controller, error) { @@ -98,19 +98,21 @@ func NewController( getSyncTargetFromKey: func(syncTargetKey string) (*workloadv1alpha1.SyncTarget, bool, error) { syncTargets, err := indexers.ByIndex[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) - if err != nil && !errors.IsNotFound(err) { - return nil, false, err + if errors.IsNotFound(err) || len(syncTargets) == 0 { + // check cache before deciding what to return + syncTargets, err = indexers.ByIndex[*workloadv1alpha1.SyncTarget](globalSyncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) + if errors.IsNotFound(err) || len(syncTargets) == 0 { + // still no SyncTarget, so return empty results. + return nil, false, nil + } } - if errors.IsNotFound(err) { - return nil, false, nil + if err != nil { + return nil, false, err } // This shouldn't happen, more than one SyncTarget with the same key means a hash collision. if len(syncTargets) > 1 { return nil, false, fmt.Errorf("possible collision: multiple sync targets found for key %q", syncTargetKey) } - if len(syncTargets) == 0 { - return nil, false, nil - } return syncTargets[0], true, nil }, @@ -149,6 +151,10 @@ func NewController( bySyncTargetKey: indexBySyncTargetKey, }) + indexers.AddIfNotPresentOrDie(globalSyncTargetInformer.Informer().GetIndexer(), cache.Indexers{ + bySyncTargetKey: indexBySyncTargetKey, + }) + syncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { c.enqueueSyncTarget(obj) diff --git a/tmc/pkg/server/controllers.go b/tmc/pkg/server/controllers.go index 7e69614d879..2b54dce5375 100644 --- a/tmc/pkg/server/controllers.go +++ b/tmc/pkg/server/controllers.go @@ -62,6 +62,7 @@ func (s *Server) installWorkloadResourceScheduler(ctx context.Context, config *r dynamicClusterClient, s.Core.DiscoveringDynamicSharedInformerFactory, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), + s.Core.CacheKcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), s.Core.KubeSharedInformerFactory.Core().V1().Namespaces(), s.Core.KcpSharedInformerFactory.Scheduling().V1alpha1().Placements(), ) From 759c1a4a80a59aeccb9640d2378238d1e7ca72f1 Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Fri, 31 Mar 2023 13:39:07 -0400 Subject: [PATCH 2/7] Add cache lookup to SyncTarget(Export) controllers Signed-off-by: Nolan Brubaker --- .../synctarget/synctarget_controller.go | 26 +++++--- .../synctargetexports_controller.go | 62 ++++++++++++------- tmc/pkg/server/controllers.go | 5 +- 3 files changed, 58 insertions(+), 35 deletions(-) diff --git a/pkg/reconciler/workload/synctarget/synctarget_controller.go b/pkg/reconciler/workload/synctarget/synctarget_controller.go index b4a73737c2b..05ed9124b9e 100644 --- a/pkg/reconciler/workload/synctarget/synctarget_controller.go +++ b/pkg/reconciler/workload/synctarget/synctarget_controller.go @@ -42,7 +42,6 @@ import ( kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1" - corev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/core/v1alpha1" ) const ControllerName = "kcp-synctarget-controller" @@ -50,13 +49,22 @@ const ControllerName = "kcp-synctarget-controller" func NewController( kcpClusterClient kcpclientset.ClusterInterface, syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, - workspaceShardInformer corev1alpha1informers.ShardClusterInformer, + workspaceShardInformer, globalWorkspaceShardInformer corev1alpha1informers.ShardClusterInformer, ) *Controller { c := &Controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - kcpClusterClient: kcpClusterClient, - syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), - workspaceShardLister: workspaceShardInformer.Lister(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + kcpClusterClient: kcpClusterClient, + syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), + listWorkspaceShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) { + shards, err := workspaceShardInformer.Lister().List(selector) + if len(shards) == 0 { + return globalWorkspaceShardInformer.Lister().List(selector) + } + if err != nil { + return nil, err + } + return shards, nil + }, } // Watch for events related to SyncTargets @@ -80,8 +88,8 @@ type Controller struct { queue workqueue.RateLimitingInterface kcpClusterClient kcpclientset.ClusterInterface - workspaceShardLister corev1alpha1listers.ShardClusterLister - syncTargetIndexer cache.Indexer + listWorkspaceShards func(labels.Selector) ([]*corev1alpha1.Shard, error) + syncTargetIndexer cache.Indexer } func (c *Controller) enqueueSyncTarget(obj interface{}) { @@ -174,7 +182,7 @@ func (c *Controller) process(ctx context.Context, key string) error { logger = logging.WithObject(klog.FromContext(ctx), currentSyncTarget) ctx = klog.NewContext(ctx, logger) - workspacesShards, err := c.workspaceShardLister.List(labels.Everything()) + workspacesShards, err := c.listWorkspaceShards(labels.Everything()) if err != nil { return err } diff --git a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go index c3a376656fb..b6a3ea023b8 100644 --- a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go +++ b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go @@ -26,6 +26,7 @@ import ( "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" @@ -48,7 +49,6 @@ import ( apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1" apiresourcev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apiresource/v1alpha1" - apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" workloadv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/workload/v1alpha1" ) @@ -64,20 +64,42 @@ const ( func NewController( kcpClusterClient kcpclientset.ClusterInterface, syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, - apiExportInformer apisv1alpha1informers.APIExportClusterInformer, - apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer, + apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer, + apiResourceSchemaInformer, globalAPIResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer, apiResourceImportInformer apiresourcev1alpha1informers.APIResourceImportClusterInformer, ) (*Controller, error) { c := &Controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - kcpClusterClient: kcpClusterClient, - syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), - syncTargetLister: syncTargetInformer.Lister(), - apiExportsIndexer: apiExportInformer.Informer().GetIndexer(), - apiExportLister: apiExportInformer.Lister(), - resourceSchemaLister: apiResourceSchemaInformer.Lister(), - apiImportLister: apiResourceImportInformer.Lister(), - commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + kcpClusterClient: kcpClusterClient, + syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), + syncTargetLister: syncTargetInformer.Lister(), + apiExportsIndexer: apiExportInformer.Informer().GetIndexer(), + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + // Try local informer first + export, err := indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportInformer.Informer().GetIndexer(), path, name) + if err == nil { + // Quick happy path - found it locally + return export, nil + } + if !apierrors.IsNotFound(err) { + // Unrecoverable error + return nil, err + } + // Didn't find it locally - try remote + return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportInformer.Informer().GetIndexer(), path, name) + }, + getAPIResourceSchema: func(path logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) { + schema, err := apiResourceSchemaInformer.Cluster(path).Lister().Get(name) + if apierrors.IsNotFound(err) { + return globalAPIResourceSchemaInformer.Cluster(path).Lister().Get(name) + } + if err != nil { + return nil, err + } + return schema, nil + }, + apiImportLister: apiResourceImportInformer.Lister(), + commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()), } if err := syncTargetInformer.Informer().AddIndexers(cache.Indexers{ @@ -154,8 +176,8 @@ type Controller struct { syncTargetIndexer cache.Indexer syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister apiExportsIndexer cache.Indexer - apiExportLister apisv1alpha1listers.APIExportClusterLister - resourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister + getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) + getAPIResourceSchema func(path logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) apiImportLister apiresourcev1alpha1listers.APIResourceImportClusterLister commit CommitFunc @@ -314,7 +336,7 @@ func (c *Controller) process(ctx context.Context, key string) error { exportReconciler := &exportReconciler{ getAPIExport: c.getAPIExport, - getResourceSchema: c.getResourceSchema, + getResourceSchema: c.getAPIResourceSchema, } currentSyncTarget, err = exportReconciler.reconcile(ctx, currentSyncTarget) if err != nil { @@ -323,7 +345,7 @@ func (c *Controller) process(ctx context.Context, key string) error { apiCompatibleReconciler := &apiCompatibleReconciler{ getAPIExport: c.getAPIExport, - getResourceSchema: c.getResourceSchema, + getResourceSchema: c.getAPIResourceSchema, listAPIResourceImports: c.listAPIResourceImports, } currentSyncTarget, err = apiCompatibleReconciler.reconcile(ctx, currentSyncTarget) @@ -341,14 +363,6 @@ func (c *Controller) process(ctx context.Context, key string) error { return errors.NewAggregate(errs) } -func (c *Controller) getAPIExport(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { - return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), c.apiExportsIndexer, path, name) -} - -func (c *Controller) getResourceSchema(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) { - return c.resourceSchemaLister.Cluster(clusterName).Get(name) -} - func (c *Controller) listAPIResourceImports(clusterName logicalcluster.Name) ([]*apiresourcev1alpha1.APIResourceImport, error) { return c.apiImportLister.Cluster(clusterName).List(labels.Everything()) } diff --git a/tmc/pkg/server/controllers.go b/tmc/pkg/server/controllers.go index 2b54dce5375..058eff86715 100644 --- a/tmc/pkg/server/controllers.go +++ b/tmc/pkg/server/controllers.go @@ -356,7 +356,9 @@ func (s *Server) installWorkloadSyncTargetExportController(ctx context.Context, kcpClusterClient, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), + s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), s.Core.KcpSharedInformerFactory.Apiresource().V1alpha1().APIResourceImports(), ) if err != nil { @@ -387,9 +389,8 @@ func (s *Server) installSyncTargetController(ctx context.Context, config *rest.C c := synctargetcontroller.NewController( kcpClusterClient, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), - // TODO: change to s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - // once https://github.com/kcp-dev/kcp/issues/2649 is resolved s.Core.KcpSharedInformerFactory.Core().V1alpha1().Shards(), + s.Core.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), ) if err != nil { return err From 7abdab5ca08c7279a91243f0391c409b59a0af32 Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Mon, 3 Apr 2023 15:38:38 -0400 Subject: [PATCH 3/7] Handle replicated synctarget deletion events Signed-off-by: Nolan Brubaker --- pkg/reconciler/workload/resource/resource_controller.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/reconciler/workload/resource/resource_controller.go b/pkg/reconciler/workload/resource/resource_controller.go index df069ed6b44..5834f502a30 100644 --- a/pkg/reconciler/workload/resource/resource_controller.go +++ b/pkg/reconciler/workload/resource/resource_controller.go @@ -161,6 +161,12 @@ func NewController( }, }) + globalSyncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + c.enqueueSyncTarget(obj) + }, + }) + return c, nil } From a447cdfb19ac5873a43d4184f1de9c82ea708e6b Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Tue, 4 Apr 2023 14:23:38 -0400 Subject: [PATCH 4/7] Use generics to do global cache lookup. Signed-off-by: Nolan Brubaker --- pkg/informer/fallback_lookup.go | 100 ++++++++++++++++++ .../synctarget/synctarget_controller.go | 20 ++-- .../synctargetexports_controller.go | 19 ++-- 3 files changed, 112 insertions(+), 27 deletions(-) create mode 100644 pkg/informer/fallback_lookup.go diff --git a/pkg/informer/fallback_lookup.go b/pkg/informer/fallback_lookup.go new file mode 100644 index 00000000000..c1b65207ab9 --- /dev/null +++ b/pkg/informer/fallback_lookup.go @@ -0,0 +1,100 @@ +/* +Copyright 2023 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informer + +import ( + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" +) + +// ClusterLister is a cluster-aware Lister API. +type ClusterLister[R runtime.Object, L Lister[R]] interface { + Cluster(name logicalcluster.Name) L +} + +// Lister is a Lister API with generics. +type Lister[R runtime.Object] interface { + List(labels.Selector) ([]R, error) +} + +// FallbackListFunc is a function that returns []R from either local or global informer Listers based on a label selector. +type FallbackListFunc[R runtime.Object] func(selector labels.Selector) ([]R, error) + +// ClusterGetter is a cluster-aware Getter API. +type ClusterGetter[R runtime.Object, G Getter[R]] interface { + Cluster(name logicalcluster.Name) G +} + +// Getter is a GetAPI call with generics. +type Getter[R runtime.Object] interface { + Get(name string) (R, error) +} + +// FallbackGetFunc is a function that returns an instance of R from either local or global informer Listers based on a name. +type FallbackGetFunc[R runtime.Object] func(name string) (R, error) + +// ScopedFallbackGetFunc is a function that returns an instance of R from either local or global cluster-scoped informer Listers based on a name. +type ScopedFallbackGetFunc[R runtime.Object] func(clusterName logicalcluster.Name, name string) (R, error) + +// NewListerWithFallback creates a new FallbackListFunc that looks up an object of type R first in the local lister, then in the global lister if no local results are found. +func NewListerWithFallback[R runtime.Object](localLister Lister[R], globalLister Lister[R]) FallbackListFunc[R] { + return func(selector labels.Selector) ([]R, error) { + r, err := localLister.List(selector) + if len(r) == 0 { + return globalLister.List(selector) + } + if err != nil { + return nil, err + } + return r, nil + } +} + +// NewGetterWithFallback creates a new FallbackGetFunc that gets an object of type R first looking in the local lister, then in the global lister if not found. +func NewGetterWithFallback[R runtime.Object](localGetter, globalGetter Getter[R]) FallbackGetFunc[R] { + return func(name string) (R, error) { + var r R + r, err := localGetter.Get(name) + if errors.IsNotFound(err) { + return globalGetter.Get(name) + } + if err != nil { + // r will be nil in this case + return r, err + } + return r, nil + } +} + +// NewScopedGetterWithFallback creates a new ScopedFallbackGetFunc that gets an object of type R within a given cluster path. The local lister is +// checked first, and if nothing is found, the global lister is checked. +func NewScopedGetterWithFallback[R runtime.Object, G Getter[R]](localGetter, globalGetter ClusterGetter[R, G]) ScopedFallbackGetFunc[R] { + return func(clusterName logicalcluster.Name, name string) (R, error) { + var r R + r, err := localGetter.Cluster(clusterName).Get(name) + if errors.IsNotFound(err) { + return globalGetter.Cluster(clusterName).Get(name) + } + if err != nil { + return r, err + } + return r, nil + } +} diff --git a/pkg/reconciler/workload/synctarget/synctarget_controller.go b/pkg/reconciler/workload/synctarget/synctarget_controller.go index 05ed9124b9e..6128ebe53af 100644 --- a/pkg/reconciler/workload/synctarget/synctarget_controller.go +++ b/pkg/reconciler/workload/synctarget/synctarget_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1" @@ -52,19 +53,10 @@ func NewController( workspaceShardInformer, globalWorkspaceShardInformer corev1alpha1informers.ShardClusterInformer, ) *Controller { c := &Controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - kcpClusterClient: kcpClusterClient, - syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), - listWorkspaceShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) { - shards, err := workspaceShardInformer.Lister().List(selector) - if len(shards) == 0 { - return globalWorkspaceShardInformer.Lister().List(selector) - } - if err != nil { - return nil, err - } - return shards, nil - }, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + kcpClusterClient: kcpClusterClient, + syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), + listWorkspaceShards: informer.NewListerWithFallback[*corev1alpha1.Shard](workspaceShardInformer.Lister(), globalWorkspaceShardInformer.Lister()), } // Watch for events related to SyncTargets @@ -88,7 +80,7 @@ type Controller struct { queue workqueue.RateLimitingInterface kcpClusterClient kcpclientset.ClusterInterface - listWorkspaceShards func(labels.Selector) ([]*corev1alpha1.Shard, error) + listWorkspaceShards informer.FallbackListFunc[*corev1alpha1.Shard] syncTargetIndexer cache.Indexer } diff --git a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go index b6a3ea023b8..cf455da4853 100644 --- a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go +++ b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go @@ -37,6 +37,7 @@ import ( "k8s.io/klog/v2" "github.com/kcp-dev/kcp/pkg/indexers" + "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" apiresourcev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apiresource/v1alpha1" @@ -49,6 +50,7 @@ import ( apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1" apiresourcev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apiresource/v1alpha1" + apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" workloadv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/workload/v1alpha1" ) @@ -88,18 +90,9 @@ func NewController( // Didn't find it locally - try remote return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportInformer.Informer().GetIndexer(), path, name) }, - getAPIResourceSchema: func(path logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) { - schema, err := apiResourceSchemaInformer.Cluster(path).Lister().Get(name) - if apierrors.IsNotFound(err) { - return globalAPIResourceSchemaInformer.Cluster(path).Lister().Get(name) - } - if err != nil { - return nil, err - } - return schema, nil - }, - apiImportLister: apiResourceImportInformer.Lister(), - commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()), + getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()), + apiImportLister: apiResourceImportInformer.Lister(), + commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()), } if err := syncTargetInformer.Informer().AddIndexers(cache.Indexers{ @@ -177,7 +170,7 @@ type Controller struct { syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister apiExportsIndexer cache.Indexer getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) - getAPIResourceSchema func(path logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) + getAPIResourceSchema informer.ScopedFallbackGetFunc[*apisv1alpha1.APIResourceSchema] apiImportLister apiresourcev1alpha1listers.APIResourceImportClusterLister commit CommitFunc From 0fc890996ac110c2bbb48fd31ea8d2ae811f5ab7 Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Tue, 4 Apr 2023 14:39:23 -0400 Subject: [PATCH 5/7] Use generic index lookup Signed-off-by: Nolan Brubaker --- .../workload/resource/resource_controller.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/reconciler/workload/resource/resource_controller.go b/pkg/reconciler/workload/resource/resource_controller.go index 5834f502a30..c27ee52f1ad 100644 --- a/pkg/reconciler/workload/resource/resource_controller.go +++ b/pkg/reconciler/workload/resource/resource_controller.go @@ -97,19 +97,14 @@ func NewController( }, getSyncTargetFromKey: func(syncTargetKey string) (*workloadv1alpha1.SyncTarget, bool, error) { - syncTargets, err := indexers.ByIndex[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) - if errors.IsNotFound(err) || len(syncTargets) == 0 { - // check cache before deciding what to return - syncTargets, err = indexers.ByIndex[*workloadv1alpha1.SyncTarget](globalSyncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) - if errors.IsNotFound(err) || len(syncTargets) == 0 { - // still no SyncTarget, so return empty results. - return nil, false, nil - } - } + syncTargets, err := indexers.ByIndexWithFallback[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), + globalSyncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) if err != nil { return nil, false, err } - // This shouldn't happen, more than one SyncTarget with the same key means a hash collision. + if len(syncTargets) == 0 { + return nil, false, nil + } if len(syncTargets) > 1 { return nil, false, fmt.Errorf("possible collision: multiple sync targets found for key %q", syncTargetKey) } From 67543d10ad8e335eb36707ab1b914f87ca9b61c7 Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Tue, 4 Apr 2023 16:02:01 -0400 Subject: [PATCH 6/7] Use cache lookup with fallback Signed-off-by: Nolan Brubaker --- .../apis/apibinding/apibinding_controller.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/pkg/reconciler/apis/apibinding/apibinding_controller.go b/pkg/reconciler/apis/apibinding/apibinding_controller.go index b3e7f238ee3..57f7aa0009f 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_controller.go +++ b/pkg/reconciler/apis/apibinding/apibinding_controller.go @@ -42,6 +42,7 @@ import ( "k8s.io/klog/v2" "github.com/kcp-dev/kcp/pkg/indexers" + "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" @@ -49,6 +50,7 @@ import ( kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1" apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" ) const ( @@ -143,21 +145,9 @@ func NewController( return indexers.ByIndexWithFallback[*apisv1alpha1.APIExport](apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), indexAPIExportsByAPIResourceSchema, key) }, - getAPIResourceSchema: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) { - apiResourceSchema, err := apiResourceSchemaInformer.Lister().Cluster(clusterName).Get(name) - if apierrors.IsNotFound(err) { - return globalAPIResourceSchemaInformer.Lister().Cluster(clusterName).Get(name) - } - return apiResourceSchema, err - }, + getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()), - getAPIConversion: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIConversion, error) { - apiConversion, err := apiConversionInformer.Cluster(clusterName).Lister().Get(name) - if apierrors.IsNotFound(err) { - return globalAPIConversionInformer.Lister().Cluster(clusterName).Get(name) - } - return apiConversion, err - }, + getAPIConversion: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIConversion, apisv1alpha1listers.APIConversionLister](apiConversionInformer.Lister(), globalAPIConversionInformer.Lister()), createCRD: func(ctx context.Context, clusterName logicalcluster.Path, crd *apiextensionsv1.CustomResourceDefinition) (*apiextensionsv1.CustomResourceDefinition, error) { return crdClusterClient.Cluster(clusterName).ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, metav1.CreateOptions{}) From 1c83615c8014e99ccb3d4a2c5e712771c4331b58 Mon Sep 17 00:00:00 2001 From: Nolan Brubaker Date: Wed, 5 Apr 2023 11:27:07 -0400 Subject: [PATCH 7/7] Look up APIexports with fallback Signed-off-by: Nolan Brubaker --- .../synctargetexports_controller.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go index cf455da4853..158bf79f94c 100644 --- a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go +++ b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go @@ -26,7 +26,6 @@ import ( "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" @@ -77,18 +76,7 @@ func NewController( syncTargetLister: syncTargetInformer.Lister(), apiExportsIndexer: apiExportInformer.Informer().GetIndexer(), getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { - // Try local informer first - export, err := indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportInformer.Informer().GetIndexer(), path, name) - if err == nil { - // Quick happy path - found it locally - return export, nil - } - if !apierrors.IsNotFound(err) { - // Unrecoverable error - return nil, err - } - // Didn't find it locally - try remote - return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportInformer.Informer().GetIndexer(), path, name) + return indexers.ByPathAndNameWithFallback[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), path, name) }, getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()), apiImportLister: apiResourceImportInformer.Lister(),