Skip to content

Commit

Permalink
Use generics to do global cache lookup.
Browse files Browse the repository at this point in the history
Signed-off-by: Nolan Brubaker <nolan@nbrubaker.com>
  • Loading branch information
nrb committed Apr 4, 2023
1 parent 7abdab5 commit a447cdf
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 27 deletions.
100 changes: 100 additions & 0 deletions pkg/informer/fallback_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)

// ClusterLister is a cluster-aware Lister API.
type ClusterLister[R runtime.Object, L Lister[R]] interface {
Cluster(name logicalcluster.Name) L
}

// Lister is a Lister API with generics.
type Lister[R runtime.Object] interface {
List(labels.Selector) ([]R, error)
}

// FallbackListFunc is a function that returns []R from either local or global informer Listers based on a label selector.
type FallbackListFunc[R runtime.Object] func(selector labels.Selector) ([]R, error)

// ClusterGetter is a cluster-aware Getter API.
type ClusterGetter[R runtime.Object, G Getter[R]] interface {
Cluster(name logicalcluster.Name) G
}

// Getter is a GetAPI call with generics.
type Getter[R runtime.Object] interface {
Get(name string) (R, error)
}

// FallbackGetFunc is a function that returns an instance of R from either local or global informer Listers based on a name.
type FallbackGetFunc[R runtime.Object] func(name string) (R, error)

// ScopedFallbackGetFunc is a function that returns an instance of R from either local or global cluster-scoped informer Listers based on a name.
type ScopedFallbackGetFunc[R runtime.Object] func(clusterName logicalcluster.Name, name string) (R, error)

// NewListerWithFallback creates a new FallbackListFunc that looks up an object of type R first in the local lister, then in the global lister if no local results are found.
func NewListerWithFallback[R runtime.Object](localLister Lister[R], globalLister Lister[R]) FallbackListFunc[R] {
return func(selector labels.Selector) ([]R, error) {
r, err := localLister.List(selector)
if len(r) == 0 {
return globalLister.List(selector)
}
if err != nil {
return nil, err
}
return r, nil
}
}

// NewGetterWithFallback creates a new FallbackGetFunc that gets an object of type R first looking in the local lister, then in the global lister if not found.
func NewGetterWithFallback[R runtime.Object](localGetter, globalGetter Getter[R]) FallbackGetFunc[R] {
return func(name string) (R, error) {
var r R
r, err := localGetter.Get(name)
if errors.IsNotFound(err) {
return globalGetter.Get(name)
}
if err != nil {
// r will be nil in this case
return r, err
}
return r, nil
}
}

// NewScopedGetterWithFallback creates a new ScopedFallbackGetFunc that gets an object of type R within a given cluster path. The local lister is
// checked first, and if nothing is found, the global lister is checked.
func NewScopedGetterWithFallback[R runtime.Object, G Getter[R]](localGetter, globalGetter ClusterGetter[R, G]) ScopedFallbackGetFunc[R] {
return func(clusterName logicalcluster.Name, name string) (R, error) {
var r R
r, err := localGetter.Cluster(clusterName).Get(name)
if errors.IsNotFound(err) {
return globalGetter.Cluster(clusterName).Get(name)
}
if err != nil {
return r, err
}
return r, nil
}
}
20 changes: 6 additions & 14 deletions pkg/reconciler/workload/synctarget/synctarget_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1"
Expand All @@ -52,19 +53,10 @@ func NewController(
workspaceShardInformer, globalWorkspaceShardInformer corev1alpha1informers.ShardClusterInformer,
) *Controller {
c := &Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
listWorkspaceShards: func(selector labels.Selector) ([]*corev1alpha1.Shard, error) {
shards, err := workspaceShardInformer.Lister().List(selector)
if len(shards) == 0 {
return globalWorkspaceShardInformer.Lister().List(selector)
}
if err != nil {
return nil, err
}
return shards, nil
},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
kcpClusterClient: kcpClusterClient,
syncTargetIndexer: syncTargetInformer.Informer().GetIndexer(),
listWorkspaceShards: informer.NewListerWithFallback[*corev1alpha1.Shard](workspaceShardInformer.Lister(), globalWorkspaceShardInformer.Lister()),
}

// Watch for events related to SyncTargets
Expand All @@ -88,7 +80,7 @@ type Controller struct {
queue workqueue.RateLimitingInterface
kcpClusterClient kcpclientset.ClusterInterface

listWorkspaceShards func(labels.Selector) ([]*corev1alpha1.Shard, error)
listWorkspaceShards informer.FallbackListFunc[*corev1alpha1.Shard]
syncTargetIndexer cache.Indexer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
apiresourcev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apiresource/v1alpha1"
Expand All @@ -49,6 +50,7 @@ import (
apisv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/apis/v1alpha1"
workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1"
apiresourcev1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apiresource/v1alpha1"
apisv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/apis/v1alpha1"
workloadv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/workload/v1alpha1"
)

Expand Down Expand Up @@ -88,18 +90,9 @@ func NewController(
// Didn't find it locally - try remote
return indexers.ByPathAndName[*apisv1alpha1.APIExport](apisv1alpha1.Resource("apiexports"), globalAPIExportInformer.Informer().GetIndexer(), path, name)
},
getAPIResourceSchema: func(path logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error) {
schema, err := apiResourceSchemaInformer.Cluster(path).Lister().Get(name)
if apierrors.IsNotFound(err) {
return globalAPIResourceSchemaInformer.Cluster(path).Lister().Get(name)
}
if err != nil {
return nil, err
}
return schema, nil
},
apiImportLister: apiResourceImportInformer.Lister(),
commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()),
getAPIResourceSchema: informer.NewScopedGetterWithFallback[*apisv1alpha1.APIResourceSchema, apisv1alpha1listers.APIResourceSchemaLister](apiResourceSchemaInformer.Lister(), globalAPIResourceSchemaInformer.Lister()),
apiImportLister: apiResourceImportInformer.Lister(),
commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()),
}

if err := syncTargetInformer.Informer().AddIndexers(cache.Indexers{
Expand Down Expand Up @@ -177,7 +170,7 @@ type Controller struct {
syncTargetLister workloadv1alpha1listers.SyncTargetClusterLister
apiExportsIndexer cache.Indexer
getAPIExport func(path logicalcluster.Path, name string) (*apisv1alpha1.APIExport, error)
getAPIResourceSchema func(path logicalcluster.Name, name string) (*apisv1alpha1.APIResourceSchema, error)
getAPIResourceSchema informer.ScopedFallbackGetFunc[*apisv1alpha1.APIResourceSchema]
apiImportLister apiresourcev1alpha1listers.APIResourceImportClusterLister

commit CommitFunc
Expand Down

0 comments on commit a447cdf

Please sign in to comment.