Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Add cache lookup to TMC controllers #2939

Merged
merged 7 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions pkg/informer/fallback_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)

// ClusterLister is a cluster-aware Lister API.
type ClusterLister[R runtime.Object, L Lister[R]] interface {
Cluster(name logicalcluster.Name) L
}

// Lister is a Lister API with generics.
type Lister[R runtime.Object] interface {
List(labels.Selector) ([]R, error)
}

// FallbackListFunc is a function that returns []R from either local or global informer Listers based on a label selector.
type FallbackListFunc[R runtime.Object] func(selector labels.Selector) ([]R, error)

// ClusterGetter is a cluster-aware Getter API.
type ClusterGetter[R runtime.Object, G Getter[R]] interface {
Cluster(name logicalcluster.Name) G
}

// Getter is a GetAPI call with generics.
type Getter[R runtime.Object] interface {
Get(name string) (R, error)
}

// FallbackGetFunc is a function that returns an instance of R from either local or global informer Listers based on a name.
type FallbackGetFunc[R runtime.Object] func(name string) (R, error)

// ScopedFallbackGetFunc is a function that returns an instance of R from either local or global cluster-scoped informer Listers based on a name.
type ScopedFallbackGetFunc[R runtime.Object] func(clusterName logicalcluster.Name, name string) (R, error)

// NewListerWithFallback creates a new FallbackListFunc that looks up an object of type R first in the local lister, then in the global lister if no local results are found.
func NewListerWithFallback[R runtime.Object](localLister Lister[R], globalLister Lister[R]) FallbackListFunc[R] {
return func(selector labels.Selector) ([]R, error) {
r, err := localLister.List(selector)
if len(r) == 0 {
return globalLister.List(selector)
}
if err != nil {
return nil, err
}
return r, nil
}
}

// NewGetterWithFallback creates a new FallbackGetFunc that gets an object of type R first looking in the local lister, then in the global lister if not found.
func NewGetterWithFallback[R runtime.Object](localGetter, globalGetter Getter[R]) FallbackGetFunc[R] {
return func(name string) (R, error) {
var r R
r, err := localGetter.Get(name)
if errors.IsNotFound(err) {
return globalGetter.Get(name)
}
if err != nil {
// r will be nil in this case
return r, err
}
return r, nil
}
}

// NewScopedGetterWithFallback creates a new ScopedFallbackGetFunc that gets an object of type R within a given cluster path. The local lister is
// checked first, and if nothing is found, the global lister is checked.
func NewScopedGetterWithFallback[R runtime.Object, G Getter[R]](localGetter, globalGetter ClusterGetter[R, G]) ScopedFallbackGetFunc[R] {
return func(clusterName logicalcluster.Name, name string) (R, error) {
var r R
r, err := localGetter.Cluster(clusterName).Get(name)
if errors.IsNotFound(err) {
return globalGetter.Cluster(clusterName).Get(name)
}
if err != nil {
return r, err
}
return r, nil
}
}
18 changes: 4 additions & 14 deletions pkg/reconciler/apis/apibinding/apibinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/sdk/apis/core"
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
apisv1alpha1client "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/typed/apis/v1alpha1"
apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1"
apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1"
)

const (
Expand Down Expand Up @@ -143,21 +145,9 @@ func NewController(
return indexers.ByIndexWithFallback[*apisv1alpha1.APIExport](apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), indexAPIExportsByAPIResourceSchema, key)
},

getAPIResourceSchema: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) {
apiResourceSchema, err := apiResourceSchemaInformer.Lister().Cluster(clusterName).Get(name)
if apierrors.IsNotFound(err) {
return globalAPIResourceSchemaInformer.Lister().Cluster(clusterName).Get(name)
}
return apiResourceSchema, err
},
getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()),

getAPIConversion: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIConversion, error) {
apiConversion, err := apiConversionInformer.Cluster(clusterName).Lister().Get(name)
if apierrors.IsNotFound(err) {
return globalAPIConversionInformer.Lister().Cluster(clusterName).Get(name)
}
return apiConversion, err
},
getAPIConversion: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIConversion, apisv1alpha1listers.APIConversionLister](apiConversionInformer.Lister(), globalAPIConversionInformer.Lister()),

createCRD: func(ctx context.Context, clusterName logicalcluster.Path, crd *apiextensionsv1.CustomResourceDefinition) (*apiextensionsv1.CustomResourceDefinition, error) {
return crdClusterClient.Cluster(clusterName).ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, metav1.CreateOptions{})
Expand Down
23 changes: 15 additions & 8 deletions pkg/reconciler/workload/resource/resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
func NewController(
dynamicClusterClient kcpdynamic.ClusterInterface,
ddsif *informer.DiscoveringDynamicSharedInformerFactory,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
syncTargetInformer, globalSyncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
namespaceInformer kcpcorev1informers.NamespaceClusterInformer,
placementInformer schedulingv1alpha1informers.PlacementClusterInformer,
) (*Controller, error) {
Expand Down Expand Up @@ -97,20 +97,17 @@ func NewController(
},

getSyncTargetFromKey: func(syncTargetKey string) (*workloadv1alpha1.SyncTarget, bool, error) {
syncTargets, err := indexers.ByIndex[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey)
if err != nil && !errors.IsNotFound(err) {
syncTargets, err := indexers.ByIndexWithFallback[*workloadv1alpha1.SyncTarget](syncTargetInformer.Informer().GetIndexer(),
globalSyncTargetInformer.Informer().GetIndexer(), bySyncTargetKey, syncTargetKey)
if err != nil {
return nil, false, err
nrb marked this conversation as resolved.
Show resolved Hide resolved
}
if errors.IsNotFound(err) {
if len(syncTargets) == 0 {
return nil, false, nil
}
// This shouldn't happen, more than one SyncTarget with the same key means a hash collision.
if len(syncTargets) > 1 {
return nil, false, fmt.Errorf("possible collision: multiple sync targets found for key %q", syncTargetKey)
}
if len(syncTargets) == 0 {
return nil, false, nil
}
return syncTargets[0], true, nil
},

Expand Down Expand Up @@ -149,12 +146,22 @@ func NewController(
bySyncTargetKey: indexBySyncTargetKey,
})

indexers.AddIfNotPresentOrDie(globalSyncTargetInformer.Informer().GetIndexer(), cache.Indexers{
bySyncTargetKey: indexBySyncTargetKey,
})

syncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
c.enqueueSyncTarget(obj)
},
})

globalSyncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
c.enqueueSyncTarget(obj)
},
})

return c, nil
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/reconciler/workload/synctarget/synctarget_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,27 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster"
corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1"
workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1"
corev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/core/v1alpha1"
)

const ControllerName = "kcp-synctarget-controller"

func NewController(
kcpClusterClient kcpclientset.ClusterInterface,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
workspaceShardInformer corev1alpha1informers.ShardClusterInformer,
workspaceShardInformer, globalWorkspaceShardInformer corev1alpha1informers.ShardClusterInformer,
) *Controller {
c := &Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
workspaceShardLister: workspaceShardInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
listWorkspaceShards: informer.NewListerWithFallback[*corev1alpha1.Shard](workspaceShardInformer.Lister(), globalWorkspaceShardInformer.Lister()),
}

// Watch for events related to SyncTargets
Expand All @@ -80,8 +80,8 @@ type Controller struct {
queue workqueue.RateLimitingInterface
kcpClusterClient kcpclientset.ClusterInterface

workspaceShardLister corev1alpha1listers.ShardClusterLister
syncTargetIndexer cache.Indexer
listWorkspaceShards informer.FallbackListFunc[*corev1alpha1.Shard]
syncTargetIndexer cache.Indexer
}

func (c *Controller) enqueueSyncTarget(obj interface{}) {
Expand Down Expand Up @@ -174,7 +174,7 @@ func (c *Controller) process(ctx context.Context, key string) error {
logger = logging.WithObject(klog.FromContext(ctx), currentSyncTarget)
ctx = klog.NewContext(ctx, logger)

workspacesShards, err := c.workspaceShardLister.List(labels.Everything())
workspacesShards, err := c.listWorkspaceShards(labels.Everything())
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
apiresourcev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apiresource/v1alpha1"
Expand Down Expand Up @@ -64,18 +65,20 @@ const (
func NewController(
kcpClusterClient kcpclientset.ClusterInterface,
syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer,
apiExportInformer apisv1alpha1informers.APIExportClusterInformer,
apiResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer,
apiExportInformer, globalAPIExportInformer apisv1alpha1informers.APIExportClusterInformer,
apiResourceSchemaInformer, globalAPIResourceSchemaInformer apisv1alpha1informers.APIResourceSchemaClusterInformer,
apiResourceImportInformer apiresourcev1alpha1informers.APIResourceImportClusterInformer,
) (*Controller, error) {
c := &Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
syncTargetLister: syncTargetInformer.Lister(),
apiExportsIndexer: apiExportInformer.Informer().GetIndexer(),
apiExportLister: apiExportInformer.Lister(),
resourceSchemaLister: apiResourceSchemaInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
syncTargetLister: syncTargetInformer.Lister(),
apiExportsIndexer: apiExportInformer.Informer().GetIndexer(),
getAPIExport: func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) {
return indexers.ByPathAndNameWithFallback[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), apiExportInformer.Informer().GetIndexer(), globalAPIExportInformer.Informer().GetIndexer(), path, name)
},
getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()),
apiImportLister: apiResourceImportInformer.Lister(),
commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()),
}
Expand Down Expand Up @@ -154,8 +157,8 @@ type Controller struct {
syncTargetIndexer cache.Indexer
syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister
apiExportsIndexer cache.Indexer
apiExportLister apisv1alpha1listers.APIExportClusterLister
resourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister
getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error)
getAPIResourceSchema informer.ScopedFallbackGetFunc[*apisv1alpha1.APIResourceSchema]
apiImportLister apiresourcev1alpha1listers.APIResourceImportClusterLister

commit CommitFunc
Expand Down Expand Up @@ -314,7 +317,7 @@ func (c *Controller) process(ctx context.Context, key string) error {

exportReconciler := &exportReconciler{
getAPIExport: c.getAPIExport,
getResourceSchema: c.getResourceSchema,
getResourceSchema: c.getAPIResourceSchema,
}
currentSyncTarget, err = exportReconciler.reconcile(ctx, currentSyncTarget)
if err != nil {
Expand All @@ -323,7 +326,7 @@ func (c *Controller) process(ctx context.Context, key string) error {

apiCompatibleReconciler := &apiCompatibleReconciler{
getAPIExport: c.getAPIExport,
getResourceSchema: c.getResourceSchema,
getResourceSchema: c.getAPIResourceSchema,
listAPIResourceImports: c.listAPIResourceImports,
}
currentSyncTarget, err = apiCompatibleReconciler.reconcile(ctx, currentSyncTarget)
Expand All @@ -341,14 +344,6 @@ func (c *Controller) process(ctx context.Context, key string) error {
return errors.NewAggregate(errs)
}

func (c *Controller) getAPIExport(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error) {
return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), c.apiExportsIndexer, path, name)
}

func (c *Controller) getResourceSchema(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) {
return c.resourceSchemaLister.Cluster(clusterName).Get(name)
}

func (c *Controller) listAPIResourceImports(clusterName logicalcluster.Name) ([]*apiresourcev1alpha1.APIResourceImport, error) {
return c.apiImportLister.Cluster(clusterName).List(labels.Everything())
}
6 changes: 4 additions & 2 deletions tmc/pkg/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (s *Server) installWorkloadResourceScheduler(ctx context.Context, config *r
dynamicClusterClient,
s.Core.DiscoveringDynamicSharedInformerFactory,
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
s.Core.CacheKcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
s.Core.KubeSharedInformerFactory.Core().V1().Namespaces(),
s.Core.KcpSharedInformerFactory.Scheduling().V1alpha1().Placements(),
)
Expand Down Expand Up @@ -355,7 +356,9 @@ func (s *Server) installWorkloadSyncTargetExportController(ctx context.Context,
kcpClusterClient,
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(),
s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIExports(),
s.Core.KcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(),
s.Core.CacheKcpSharedInformerFactory.Apis().V1alpha1().APIResourceSchemas(),
s.Core.KcpSharedInformerFactory.Apiresource().V1alpha1().APIResourceImports(),
)
if err != nil {
Expand Down Expand Up @@ -386,9 +389,8 @@ func (s *Server) installSyncTargetController(ctx context.Context, config *rest.C
c := synctargetcontroller.NewController(
kcpClusterClient,
s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(),
// TODO: change to s.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(),
// once https://github.com/kcp-dev/kcp/issues/2649 is resolved
s.Core.KcpSharedInformerFactory.Core().V1alpha1().Shards(),
s.Core.CacheKcpSharedInformerFactory.Core().V1alpha1().Shards(),
)
if err != nil {
return err
Expand Down