Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Implement caching for the Scoper
Browse files Browse the repository at this point in the history
  • Loading branch information
Alfonso Acosta committed Jan 23, 2019
1 parent a936989 commit 759d954
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 15 deletions.
105 changes: 92 additions & 13 deletions cluster/kubernetes/scoper.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,112 @@
package kubernetes

import (
"math/rand"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/discovery"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/weaveworks/flux/cluster"
)

var (
	// cacheBaseTTL is how long a cache entry stays valid before a refresh.
	cacheBaseTTL = 7 * time.Minute
	// maxTTLJitter is added randomly to each entry's TTL; use jitter to make
	// sure entries are not updated at the same time.
	maxTTLJitter = cacheBaseTTL / 10
)

// cacheEntry holds the cached kind-to-scope mapping for one groupVersion.
type cacheEntry struct {
	ttl         time.Time      // absolute expiry time of this entry
	kindToScope map[string]int // nil marks a groupVersion the server doesn't have
}

// ServerResourcesForGroupVersion is the subset of the Kubernetes discovery
// client the Scoper needs; narrowed to an interface so tests can mock it.
type ServerResourcesForGroupVersion interface {
	ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error)
}

type Scoper struct {
Client discovery.ServerResourcesInterface
client ServerResourcesForGroupVersion
cache map[string]cacheEntry // groupVersion to cached kind scopes
sync.RWMutex
}

// NewScoper returns a Scoper that resolves scopes through the given
// discovery client, starting with an empty cache.
func NewScoper(client ServerResourcesForGroupVersion) *Scoper {
	s := &Scoper{client: client}
	s.cache = make(map[string]cacheEntry)
	return s
}

func (s *Scoper) Scope(groupVersion string, kind string) (int, error) {
// TODO: add caching
resourceList, err := s.Client.ServerResourcesForGroupVersion(groupVersion)
if err != nil || resourceList == nil {
if scope, err, refresh := s.scopeFromCache(groupVersion, kind, true); !refresh {
return scope, err
}
if err := s.refreshGroupVersionCacheEntry(groupVersion); err != nil {
return 0, err
}
scope, err, _ := s.scopeFromCache(groupVersion, kind, false)
return scope, err
}

// scopeFromCache looks up the scope of groupVersion/kind in the cache.
// It returns (scope, error, refreshNeeded); when refreshNeeded is true the
// caller should refresh the cache entry and retry, and the other two return
// values should be ignored.
func (s *Scoper) scopeFromCache(groupVersion string, kind string, checkTTL bool) (int, error, bool) {
	s.RLock()
	defer s.RUnlock()
	groupVersionEntry, found := s.cache[groupVersion]
	// Cache entry needs populating
	if !found {
		return 0, cluster.ErrGroupVersionNotFound(groupVersion), true
	}
	// Cache entry needs refreshing
	if checkTTL && groupVersionEntry.ttl.Before(time.Now()) {
		return 0, nil, true
	}

	// We go with what's in the cache.
	// A nil kindToScope marks a groupVersion the server doesn't know about.
	if groupVersionEntry.kindToScope == nil {
		return 0, cluster.ErrGroupVersionNotFound(groupVersion), false
	}
	if scope, found := groupVersionEntry.kindToScope[kind]; found {
		return scope, nil, false
	}
	// The groupVersion exists but the kind doesn't: report the more precise
	// kind-level error rather than claiming the whole groupVersion is missing.
	return 0, cluster.ErrGroupVersionKindNotFound(groupVersion, kind), false
}

func (s *Scoper) refreshGroupVersionCacheEntry(groupVersion string) error {
ttlJitter := time.Duration(rand.Int63n(int64(maxTTLJitter)))
ttl := time.Now().Add(cacheBaseTTL).Add(ttlJitter)
groupVersionEntry := cacheEntry{
ttl: ttl,
kindToScope: nil,
}

resourceList, err := s.client.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
if errors.IsNotFound(err) {
cluster.ErrGroupVersionKindNotFound(groupVersion, kind)
// mark the group as missing (nil kindToScope)
goto done
}
return 0, err
return err
}
for _, resource := range resourceList.APIResources {
if resource.Kind == kind {

groupVersionEntry.kindToScope = map[string]int{}
if resourceList.APIResources != nil {
for _, resource := range resourceList.APIResources {
scope := cluster.ClusterScope
if resource.Namespaced {
return cluster.NamespaceScope, nil
} else {
return cluster.ClusterScope, nil
scope = cluster.NamespaceScope
}
// NOTE: the kind may not be unique in the API resources
// e.g. in Group Version "v1" there are resources with names
// "pods", "pods/attach" "pods/binding" etc ... all with Kind "Pod"
// but the "Namespaced" field is consistent for all of them
groupVersionEntry.kindToScope[resource.Kind] = scope
}
}
return 0, cluster.ErrGroupVersionKindNotFound(groupVersion, kind)

done:
s.Lock()
s.cache[groupVersion] = groupVersionEntry
s.Unlock()
return nil
}
131 changes: 131 additions & 0 deletions cluster/kubernetes/scoper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package kubernetes

import (
"testing"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/weaveworks/flux/cluster"
)

// mockClient is a canned implementation of ServerResourcesForGroupVersion
// for tests.
type mockClient struct {
	groupVersionResources map[string]*metav1.APIResourceList // group to resources
}

// ServerResourcesForGroupVersion returns the canned resource list for
// groupVersion, or a NotFound StatusError when none is registered.
func (m *mockClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
	resources, ok := m.groupVersionResources[groupVersion]
	if !ok {
		// Keyed literal: unkeyed composite literals for external structs
		// trip `go vet` and break when the upstream type gains fields.
		return nil, &errors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonNotFound}}
	}
	return resources, nil
}

// v1groupVersionResources is the canned "v1" discovery response used by the
// Scoper tests: two namespaced kinds and one cluster-scoped kind.
var v1groupVersionResources = map[string]*metav1.APIResourceList{
	"v1": {
		GroupVersion: "v1",
		APIResources: []metav1.APIResource{
			{Name: "pods", Kind: "Pod", Namespaced: true},
			{Name: "services", Kind: "Service", Namespaced: true},
			{Name: "persistentvolumes", Kind: "PersistentVolume", Namespaced: false},
		},
	},
}

// TestHappyPath checks that known kinds resolve to their expected scopes.
func TestHappyPath(t *testing.T) {
	s := NewScoper(&mockClient{groupVersionResources: v1groupVersionResources})
	for _, tc := range []struct {
		kind string
		want int
	}{
		{kind: "Pod", want: cluster.NamespaceScope},
		{kind: "PersistentVolume", want: cluster.ClusterScope},
	} {
		scope, err := s.Scope("v1", tc.kind)
		if err != nil {
			t.Fatal("unexpected error", err)
		}
		if scope != tc.want {
			t.Fatal("unexpected scope")
		}
	}
}

// TestGroupVersionNotFound checks that an unknown groupVersion yields a
// cluster.ScoperError.
func TestGroupVersionNotFound(t *testing.T) {
	s := NewScoper(&mockClient{groupVersionResources: v1groupVersionResources})
	_, err := s.Scope("madeup", "madeup")
	if err == nil {
		t.Fatal("expected error", err)
	}
	if _, ok := err.(cluster.ScoperError); !ok {
		t.Fatal("expected error type", err)
	}
}

// TestKindNotFound checks that an unknown kind in a known groupVersion
// yields a cluster.ScoperError.
func TestKindNotFound(t *testing.T) {
	s := NewScoper(&mockClient{groupVersionResources: v1groupVersionResources})
	_, err := s.Scope("v1", "madeup")
	if err == nil {
		t.Fatal("expected error", err)
	}
	if _, ok := err.(cluster.ScoperError); !ok {
		t.Fatal("expected error type", err)
	}
}

// TestTTLExpiration checks that a cache entry is re-fetched from the
// discovery client once its TTL has expired.
func TestTTLExpiration(t *testing.T) {
	// Shrink the TTLs to make the test run in a reasonable time.
	savedBaseTTL, savedMaxJitter := cacheBaseTTL, maxTTLJitter
	defer func() { cacheBaseTTL, maxTTLJitter = savedBaseTTL, savedMaxJitter }()
	cacheBaseTTL = 10 * time.Millisecond
	maxTTLJitter = time.Millisecond

	mc := &mockClient{groupVersionResources: v1groupVersionResources}
	s := NewScoper(mc)

	// Fill the cache.
	scope, err := s.Scope("v1", "Pod")
	if err != nil {
		t.Fatal("unexpected error", err)
	}
	if scope != cluster.NamespaceScope {
		t.Fatal("unexpected scope")
	}

	// Wait for the TTL to expire and artificially change the scope of pods
	// to cluster.
	time.Sleep(2 * cacheBaseTTL)
	mc.groupVersionResources = map[string]*metav1.APIResourceList{
		"v1": {
			GroupVersion: "v1",
			APIResources: []metav1.APIResource{
				{
					Name:       "pods",
					Kind:       "Pod",
					Namespaced: false,
				},
			},
		},
	}

	// Make sure the scope change was caught and pods now have cluster scope.
	scope, err = s.Scope("v1", "Pod")
	if err != nil {
		t.Fatal("unexpected error", err)
	}
	if scope != cluster.ClusterScope {
		t.Fatal("unexpected scope")
	}
}
6 changes: 5 additions & 1 deletion cluster/scoper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ type ScoperError struct {
}

// ErrGroupVersionKindNotFound constructs the error returned when a Kind
// cannot be found in the given apiVersion (groupVersion).
func ErrGroupVersionKindNotFound(groupVersion string, kind string) error {
	return ScoperError{fmt.Errorf("apiVersion (%q) and Kind (%q) combination not found", groupVersion, kind)}
}

// ErrGroupVersionNotFound constructs the error returned when an apiVersion
// (groupVersion) is not known to the cluster.
func ErrGroupVersionNotFound(groupVersion string) error {
	return ScoperError{fmt.Errorf("apiVersion (%q) not found", groupVersion)}
}

// Scoper obtains the scope (ClusterScope or NamespaceScope) of a kubernetes resource
Expand Down
2 changes: 1 addition & 1 deletion cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func main() {
// files as manifests, and that's as Kubernetes yamels.
k8sManifests = &kubernetes.Manifests{
AllowedNamespaces: allowedNamespaces,
Scoper: &kubernetes.Scoper{Client: clientset.Discovery()},
Scoper: kubernetes.NewScoper(clientset.Discovery()),
}
}

Expand Down

0 comments on commit 759d954

Please sign in to comment.