Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform API discovery only as needed #8

Merged
merged 2 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build/common/config/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ linters:
enable-all: true
disable:
- bodyclose
# Fails due to Can't run linter goanalysis_metalinter: goanalysis_metalinter:
# contextcheck: package \"client\" (isInitialPkg: true, needAnalyzeSource: true): runtime error: invalid memory
# address or nil pointer dereference
- contextcheck
- cyclop
- depguard
- dupl
Expand Down
117 changes: 67 additions & 50 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"errors"
"fmt"
"sync"
"time"

apimachinerymeta "k8s.io/apimachinery/pkg/api/meta"
"github.com/jellydator/ttlcache/v3"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/workqueue"
Expand All @@ -27,8 +28,9 @@ import (
)

var (
ErrNotStarted error = errors.New("DynamicWatcher must be started to perform this action")
ErrInvalidInput error = errors.New("invalid input provided")
ErrNotStarted error = errors.New("DynamicWatcher must be started to perform this action")
ErrInvalidInput error = errors.New("invalid input provided")
ErrNoVersionedResource error = errors.New("the resource version was not found")
)

type Reconciler interface {
Expand Down Expand Up @@ -116,9 +118,16 @@ func New(config *rest.Config, reconciler Reconciler, options *Options) (DynamicW
return nil, fmt.Errorf("failed to initialize a dynamic Kubernetes client: %w", err)
}

gvkToGVRCache := ttlcache.New(
ttlcache.WithTTL[schema.GroupVersionKind, schema.GroupVersionResource](10 * time.Minute),
)
// This will periodically delete expired cache items.
go gvkToGVRCache.Start()

watcher := dynamicWatcher{
client: client,
dynamicClient: dynamicClient,
gvkToGVR: gvkToGVRCache,
rateLimiter: rateLimiter,
Reconciler: reconciler,
startedChan: make(chan struct{}),
Expand All @@ -140,8 +149,9 @@ type dynamicWatcher struct {
Queue workqueue.RateLimitingInterface
Reconciler
rateLimiter ratelimiter.RateLimiter
// restMapper is used to map ObjectIdentifer objects to an API resource.
restMapper apimachinerymeta.RESTMapper
// gvkToGVR is used as a cache of GVK to GVR mappings. The cache entries automatically expire every 10 minutes when
// using the New function.
gvkToGVR *ttlcache.Cache[schema.GroupVersionKind, schema.GroupVersionResource]
// started gets set as part of the Start method.
started bool
// startedChan is closed when the dynamicWatcher is started. This is exposed to the user through the Started method.
Expand All @@ -163,11 +173,6 @@ func (d *dynamicWatcher) Start(ctx context.Context) error {

d.Queue = workqueue.NewRateLimitingQueue(d.rateLimiter)

err := d.refreshRESTMapper()
if err != nil {
return fmt.Errorf("failed to the Kubernetes API REST mapper: %w", err)
}

go func() {
<-ctx.Done()

Expand Down Expand Up @@ -334,25 +339,6 @@ func (d *dynamicWatcher) reconcileHandler(ctx context.Context, obj interface{})
}
}

// refreshRESTMapper will call the discovery API and create a RESTMapper that is used to convert a GVK to a GVR. An
// error is returned if the discovery API call failed.
func (d *dynamicWatcher) refreshRESTMapper() error {
klog.V(2).Info("Refreshing the Kubernetes API REST Mapper")

discovery := d.client.Discovery()

apiGroups, err := restmapper.GetAPIGroupResources(discovery)
if err != nil {
klog.Errorf("Could not get the API groups list from the Kubernetes API, error: %v", err)

return err
}

d.restMapper = restmapper.NewDiscoveryRESTMapper(apiGroups)

return nil
}

// AddOrUpdateWatcher updates the watches for the watcher. When updating, any previously watched objects not specified
// will stop being watched. If an error occurs, any created watches as part of this method execution will be removed.
func (d *dynamicWatcher) AddOrUpdateWatcher(watcher ObjectIdentifier, watchedObjects ...ObjectIdentifier) error {
Expand Down Expand Up @@ -403,31 +389,16 @@ func (d *dynamicWatcher) AddOrUpdateWatcher(watcher ObjectIdentifier, watchedObj
continue
}

gk := schema.GroupKind{Group: watchedObject.Group, Kind: watchedObject.Kind}

mapping, err := d.restMapper.RESTMapping(gk, watchedObject.Version)
gvr, err := d.gvrFromObjectIdentifier(watchedObject)
if err != nil {
// Refresh the REST mapper and try again if the mapping wasn't found.
err = d.refreshRESTMapper()
if err != nil {
klog.Errorf("Failed to refresh the API discovery data, error: %v", err)
klog.Errorf("Could not get the GVR for %s, error: %v", watchedObject, err)

encounteredErr = err

break
}

mapping, err = d.restMapper.RESTMapping(gk, watchedObject.Version)
if err != nil {
klog.Errorf("Could not get resource mapping for %s, error: %v", watchedObject, err)

encounteredErr = err
encounteredErr = err

break
}
break
}

var resource dynamic.ResourceInterface = d.dynamicClient.Resource(mapping.Resource)
var resource dynamic.ResourceInterface = d.dynamicClient.Resource(gvr)
if watchedObject.Namespace != "" {
resource = resource.(dynamic.NamespaceableResourceInterface).Namespace(watchedObject.Namespace)
}
Expand Down Expand Up @@ -549,3 +520,49 @@ func (d *dynamicWatcher) GetWatchCount() uint {

return count
}

// gvrFromObjectIdentifier uses the discovery client to get the versioned resource. If the resource is not found or
// could not be retrieved, an error is always returned.
func (d *dynamicWatcher) gvrFromObjectIdentifier(watchedObject ObjectIdentifier) (schema.GroupVersionResource, error) {
gvk := schema.GroupVersionKind{
Group: watchedObject.Group, Version: watchedObject.Version, Kind: watchedObject.Kind,
}

// First check the cache
if cachedGVR := d.gvkToGVR.Get(gvk); cachedGVR != nil {
// Delete the cached item if it's expired
if cachedGVR.IsExpired() {
d.gvkToGVR.Delete(gvk)
} else {
return cachedGVR.Value(), nil
}
}

rsrcList, err := d.client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
if k8serrors.IsNotFound(err) {
return schema.GroupVersionResource{}, fmt.Errorf("%w: %s", ErrNoVersionedResource, gvk.String())
}

return schema.GroupVersionResource{}, err
}

for _, rsrc := range rsrcList.APIResources {
if rsrc.Kind == gvk.Kind {
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: rsrc.Name,
}

// Cache the value
d.gvkToGVR.Set(gvk, gvr, ttlcache.DefaultTTL)

return gvr, nil
}
}

return schema.GroupVersionResource{}, fmt.Errorf(
"%w: no matching kind was found: %s", ErrNoVersionedResource, gvk.String(),
)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/stolostron/kubernetes-dependency-watches
go 1.18

require (
github.com/jellydator/ttlcache/v3 v3.0.1
github.com/onsi/ginkgo/v2 v2.1.6
github.com/onsi/gomega v1.20.1
k8s.io/api v0.23.9
Expand Down Expand Up @@ -43,6 +44,7 @@ require (
go.uber.org/zap v1.19.1 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jellydator/ttlcache/v3 v3.0.1 h1:cHgCSMS7TdQcoprXnWUptJZzyFsqs18Lt8VVhRuZYVU=
github.com/jellydator/ttlcache/v3 v3.0.1/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
Expand Down Expand Up @@ -606,6 +608,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down