Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller/engine: use local cache for client read requests #554

Merged
merged 3 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions pkg/controller/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
/*
Copyright 2023 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"strings"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/crossplane/crossplane-runtime/pkg/errors"
)

// GVKRoutedCache is a cache that routes requests by GVK to other caches.
type GVKRoutedCache struct {
scheme *runtime.Scheme

fallback cache.Cache

lock sync.RWMutex
delegates map[schema.GroupVersionKind]cache.Cache
}

// NewGVKRoutedCache returns a new routed cache.
func NewGVKRoutedCache(scheme *runtime.Scheme, fallback cache.Cache) *GVKRoutedCache {
return &GVKRoutedCache{
scheme: scheme,
fallback: fallback,
delegates: make(map[schema.GroupVersionKind]cache.Cache),
}
}

var _ cache.Cache = &GVKRoutedCache{}

// AddDelegate adds a delegated cache for a given GVK.
func (c *GVKRoutedCache) AddDelegate(gvk schema.GroupVersionKind, delegate cache.Cache) {
c.lock.Lock()
defer c.lock.Unlock()

c.delegates[gvk] = delegate
}

// RemoveDelegate removes a delegated cache for a given GVK.
func (c *GVKRoutedCache) RemoveDelegate(gvk schema.GroupVersionKind) {
c.lock.Lock()
defer c.lock.Unlock()

delete(c.delegates, gvk)
}

// Get retrieves an object for a given ObjectKey backed by a cache.
func (c *GVKRoutedCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.Get(ctx, key, obj, opts...)
}

return c.fallback.Get(ctx, key, obj, opts...)
}

// List lists objects for a given ObjectList backed by a cache.
func (c *GVKRoutedCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", list, err)
}

if !strings.HasSuffix(gvk.Kind, "List") {
// following controller-runtime here which does not support non
// <Kind>List types.
return errors.Errorf("non-list type %T (kind %q) passed as output", list, gvk)
}
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.List(ctx, list, opts...)
}

return c.fallback.List(ctx, list, opts...)
}

// GetInformer returns an informer for the given object.
func (c *GVKRoutedCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return nil, errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.GetInformer(ctx, obj, opts...)
}

return c.fallback.GetInformer(ctx, obj, opts...)
}

// GetInformerForKind returns an informer for the given GVK.
func (c *GVKRoutedCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.GetInformerForKind(ctx, gvk, opts...)
}

return c.fallback.GetInformerForKind(ctx, gvk, opts...)
}

// Start for a GVKRoutedCache is a no-op. Start must be called for each delegate.
func (c *GVKRoutedCache) Start(_ context.Context) error {
return nil
}

// WaitForCacheSync for a GVKRoutedCache waits for all delegates to sync, and
// returns false if any of them fails to sync.
func (c *GVKRoutedCache) WaitForCacheSync(ctx context.Context) bool {
c.lock.RLock()
syncedCh := make(chan bool, len(c.delegates)+1)
cas := make([]cache.Cache, 0, len(c.delegates))
for _, ca := range c.delegates {
cas = append(cas, ca)
}
cas = append(cas, c.fallback)
c.lock.RUnlock()

var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(ctx)

for _, ca := range cas {
wg.Add(1)
go func(ca cache.Cache) {
defer wg.Done()
synced := ca.WaitForCacheSync(ctx)
if !synced {
// first unsynced cache breaks the whole wait
cancelFn()
}
syncedCh <- synced
}(ca)
}

wg.Wait()
close(syncedCh)
cancelFn()

// any not synced?
for synced := range syncedCh {
if !synced {
return false
}
}

return true
}

// IndexField adds an index with the given field name on the given object type
// by using the given function to extract the value for that field.
func (c *GVKRoutedCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.lock.RLock()
delegate, ok := c.delegates[gvk]
c.lock.RUnlock()

if ok {
return delegate.IndexField(ctx, obj, field, extractValue)
}

return c.fallback.IndexField(ctx, obj, field, extractValue)
}

// cachedRoutedClient wraps a client and routes read requests by GVK to a cache.
type cachedRoutedClient struct {
client.Client

scheme *runtime.Scheme
cache *GVKRoutedCache
}

func (c *cachedRoutedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", obj, err)
}

c.cache.lock.RLock()
delegate, ok := c.cache.delegates[gvk]
c.cache.lock.RUnlock()

if ok {
return delegate.Get(ctx, key, obj, opts...)
}

return c.Client.Get(ctx, key, obj, opts...)
}

func (c *cachedRoutedClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
gvk, err := apiutil.GVKForObject(list, c.scheme)
if err != nil {
return errors.Errorf("failed to get GVK for type %T: %w", list, err)
}

if !strings.HasSuffix(gvk.Kind, "List") {
// following controller-runtime here which does not support non
// <Kind>List types.
return errors.Errorf("non-list type %T (kind %q) passed as output", list, gvk)
}
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")

c.cache.lock.RLock()
delegate, ok := c.cache.delegates[gvk]
c.cache.lock.RUnlock()

if ok {
return delegate.List(ctx, list, opts...)
}

return c.Client.List(ctx, list, opts...)
}

// WithGVKRoutedCache returns a manager backed by a GVKRoutedCache. The client
// returned by the manager will route read requests to cached GVKs.
func WithGVKRoutedCache(c *GVKRoutedCache, mgr controllerruntime.Manager) controllerruntime.Manager {
negz marked this conversation as resolved.
Show resolved Hide resolved
return &routedManager{
Manager: mgr,
client: &cachedRoutedClient{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
cache: c,
},
cache: c,
}
}

type routedManager struct {
controllerruntime.Manager

client client.Client
cache cache.Cache
}

func (m *routedManager) GetClient() client.Client {
return m.client
}

func (m *routedManager) GetCache() cache.Cache {
return m.cache
}
Loading
Loading