Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1442 from Timer/feature/stack-tracking
Browse files Browse the repository at this point in the history
Delete resources no longer in git
  • Loading branch information
squaremo authored Feb 27, 2019
2 parents dd1b207 + e949383 commit d76ecab
Show file tree
Hide file tree
Showing 36 changed files with 2,059 additions and 580 deletions.
36 changes: 32 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ required = ["k8s.io/code-generator/cmd/client-gen"]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.0"

[[constraint]]
name = "k8s.io/apiextensions-apiserver"
version = "kubernetes-1.11.0"

[[constraint]]
name = "k8s.io/client-go"
version = "8.0.0"
Expand Down Expand Up @@ -57,3 +61,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"]
[[override]]
name = "github.com/BurntSushi/toml"
version = "v0.3.1"

[[constraint]]
name = "github.com/imdario/mergo"
version = "0.3.2"
4 changes: 2 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Cluster interface {
SomeControllers([]flux.ResourceID) ([]Controller, error)
Ping() error
Export() ([]byte, error)
Sync(SyncDef) error
Sync(SyncSet) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}

Expand Down Expand Up @@ -74,7 +74,7 @@ type Controller struct {
Rollout RolloutStatus
// Errors during the recurring sync from the Git repository to the
// cluster will surface here.
SyncError error
SyncError error

Containers ContainersOrExcuse
}
Expand Down
102 changes: 102 additions & 0 deletions cluster/kubernetes/cached_disco.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package kubernetes

import (
"sync"
"time"

crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
crd "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
discocache "k8s.io/client-go/discovery/cached"
toolscache "k8s.io/client-go/tools/cache"
)

// This exists so that we can do our own invalidation.
type cachedDiscovery struct {
discovery.CachedDiscoveryInterface

invalidMu sync.Mutex
invalid bool
}

// The k8s.io/client-go v8.0.0 implementation of MemCacheDiscovery
// refreshes the cached values, synchronously, when Invalidate() is
// called. Since we want to invalidate every time a CRD changes, but
// only refresh values when we need to read the cached values, this
// method defers the invalidation until a read is done.
func (d *cachedDiscovery) Invalidate() {
d.invalidMu.Lock()
d.invalid = true
d.invalidMu.Unlock()
}

// ServerResourcesForGroupVersion is the method used by the
// namespacer; so, this is the one where we check whether the cache
// has been invalidated. A cachedDiscovery implementation for more
// general use would do this for all methods (that weren't implemented
// purely in terms of other methods).
func (d *cachedDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
d.invalidMu.Lock()
invalid := d.invalid
d.invalid = false
d.invalidMu.Unlock()
if invalid {
d.CachedDiscoveryInterface.Invalidate()
}
return d.CachedDiscoveryInterface.ServerResourcesForGroupVersion(groupVersion)
}

// MakeCachedDiscovery constructs a CachedDicoveryInterface that will
// be invalidated whenever the set of CRDs change. The idea is that
// the only avenue of a change to the API resources in a running
// system is CRDs being added, updated or deleted.
func MakeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}) discovery.CachedDiscoveryInterface {
result, _, _ := makeCachedDiscovery(d, c, shutdown, makeInvalidatingHandler)
return result
}

// ---

func makeInvalidatingHandler(cached discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler {
var handler toolscache.ResourceEventHandler = toolscache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
cached.Invalidate()
},
UpdateFunc: func(_, _ interface{}) {
cached.Invalidate()
},
DeleteFunc: func(_ interface{}) {
cached.Invalidate()
},
}
return handler
}

type makeHandle func(discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler

// makeCachedDiscovery constructs a cached discovery client, with more
// flexibility than MakeCachedDiscovery; e.g., with extra handlers for
// testing.
func makeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}, handlerFn makeHandle) (*cachedDiscovery, toolscache.Store, toolscache.Controller) {
cachedDisco := &cachedDiscovery{CachedDiscoveryInterface: discocache.NewMemCacheClient(d)}
// We have an empty cache, so it's _a priori_ invalid. (Yes, that's the zero value, but better safe than sorry)
cachedDisco.Invalidate()

crdClient := c.ApiextensionsV1beta1().CustomResourceDefinitions()
lw := &toolscache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return crdClient.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return crdClient.Watch(options)
},
}

handler := handlerFn(cachedDisco)
store, controller := toolscache.NewInformer(lw, &crdv1beta1.CustomResourceDefinition{}, 5*time.Minute, handler)
go controller.Run(shutdown)
return cachedDisco, store, controller
}
134 changes: 134 additions & 0 deletions cluster/kubernetes/cached_disco_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package kubernetes

import (
"testing"
"time"

crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
toolscache "k8s.io/client-go/tools/cache"
)

type chainHandler struct {
first toolscache.ResourceEventHandler
next toolscache.ResourceEventHandler
}

func (h chainHandler) OnAdd(obj interface{}) {
h.first.OnAdd(obj)
h.next.OnAdd(obj)
}

func (h chainHandler) OnUpdate(old, new interface{}) {
h.first.OnUpdate(old, new)
h.next.OnUpdate(old, new)
}

func (h chainHandler) OnDelete(old interface{}) {
h.first.OnDelete(old)
h.next.OnDelete(old)
}

func TestCachedDiscovery(t *testing.T) {
coreClient := makeFakeClient()

myCRD := &crdv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "custom",
},
}
crdClient := crdfake.NewSimpleClientset(myCRD)

// Here's my fake API resource
myAPI := &metav1.APIResourceList{
GroupVersion: "foo/v1",
APIResources: []metav1.APIResource{
{Name: "customs", SingularName: "custom", Namespaced: true, Kind: "Custom", Verbs: getAndList},
},
}

apiResources := coreClient.Fake.Resources
coreClient.Fake.Resources = append(apiResources, myAPI)

shutdown := make(chan struct{})
defer close(shutdown)

// this extra handler means we can synchronise on the add later
// being processed
allowAdd := make(chan interface{})

addHandler := toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
allowAdd <- obj
},
}
makeHandler := func(d discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler {
return chainHandler{first: addHandler, next: makeInvalidatingHandler(d)}
}

cachedDisco, store, _ := makeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown, makeHandler)

saved := getDefaultNamespace
getDefaultNamespace = func() (string, error) { return "bar-ns", nil }
defer func() { getDefaultNamespace = saved }()
namespacer, err := NewNamespacer(cachedDisco)
if err != nil {
t.Fatal(err)
}

namespaced, err := namespacer.lookupNamespaced("foo/v1", "Custom")
if err != nil {
t.Fatal(err)
}
if !namespaced {
t.Error("got false from lookupNamespaced, expecting true")
}

// In a cluster, we'd rely on the apiextensions server to reflect
// changes to CRDs to changes in the API resources. Here I will be
// more narrow, and just test that the API resources are reloaded
// when a CRD is updated or deleted.

// This is delicate: we can't just change the value in-place,
// since that will update everyone's record of it, and the test
// below will trivially succeed.
updatedAPI := &metav1.APIResourceList{
GroupVersion: "foo/v1",
APIResources: []metav1.APIResource{
{Name: "customs", SingularName: "custom", Namespaced: false /* <-- changed */, Kind: "Custom", Verbs: getAndList},
},
}
coreClient.Fake.Resources = append(apiResources, updatedAPI)

// Provoke the cached discovery client into invalidating
_, err = crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(myCRD)
if err != nil {
t.Fatal(err)
}

// Wait for the update to "go through"
select {
case <-allowAdd:
break
case <-time.After(time.Second):
t.Fatal("timed out waiting for Add to happen")
}

_, exists, err := store.Get(myCRD)
if err != nil {
t.Error(err)
}
if !exists {
t.Error("does not exist")
}

namespaced, err = namespacer.lookupNamespaced("foo/v1", "Custom")
if err != nil {
t.Fatal(err)
}
if namespaced {
t.Error("got true from lookupNamespaced, expecting false (after changing it)")
}
}
1 change: 0 additions & 1 deletion cluster/kubernetes/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ Package kubernetes provides implementations of `Cluster` and
`Manifests` that interact with the Kubernetes API (using kubectl or
the k8s API client).
*/

package kubernetes
Loading

0 comments on commit d76ecab

Please sign in to comment.