Skip to content

Commit

Permalink
Merge pull request #2939 from nrb/2900-all-controllers
Browse files Browse the repository at this point in the history
🌱 Add cache lookup to TMC controllers
  • Loading branch information
openshift-merge-robot authored Apr 5, 2023
2 parents ee2ba92 + 1c83615 commit 419199c
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 54 deletions.
100 changes: 100 additions & 0 deletions pkg/informer/fallback_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)

// ClusterLister is a cluster-aware Lister API.
type ClusterLister[R runtime.Object, L Lister[R]] interface {
Cluster(name logicalcluster.Name) L
}

// Lister is a Lister API with generics.
type Lister[R runtime.Object] interface {
List(labels.Selector) ([]R, error)
}

// FallbackListFunc is a function that returns []R from either local or global informer Listers based on a label selector.
type FallbackListFunc[R runtime.Object] func(selector labels.Selector) ([]R, error)

// ClusterGetter is a cluster-aware Getter API.
type ClusterGetter[R runtime.Object, G Getter[R]] interface {
Cluster(name logicalcluster.Name) G
}

// Getter is a GetAPI call with generics.
type Getter[R runtime.Object] interface {
Get(name string) (R, error)
}

// FallbackGetFunc is a function that returns an instance of R from either local or global informer Listers based on a name.
type FallbackGetFunc[R runtime.Object] func(name string) (R, error)

// ScopedFallbackGetFunc is a function that returns an instance of R from either local or global cluster-scoped informer Listers based on a name.
type ScopedFallbackGetFunc[R runtime.Object] func(clusterName logicalcluster.Name, name string) (R, error)

// NewListerWithFallback creates a new FallbackListFunc that looks up an object of type R first in the local lister, then in the global lister if no local results are found.
func NewListerWithFallback[R runtime.Object](localLister Lister[R], globalLister Lister[R]) FallbackListFunc[R] {
return func(selector labels.Selector) ([]R, error) {
r, err := localLister.List(selector)
if len(r) == 0 {
return globalLister.List(selector)
}
if err != nil {
return nil, err
}
return r, nil
}
}

// NewGetterWithFallback creates a new FallbackGetFunc that gets an object of type R first looking in the local lister, then in the global lister if not found.
func NewGetterWithFallback[R runtime.Object](localGetter, globalGetter Getter[R]) FallbackGetFunc[R] {
return func(name string) (R, error) {
var r R
r, err := localGetter.Get(name)
if errors.IsNotFound(err) {
return globalGetter.Get(name)
}
if err != nil {
// r will be nil in this case
return r, err
}
return r, nil
}
}

// NewScopedGetterWithFallback creates a new ScopedFallbackGetFunc that gets an object of type R within a given cluster path. The local lister is
// checked first, and if nothing is found, the global lister is checked.
func NewScopedGetterWithFallback[R runtime.Object, G Getter[R]](localGetter, globalGetter ClusterGetter[R, G]) ScopedFallbackGetFunc[R] {
return func(clusterName logicalcluster.Name, name string) (R, error) {
var r R
r, err := localGetter.Cluster(clusterName).Get(name)
if errors.IsNotFound(err) {
return globalGetter.Cluster(clusterName).Get(name)
}
if err != nil {
return r, err
}
return r, nil
}
}
18 changes: 4 additions & 14 deletions pkg/reconciler/apis/apibinding/apibinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/sdk/apis/core"
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1"
apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1"
apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1"
)

const (
Expand Down Expand Up @@ -143,21 +145,9 @@ func NewController(
return indexers.ByIndexWithFallback[*apisv1alpha1.APIExport](apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), indexAPIExportsByAPIResourceSchema, key)
},

getAPIResourceSchema: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) {
apiResourceSchema, err := apiResourceSchemaInformer.Lister().Cluster(clusterName).Get(name)
if apierrors.IsNotFound(err) {
return globalAPIResourceSchemaInformer.Lister().Cluster(clusterName).Get(name)
}
return apiResourceSchema, err
},
getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()),

getAPIConversion: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIConversion, error) {
apiConversion, err := apiConversionInformer.Cluster(clusterName).Lister().Get(name)
if apierrors.IsNotFound(err) {
return globalAPIConversionInformer.Lister().Cluster(clusterName).Get(name)
}
return apiConversion, err
},
getAPIConversion: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIConversion, apisv1alpha1listers.APIConversionLister](apiConversionInformer.Lister(), globalAPIConversionInformer.Lister()),

createCRD: func(ctx context.Context, clusterName logicalcluster.Path, crd *apiextensionsv1.CustomResourceDefinition) (*apiextensionsv1.CustomResourceDefinition, error) {
return crdClusterClient.Cluster(clusterName).ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, metav1.CreateOptions{})
Expand Down
23 changes: 15 additions & 8 deletions pkg/reconciler/workload/resource/resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
func NewController(
dynamicClusterClient kcpdynamic.ClusterInterface,
ddsif *informer.DiscoveringDynamicSharedInformerFactory,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
syncTargetInformer, globalSyncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
placementInformer schedulingv1alpha1informers.PlacementClusterInformer,
) (*Controller, error) {
Expand Down Expand Up @@ -97,20 +97,17 @@ func NewController(
},

getSyncTargetFromKey: func(syncTargetKey string) (*workloadv1alpha1.SyncTarget, bool, error) {
syncTargets, err := indexers.ByIndex[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey)
if err != nil && !errors.IsNotFound(err) {
syncTargets, err := indexers.ByIndexWithFallback[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(),
globalSyncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey)
if err != nil {
return nil, false, err
}
if errors.IsNotFound(err) {
if len(syncTargets) == 0 {
return nil, false, nil
}
// This shouldn't happen, more than one SyncTarget with the same key means a hash collision.
if len(syncTargets) > 1 {
return nil, false, fmt.Errorf("possible collision: multiple sync targets found for key %q", syncTargetKey)
}
if len(syncTargets) == 0 {
return nil, false, nil
}
return syncTargets[0], true, nil
},

Expand Down Expand Up @@ -149,12 +146,22 @@ func NewController(
bySyncTargetKey: indexBySyncTargetKey,
})

indexers.AddIfNotPresentOrDie(globalSyncTargetInformer.Informer().GetIndexer(), cache.Indexers{
bySyncTargetKey: indexBySyncTargetKey,
})

syncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
c.enqueueSyncTarget(obj)
},
})

globalSyncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
c.enqueueSyncTarget(obj)
},
})

return c, nil
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/reconciler/workload/synctarget/synctarget_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,27 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1"
workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1"
corev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/core/v1alpha1"
)

const ControllerName = "kcp-synctarget-controller"

func NewController(
kcpClusterClient kcpclientset.ClusterInterface,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
workspaceShardInformer corev1alpha1informers.ShardClusterInformer,
workspaceShardInformer, globalWorkspaceShardInformer corev1alpha1informers.ShardClusterInformer,
) *Controller {
c := &Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
workspaceShardLister: workspaceShardInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
listWorkspaceShards: informer.NewListerWithFallback[*corev1alpha1.Shard](workspaceShardInformer.Lister(), globalWorkspaceShardInformer.Lister()),
}

// Watch for events related to SyncTargets
Expand All @@ -80,8 +80,8 @@ type Controller struct {
queue workqueue.RateLimitingInterface
kcpClusterClient kcpclientset.ClusterInterface

workspaceShardLister corev1alpha1listers.ShardClusterLister
syncTargetIndexer cache.Indexer
listWorkspaceShards informer.FallbackListFunc[*corev1alpha1.Shard]
syncTargetIndexer cache.Indexer
}

func (c *Controller) enqueueSyncTarget(obj interface{}) {
Expand Down Expand Up @@ -174,7 +174,7 @@ func (c *Controller) process(ctx context.Context, key string) error {
logger = logging.WithObject(klog.FromContext(ctx), currentSyncTarget)
ctx = klog.NewContext(ctx, logger)

workspacesShards, err := c.workspaceShardLister.List(labels.Everything())
workspacesShards, err := c.listWorkspaceShards(labels.Everything())
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
apiresourcev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apiresource/v1alpha1"
Expand Down Expand Up @@ -64,18 +65,20 @@ const (
func NewController(
kcpClusterClient kcpclientset.ClusterInterface,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
apiExportInformer apisv1alpha1informers.APIExportClusterInformer,
apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer,
apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
apiResourceSchemaInformer, globalAPIResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer,
apiResourceImportInformer apiresourcev1alpha1informers.APIResourceImportClusterInformer,
) (*Controller, error) {
c := &Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
syncTargetLister: syncTargetInformer.Lister(),
apiExportsIndexer: apiExportInformer.Informer().GetIndexer(),
apiExportLister: apiExportInformer.Lister(),
resourceSchemaLister: apiResourceSchemaInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
syncTargetLister: syncTargetInformer.Lister(),
apiExportsIndexer: apiExportInformer.Informer().GetIndexer(),
getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) {
return indexers.ByPathAndNameWithFallback[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), path, name)
},
getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()),
apiImportLister: apiResourceImportInformer.Lister(),
commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()),
}
Expand Down Expand Up @@ -154,8 +157,8 @@ type Controller struct {
syncTargetIndexer cache.Indexer
syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister
apiExportsIndexer cache.Indexer
apiExportLister apisv1alpha1listers.APIExportClusterLister
resourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister
getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error)
getAPIResourceSchema informer.ScopedFallbackGetFunc[*apisv1alpha1.APIResourceSchema]
apiImportLister apiresourcev1alpha1listers.APIResourceImportClusterLister

commit CommitFunc
Expand Down Expand Up @@ -314,7 +317,7 @@ func (c *Controller) process(ctx context.Context, key string) error {

exportReconciler := &exportReconciler{
getAPIExport: c.getAPIExport,
getResourceSchema: c.getResourceSchema,
getResourceSchema: c.getAPIResourceSchema,
}
currentSyncTarget, err = exportReconciler.reconcile(ctx, currentSyncTarget)
if err != nil {
Expand All @@ -323,7 +326,7 @@ func (c *Controller) process(ctx context.Context, key string) error {

apiCompatibleReconciler := &apiCompatibleReconciler{
getAPIExport: c.getAPIExport,
getResourceSchema: c.getResourceSchema,
getResourceSchema: c.getAPIResourceSchema,
listAPIResourceImports: c.listAPIResourceImports,
}
currentSyncTarget, err = apiCompatibleReconciler.reconcile(ctx, currentSyncTarget)
Expand All @@ -341,14 +344,6 @@ func (c *Controller) process(ctx context.Context, key string) error {
return errors.NewAggregate(errs)
}

func (c *Controller) getAPIExport(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) {
return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), c.apiExportsIndexer, path, name)
}

func (c *Controller) getResourceSchema(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) {
return c.resourceSchemaLister.Cluster(clusterName).Get(name)
}

func (c *Controller) listAPIResourceImports(clusterName logicalcluster.Name) ([]*apiresourcev1alpha1.APIResourceImport, error) {
return c.apiImportLister.Cluster(clusterName).List(labels.Everything())
}
6 changes: 4 additions & 2 deletions tmc/pkg/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (s *Server) installWorkloadResourceScheduler(ctx context.Context, config *r
dynamicClusterClient,
s.Core.DiscoveringDynamicSharedInformerFactory,
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
s.Core.CacheKcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
s.Core.KubeSharedInformerFactory.Core().V1().Namespaces(),
s.Core.KcpSharedInformerFactory.Scheduling().V1alpha1().Placements(),
)
Expand Down Expand Up @@ -355,7 +356,9 @@ func (s *Server) installWorkloadSyncTargetExportController(ctx context.Context,
kcpClusterClient,
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(),
s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(),
s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(),
s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(),
s.Core.KcpSharedInformerFactory.Apiresource().V1alpha1().APIResourceImports(),
)
if err != nil {
Expand Down Expand Up @@ -386,9 +389,8 @@ func (s *Server) installSyncTargetController(ctx context.Context, config *rest.C
c := synctargetcontroller.NewController(
kcpClusterClient,
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
// TODO: change to s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(),
// once https://github.com/kcp-dev/kcp/issues/2649 is resolved
s.Core.KcpSharedInformerFactory.Core().V1alpha1().Shards(),
s.Core.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(),
)
if err != nil {
return err
Expand Down

0 comments on commit 419199c

Please sign in to comment.