diff --git a/pkg/informer/fallback_lookup.go b/pkg/informer/fallback_lookup.go new file mode 100644 index 00000000000..c1b65207ab9 --- /dev/null +++ b/pkg/informer/fallback_lookup.go @@ -0,0 +1,100 @@ +/* +Copyright 2023 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informer + +import ( + "github.com/kcp-dev/logicalcluster/v3" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" +) + +// ClusterLister is a cluster-aware Lister API. +type ClusterLister[R runtime.Object, L Lister[R]] interface { + Cluster(name logicalcluster.Name) L +} + +// Lister is a Lister API with generics. +type Lister[R runtime.Object] interface { + List(labels.Selector) ([]R, error) +} + +// FallbackListFunc is a function that returns []R from either local or global informer Listers based on a label selector. +type FallbackListFunc[R runtime.Object] func(selector labels.Selector) ([]R, error) + +// ClusterGetter is a cluster-aware Getter API. +type ClusterGetter[R runtime.Object, G Getter[R]] interface { + Cluster(name logicalcluster.Name) G +} + +// Getter is a GetAPI call with generics. +type Getter[R runtime.Object] interface { + Get(name string) (R, error) +} + +// FallbackGetFunc is a function that returns an instance of R from either local or global informer Listers based on a name. +type FallbackGetFunc[R runtime.Object] func(name string) (R, error) + +// ScopedFallbackGetFunc is a function that returns an instance of R from either local or global cluster-scoped informer Listers based on a name. +type ScopedFallbackGetFunc[R runtime.Object] func(clusterName logicalcluster.Name, name string) (R, error) + +// NewListerWithFallback creates a new FallbackListFunc that looks up an object of type R first in the local lister, then in the global lister if no local results are found. +func NewListerWithFallback[R runtime.Object](localLister Lister[R], globalLister Lister[R]) FallbackListFunc[R] { + return func(selector labels.Selector) ([]R, error) { + r, err := localLister.List(selector) + if len(r) == 0 { + return globalLister.List(selector) + } + if err != nil { + return nil, err + } + return r, nil + } +} + +// NewGetterWithFallback creates a new FallbackGetFunc that gets an object of type R first looking in the local lister, then in the global lister if not found. +func NewGetterWithFallback[R runtime.Object](localGetter, globalGetter Getter[R]) FallbackGetFunc[R] { + return func(name string) (R, error) { + var r R + r, err := localGetter.Get(name) + if errors.IsNotFound(err) { + return globalGetter.Get(name) + } + if err != nil { + // r will be nil in this case + return r, err + } + return r, nil + } +} + +// NewScopedGetterWithFallback creates a new ScopedFallbackGetFunc that gets an object of type R within a given cluster path. The local lister is +// checked first, and if nothing is found, the global lister is checked. +func NewScopedGetterWithFallback[R runtime.Object, G Getter[R]](localGetter, globalGetter ClusterGetter[R, G]) ScopedFallbackGetFunc[R] { + return func(clusterName logicalcluster.Name, name string) (R, error) { + var r R + r, err := localGetter.Cluster(clusterName).Get(name) + if errors.IsNotFound(err) { + return globalGetter.Cluster(clusterName).Get(name) + } + if err != nil { + return r, err + } + return r, nil + } +} diff --git a/pkg/reconciler/apis/apibinding/apibinding_controller.go b/pkg/reconciler/apis/apibinding/apibinding_controller.go index b3e7f238ee3..57f7aa0009f 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_controller.go +++ b/pkg/reconciler/apis/apibinding/apibinding_controller.go @@ -42,6 +42,7 @@ import ( "k8s.io/klog/v2" "github.com/kcp-dev/kcp/pkg/indexers" + "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" @@ -49,6 +50,7 @@ import ( kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1" apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1" + apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1" ) const ( @@ -143,21 +145,9 @@ func NewController( return indexers.ByIndexWithFallback[*apisv1alpha1.APIExport](apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), indexAPIExportsByAPIResourceSchema, key) }, - getAPIResourceSchema: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) { - apiResourceSchema, err := apiResourceSchemaInformer.Lister().Cluster(clusterName).Get(name) - if apierrors.IsNotFound(err) { - return globalAPIResourceSchemaInformer.Lister().Cluster(clusterName).Get(name) - } - return apiResourceSchema, err - }, + getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()), - getAPIConversion: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIConversion, error) { - apiConversion, err := apiConversionInformer.Cluster(clusterName).Lister().Get(name) - if apierrors.IsNotFound(err) { - return globalAPIConversionInformer.Lister().Cluster(clusterName).Get(name) - } - return apiConversion, err - }, + getAPIConversion: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIConversion, apisv1alpha1listers.APIConversionLister](apiConversionInformer.Lister(), globalAPIConversionInformer.Lister()), createCRD: func(ctx context.Context, clusterName logicalcluster.Path, crd *apiextensionsv1.CustomResourceDefinition) (*apiextensionsv1.CustomResourceDefinition, error) { return crdClusterClient.Cluster(clusterName).ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, metav1.CreateOptions{}) diff --git a/pkg/reconciler/workload/resource/resource_controller.go b/pkg/reconciler/workload/resource/resource_controller.go index 29522908d92..c27ee52f1ad 100644 --- a/pkg/reconciler/workload/resource/resource_controller.go +++ b/pkg/reconciler/workload/resource/resource_controller.go @@ -64,7 +64,7 @@ const ( func NewController( dynamicClusterClient kcpdynamic.ClusterInterface, ddsif *informer.DiscoveringDynamicSharedInformerFactory, - syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, + syncTargetInformer, globalSyncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, namespaceInformer kcpcorev1informers.NamespaceClusterInformer, placementInformer schedulingv1alpha1informers.PlacementClusterInformer, ) (*Controller, error) { @@ -97,20 +97,17 @@ func NewController( }, getSyncTargetFromKey: func(syncTargetKey string) (*workloadv1alpha1.SyncTarget, bool, error) { - syncTargets, err := indexers.ByIndex[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) - if err != nil && !errors.IsNotFound(err) { + syncTargets, err := indexers.ByIndexWithFallback[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), + globalSyncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey) + if err != nil { return nil, false, err } - if errors.IsNotFound(err) { + if len(syncTargets) == 0 { return nil, false, nil } - // This shouldn't happen, more than one SyncTarget with the same key means a hash collision. if len(syncTargets) > 1 { return nil, false, fmt.Errorf("possible collision: multiple sync targets found for key %q", syncTargetKey) } - if len(syncTargets) == 0 { - return nil, false, nil - } return syncTargets[0], true, nil }, @@ -149,12 +146,22 @@ func NewController( bySyncTargetKey: indexBySyncTargetKey, }) + indexers.AddIfNotPresentOrDie(globalSyncTargetInformer.Informer().GetIndexer(), cache.Indexers{ + bySyncTargetKey: indexBySyncTargetKey, + }) + syncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { c.enqueueSyncTarget(obj) }, }) + globalSyncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + c.enqueueSyncTarget(obj) + }, + }) + return c, nil } diff --git a/pkg/reconciler/workload/synctarget/synctarget_controller.go b/pkg/reconciler/workload/synctarget/synctarget_controller.go index b4a73737c2b..6128ebe53af 100644 --- a/pkg/reconciler/workload/synctarget/synctarget_controller.go +++ b/pkg/reconciler/workload/synctarget/synctarget_controller.go @@ -36,13 +36,13 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1" - corev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/core/v1alpha1" ) const ControllerName = "kcp-synctarget-controller" @@ -50,13 +50,13 @@ const ControllerName = "kcp-synctarget-controller" func NewController( kcpClusterClient kcpclientset.ClusterInterface, syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, - workspaceShardInformer corev1alpha1informers.ShardClusterInformer, + workspaceShardInformer, globalWorkspaceShardInformer corev1alpha1informers.ShardClusterInformer, ) *Controller { c := &Controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - kcpClusterClient: kcpClusterClient, - syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), - workspaceShardLister: workspaceShardInformer.Lister(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + kcpClusterClient: kcpClusterClient, + syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), + listWorkspaceShards: informer.NewListerWithFallback[*corev1alpha1.Shard](workspaceShardInformer.Lister(), globalWorkspaceShardInformer.Lister()), } // Watch for events related to SyncTargets @@ -80,8 +80,8 @@ type Controller struct { queue workqueue.RateLimitingInterface kcpClusterClient kcpclientset.ClusterInterface - workspaceShardLister corev1alpha1listers.ShardClusterLister - syncTargetIndexer cache.Indexer + listWorkspaceShards informer.FallbackListFunc[*corev1alpha1.Shard] + syncTargetIndexer cache.Indexer } func (c *Controller) enqueueSyncTarget(obj interface{}) { @@ -174,7 +174,7 @@ func (c *Controller) process(ctx context.Context, key string) error { logger = logging.WithObject(klog.FromContext(ctx), currentSyncTarget) ctx = klog.NewContext(ctx, logger) - workspacesShards, err := c.workspaceShardLister.List(labels.Everything()) + workspacesShards, err := c.listWorkspaceShards(labels.Everything()) if err != nil { return err } diff --git a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go index c3a376656fb..158bf79f94c 100644 --- a/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go +++ b/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/klog/v2" "github.com/kcp-dev/kcp/pkg/indexers" + "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" apiresourcev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apiresource/v1alpha1" @@ -64,18 +65,20 @@ const ( func NewController( kcpClusterClient kcpclientset.ClusterInterface, syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, - apiExportInformer apisv1alpha1informers.APIExportClusterInformer, - apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer, + apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer, + apiResourceSchemaInformer, globalAPIResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer, apiResourceImportInformer apiresourcev1alpha1informers.APIResourceImportClusterInformer, ) (*Controller, error) { c := &Controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - kcpClusterClient: kcpClusterClient, - syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), - syncTargetLister: syncTargetInformer.Lister(), - apiExportsIndexer: apiExportInformer.Informer().GetIndexer(), - apiExportLister: apiExportInformer.Lister(), - resourceSchemaLister: apiResourceSchemaInformer.Lister(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + kcpClusterClient: kcpClusterClient, + syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(), + syncTargetLister: syncTargetInformer.Lister(), + apiExportsIndexer: apiExportInformer.Informer().GetIndexer(), + getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { + return indexers.ByPathAndNameWithFallback[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), path, name) + }, + getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()), apiImportLister: apiResourceImportInformer.Lister(), commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()), } @@ -154,8 +157,8 @@ type Controller struct { syncTargetIndexer cache.Indexer syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister apiExportsIndexer cache.Indexer - apiExportLister apisv1alpha1listers.APIExportClusterLister - resourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister + getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) + getAPIResourceSchema informer.ScopedFallbackGetFunc[*apisv1alpha1.APIResourceSchema] apiImportLister apiresourcev1alpha1listers.APIResourceImportClusterLister commit CommitFunc @@ -314,7 +317,7 @@ func (c *Controller) process(ctx context.Context, key string) error { exportReconciler := &exportReconciler{ getAPIExport: c.getAPIExport, - getResourceSchema: c.getResourceSchema, + getResourceSchema: c.getAPIResourceSchema, } currentSyncTarget, err = exportReconciler.reconcile(ctx, currentSyncTarget) if err != nil { @@ -323,7 +326,7 @@ func (c *Controller) process(ctx context.Context, key string) error { apiCompatibleReconciler := &apiCompatibleReconciler{ getAPIExport: c.getAPIExport, - getResourceSchema: c.getResourceSchema, + getResourceSchema: c.getAPIResourceSchema, listAPIResourceImports: c.listAPIResourceImports, } currentSyncTarget, err = apiCompatibleReconciler.reconcile(ctx, currentSyncTarget) @@ -341,14 +344,6 @@ func (c *Controller) process(ctx context.Context, key string) error { return errors.NewAggregate(errs) } -func (c *Controller) getAPIExport(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) { - return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), c.apiExportsIndexer, path, name) -} - -func (c *Controller) getResourceSchema(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) { - return c.resourceSchemaLister.Cluster(clusterName).Get(name) -} - func (c *Controller) listAPIResourceImports(clusterName logicalcluster.Name) ([]*apiresourcev1alpha1.APIResourceImport, error) { return c.apiImportLister.Cluster(clusterName).List(labels.Everything()) } diff --git a/tmc/pkg/server/controllers.go b/tmc/pkg/server/controllers.go index 7e69614d879..058eff86715 100644 --- a/tmc/pkg/server/controllers.go +++ b/tmc/pkg/server/controllers.go @@ -62,6 +62,7 @@ func (s *Server) installWorkloadResourceScheduler(ctx context.Context, config *r dynamicClusterClient, s.Core.DiscoveringDynamicSharedInformerFactory, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), + s.Core.CacheKcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), s.Core.KubeSharedInformerFactory.Core().V1().Namespaces(), s.Core.KcpSharedInformerFactory.Scheduling().V1alpha1().Placements(), ) @@ -355,7 +356,9 @@ func (s *Server) installWorkloadSyncTargetExportController(ctx context.Context, kcpClusterClient, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), + s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(), s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), + s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(), s.Core.KcpSharedInformerFactory.Apiresource().V1alpha1().APIResourceImports(), ) if err != nil { @@ -386,9 +389,8 @@ func (s *Server) installSyncTargetController(ctx context.Context, config *rest.C c := synctargetcontroller.NewController( kcpClusterClient, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), - // TODO: change to s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), - // once https://github.com/kcp-dev/kcp/issues/2649 is resolved s.Core.KcpSharedInformerFactory.Core().V1alpha1().Shards(), + s.Core.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(), ) if err != nil { return err