From 1a94a4d78b09598de0550e0b309c32092da7cf65 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 26 Aug 2024 17:51:38 -0400 Subject: [PATCH 1/2] Adjustments to VerifyNamespaceReactor - React to all verbs, not just "create" - When a namesapce is deleted, delete all contained resources - Update unit tests Signed-off-by: Tom Pantelis --- pkg/fake/basic_reactors_test.go | 132 ++++++++++++++++++++++----- pkg/fake/delete_colection_reactor.go | 36 +++++--- pkg/fake/verify_namespace_reactor.go | 44 +++++++-- 3 files changed, 167 insertions(+), 45 deletions(-) diff --git a/pkg/fake/basic_reactors_test.go b/pkg/fake/basic_reactors_test.go index 75d7ac9b..930c0ad1 100644 --- a/pkg/fake/basic_reactors_test.go +++ b/pkg/fake/basic_reactors_test.go @@ -80,31 +80,6 @@ var _ = Describe("Create", func() { Expect(err).To(HaveOccurred()) }) }) - - Context("with namespace verification", func() { - BeforeEach(func() { - fake.AddVerifyNamespaceReactor(&t.client.Fake, "pods") - }) - - When("the namespace does not exist", func() { - It("should return an error", func() { - _, err := t.doCreate(t.pod) - Expect(resource.IsMissingNamespaceErr(err)).To(BeTrue()) - Expect(resource.ExtractMissingNamespaceFromErr(err)).To(Equal(testNamespace)) - }) - }) - - When("the namespace does exist", func() { - It("should succeed", func() { - _, err := t.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: testNamespace}, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) - - t.assertCreateSuccess(t.pod) - }) - }) - }) }) var _ = Describe("Update", func() { @@ -326,6 +301,113 @@ var _ = Describe("DeleteCollection", func() { }) }) +var _ = Describe("Namespace verification", func() { + t := newBasicReactorsTestDriver() + + BeforeEach(func() { + fake.AddVerifyNamespaceReactor(&t.client.Fake, "*") + }) + + assertMissingNamespaceErr := func(err error) { + Expect(resource.IsMissingNamespaceErr(err)).To(BeTrue()) + Expect(resource.ExtractMissingNamespaceFromErr(err)).To(Equal(testNamespace)) + } + + createNamespace := func(name string) { + _, err := t.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + } + + When("a namespace does not exist", func() { + Specify("requests should return the appropriate error", func() { + _, err := t.doCreate(t.pod) + assertMissingNamespaceErr(err) + + _, err = t.doGet(t.pod.Name) + assertMissingNamespaceErr(err) + + _, err = t.doUpdate() + assertMissingNamespaceErr(err) + + assertMissingNamespaceErr(t.doDelete(metav1.DeleteOptions{})) + + _, err = t.client.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{}) + assertMissingNamespaceErr(err) + }) + }) + + When("a request has no namespace", func() { + It("should succeed", func() { + _, err := t.client.CoreV1().Pods("").Create(context.Background(), t.pod, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + }) + }) + + When("a namespace does exist", func() { + Specify("requests should succeed", func() { + createNamespace(testNamespace) + + t.pod.Name = "test-pod" + t.assertCreateSuccess(t.pod) + + _, err := t.client.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{}) + Expect(err).To(Succeed()) + + Expect(t.doDelete(metav1.DeleteOptions{})).To(Succeed()) + }) + }) + + When("a namespace is deleted", func() { + const noDeleteNS = "no-delete" + + It("should delete all contained resources", func() { + createNamespace(testNamespace) + createNamespace(noDeleteNS) + + t.pod.Name = "test-pod" + t.assertCreateSuccess(t.pod) + + _, err := t.client.CoreV1().Services(testNamespace).Create(context.TODO(), &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "should-delete", + }, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + _, err = t.client.CoreV1().Services(noDeleteNS).Create(context.TODO(), &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "should-not-delete", + }, + }, metav1.CreateOptions{}) + Expect(err).To(Succeed()) + + // Delete the namespace + + Expect(t.client.CoreV1().Namespaces().Delete(context.TODO(), testNamespace, metav1.DeleteOptions{})).To(Succeed()) + + _, err = t.doGet(t.pod.Name) + assertMissingNamespaceErr(err) + + // Recreate the namespace + + createNamespace(testNamespace) + + _, err = t.doGet(t.pod.Name) + Expect(err).To(HaveOccurred()) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + + _, err = t.client.CoreV1().Services(testNamespace).Get(context.TODO(), "should-delete", metav1.GetOptions{}) + Expect(err).To(HaveOccurred()) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + + _, err = t.client.CoreV1().Services(noDeleteNS).Get(context.TODO(), "should-not-delete", metav1.GetOptions{}) + Expect(err).To(Succeed()) + }) + }) +}) + type basicReactorsTestDriver struct { client *k8sfake.Clientset pod *corev1.Pod diff --git a/pkg/fake/delete_colection_reactor.go b/pkg/fake/delete_colection_reactor.go index d4f3bec5..0f20265d 100644 --- a/pkg/fake/delete_colection_reactor.go +++ b/pkg/fake/delete_colection_reactor.go @@ -43,20 +43,12 @@ type DeleteCollectionReactor struct { } func AddDeleteCollectionReactor(f *testing.Fake) { - f.Lock() - r := &DeleteCollectionReactor{gvrToGVK: map[schema.GroupVersionResource]schema.GroupVersionKind{}, reactors: f.ReactionChain[0:]} - f.PrependReactor("delete-collection", "*", r.react) - f.Unlock() + r := newDeleteCollectionReactor(f.ReactionChain[0:]) - for gvk := range scheme.Scheme.AllKnownTypes() { - if !strings.HasSuffix(gvk.Kind, "List") { - continue - } + f.Lock() + defer f.Unlock() - nonListGVK := gvk.GroupVersion().WithKind(gvk.Kind[:len(gvk.Kind)-4]) - plural, _ := meta.UnsafeGuessKindToResource(nonListGVK) - r.gvrToGVK[plural] = nonListGVK - } + f.PrependReactor("delete-collection", "*", r.react) } func (r *DeleteCollectionReactor) react(action testing.Action) (bool, runtime.Object, error) { @@ -113,3 +105,23 @@ func invokeReactors(action testing.Action, reactors []testing.Reactor) (runtime. return nil, errors.New("action not handled") } + +func newDeleteCollectionReactor(reactors []testing.Reactor) *DeleteCollectionReactor { + return &DeleteCollectionReactor{gvrToGVK: createGVRToGVKMapping(), reactors: reactors} +} + +func createGVRToGVKMapping() map[schema.GroupVersionResource]schema.GroupVersionKind { + gvrToGVK := map[schema.GroupVersionResource]schema.GroupVersionKind{} + + for gvk := range scheme.Scheme.AllKnownTypes() { + if !strings.HasSuffix(gvk.Kind, "List") { + continue + } + + nonListGVK := gvk.GroupVersion().WithKind(gvk.Kind[:len(gvk.Kind)-4]) + plural, _ := meta.UnsafeGuessKindToResource(nonListGVK) + gvrToGVK[plural] = nonListGVK + } + + return gvrToGVK +} diff --git a/pkg/fake/verify_namespace_reactor.go b/pkg/fake/verify_namespace_reactor.go index e3960b7b..a10ec5cb 100644 --- a/pkg/fake/verify_namespace_reactor.go +++ b/pkg/fake/verify_namespace_reactor.go @@ -19,8 +19,13 @@ limitations under the License. package fake import ( + "sync" + + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/testing" ) @@ -30,21 +35,44 @@ func AddVerifyNamespaceReactor(f *testing.Fake, resources ...string) { reactors := f.ReactionChain[0:] - react := func(a testing.Action) (bool, runtime.Object, error) { - action := a.(testing.CreateAction) + seenGVRs := sync.Map{} - namespace := action.GetNamespace() + react := func(a testing.Action) (bool, runtime.Object, error) { + seenGVRs.Store(a.GetResource(), true) - _, err := invokeReactors(testing.NewGetAction(corev1.SchemeGroupVersion.WithResource("namespaces"), "", namespace), - reactors) - if err != nil { - return true, nil, err + namespace := a.GetNamespace() + if namespace != metav1.NamespaceNone { + _, err := invokeReactors(testing.NewGetAction(corev1.SchemeGroupVersion.WithResource("namespaces"), "", namespace), + reactors) + if err != nil { + return true, nil, err + } } return false, nil, nil } for _, res := range resources { - f.PrependReactor("create", res, react) + f.PrependReactor("*", res, react) } + + deleteCollectionReactor := newDeleteCollectionReactor(reactors) + + f.PrependReactor("delete", "namespaces", func(action testing.Action) (bool, runtime.Object, error) { + name := action.(testing.DeleteAction).GetName() + + var err error + + seenGVRs.Range(func(key, _ any) bool { + gvr := key.(schema.GroupVersionResource) + + _, _, err = deleteCollectionReactor.react(testing.NewDeleteCollectionActionWithOptions(gvr, name, + metav1.DeleteOptions{}, metav1.ListOptions{})) + err = errors.Wrapf(err, "VerifyNamespaceReactor: error deleting %q resources in namespace %q", gvr, name) + + return true + }) + + return err != nil, nil, err + }) } From b80228f99f85b13eb5b3f7de353d7f8edf93183a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 27 Aug 2024 19:48:23 -0400 Subject: [PATCH 2/2] Handle namespace recreated in resource syncer If a namespace is deleted and the namespace was previously seen, ie there was at least one resource previously synced in that namespace, then retrieve all the cached resource keys in the namespace and add them to the missingNamespaces set. If the namespace is later recreated, then those resources will be re-queued and re-synced. Signed-off-by: Tom Pantelis --- pkg/federate/fake/federator.go | 33 ++++++++-- pkg/syncer/resource_syncer.go | 63 +++++++++++++++--- pkg/syncer/resource_syncer_test.go | 102 +++++++++++++++++++++-------- 3 files changed, 156 insertions(+), 42 deletions(-) diff --git a/pkg/federate/fake/federator.go b/pkg/federate/fake/federator.go index 057724a2..b9da275e 100644 --- a/pkg/federate/fake/federator.go +++ b/pkg/federate/fake/federator.go @@ -26,6 +26,7 @@ import ( "time" . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/federate" "github.com/submariner-io/admiral/pkg/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -35,6 +36,7 @@ type Federator struct { lock sync.Mutex distribute chan *unstructured.Unstructured delete chan *unstructured.Unstructured + delegator federate.Federator failOnDistribute error failOnDelete error ResetOnFailure atomic.Bool @@ -50,6 +52,13 @@ func New() *Federator { return f } +func (f *Federator) SetDelegator(d federate.Federator) { + f.lock.Lock() + defer f.lock.Unlock() + + f.delegator = d +} + func (f *Federator) FailOnDistribute(err error) { f.lock.Lock() defer f.lock.Unlock() @@ -64,7 +73,7 @@ func (f *Federator) FailOnDelete(err error) { f.failOnDelete = err } -func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { +func (f *Federator) Distribute(ctx context.Context, obj runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() @@ -77,12 +86,18 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { return err } - f.distribute <- resource.MustToUnstructured(obj) + if f.delegator != nil { + err = f.delegator.Distribute(ctx, obj) + } - return nil + if err == nil { + f.distribute <- resource.MustToUnstructured(obj) + } + + return err } -func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { +func (f *Federator) Delete(ctx context.Context, obj runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() @@ -95,9 +110,15 @@ func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { return err } - f.delete <- resource.MustToUnstructured(obj) + if f.delegator != nil { + err = f.delegator.Delete(ctx, obj) + } + + if err == nil { + f.delete <- resource.MustToUnstructured(obj) + } - return nil + return err } func (f *Federator) VerifyDistribute(expected runtime.Object) { diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index eb3e8bda..72cf929a 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -48,7 +48,8 @@ import ( const ( OrigNamespaceLabelKey = "submariner-io/originatingNamespace" - namespaceKey = "$namespace$" + namespaceAddedKey = "$namespace-added$" + namespaceDeletedKey = "$namespace-deleted$" ) type SyncDirection int @@ -316,7 +317,13 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { if config.NamespaceInformer != nil { _, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ AddFunc: func(obj interface{}, _ bool) { - syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String())) + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceAddedKey, resourceUtil.MustToMeta(obj).GetName()).String())) + }, + DeleteFunc: func(obj interface{}) { + objName, err := cache.DeletionHandlingObjectToName(obj) + utilruntime.Must(err) + + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceDeletedKey, objName.Name).String())) }, }) if err != nil { @@ -520,11 +527,16 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any } func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) { - if ns == namespaceKey { + if ns == namespaceAddedKey { r.handleNamespaceAdded(name) return false, nil } + if ns == namespaceDeletedKey { + r.handleNamespaceDeleted(name) + return false, nil + } + var ( requeue bool err error @@ -600,6 +612,8 @@ func (r *resourceSyncer) handleCreatedOrUpdated(key string, created *unstructure return true, errors.Wrapf(err, "error distributing resource %q", key) } + r.recordNamespaceSeen(resource.GetNamespace()) + if r.syncCounter != nil { r.syncCounter.With(prometheus.Labels{ DirectionLabel: r.config.Direction.String(), @@ -796,6 +810,13 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr return u } +func (r *resourceSyncer) recordNamespaceSeen(namespace string) { + _, ok := r.missingNamespaces[namespace] + if !ok { + r.missingNamespaces[namespace] = set.New[string]() + } +} + func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace) @@ -803,13 +824,8 @@ func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { return } - keys, ok := r.missingNamespaces[namespace] - if !ok { - keys = set.New[string]() - r.missingNamespaces[namespace] = keys - } - - keys.Insert(key) + r.recordNamespaceSeen(namespace) + r.missingNamespaces[namespace].Insert(key) } func (r *resourceSyncer) handleNamespaceAdded(namespace string) { @@ -826,6 +842,33 @@ func (r *resourceSyncer) handleNamespaceAdded(namespace string) { } } +func (r *resourceSyncer) handleNamespaceDeleted(namespace string) { + keys, ok := r.missingNamespaces[namespace] + if !ok { + return + } + + for _, key := range r.store.ListKeys() { + obj, exists, _ := r.store.GetByKey(key) + if !exists { + continue + } + + resource, _, _ := r.transform(r.assertUnstructured(obj), key, Create) + if resource == nil { + continue + } + + if resource.GetNamespace() == namespace { + keys.Insert(key) + } + } + + if keys.Len() > 0 { + r.log.Infof("Syncer %q: namespace %q deleted - recorded %d missing resources", r.config.Name, namespace, keys.Len()) + } +} + func getClusterIDLabel(resource runtime.Object) (string, bool) { clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey] return clusterID, found diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 2aca96ba..04676162 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -27,6 +27,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" + fakereactor "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/admiral/pkg/federate" "github.com/submariner-io/admiral/pkg/federate/fake" . "github.com/submariner-io/admiral/pkg/gomega" resourceutils "github.com/submariner-io/admiral/pkg/resource" @@ -43,11 +45,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - fakeK8s "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -1236,50 +1236,66 @@ func testWithSharedInformer() { } func testWithMissingNamespace() { - const transformedNamespace = "transformed-ns" + const ( + transformedNamespace = "transformed-ns" + noTransform = "no-transform" + ) d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) - var ( - k8sClient kubernetes.Interface - nsInformerFactory informers.SharedInformerFactory - ) + namespaceClient := func() dynamic.ResourceInterface { + return d.config.SourceClient.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Namespace(metav1.NamespaceNone) + } + + createNamespace := func(name string) { + test.CreateResource(namespaceClient(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }) + } BeforeEach(func() { d.config.Transform = func(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { - obj = obj.DeepCopyObject() - resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + if resourceutils.MustToMeta(obj).GetName() == noTransform { + return nil, false + } else if resourceutils.MustToMeta(obj).GetNamespace() == test.LocalNamespace { + obj = obj.DeepCopyObject() + resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + } return obj, false } - d.federator.FailOnDistribute(apierrors.NewNotFound(schema.GroupResource{ - Resource: "namespaces", - }, transformedNamespace)) - - k8sClient = fakeK8s.NewClientset() - nsInformerFactory = informers.NewSharedInformerFactory(k8sClient, 0) - d.config.NamespaceInformer = nsInformerFactory.Core().V1().Namespaces().Informer() + d.config.NamespaceInformer = cache.NewSharedInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return namespaceClient().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return namespaceClient().Watch(context.TODO(), options) + }, + }, resourceutils.MustToUnstructured(&corev1.Namespace{}), 0) }) JustBeforeEach(func() { - nsInformerFactory.Start(d.stopCh) + d.federator.SetDelegator(federate.NewCreateFederator(d.config.SourceClient, d.config.RestMapper, transformedNamespace)) + + createNamespace(test.LocalNamespace) + + fakereactor.AddVerifyNamespaceReactor(&d.config.SourceClient.(*fakeClient.FakeDynamicClient).Fake, "pods") + + if d.config.NamespaceInformer != nil { + go d.config.NamespaceInformer.Run(d.stopCh) + } }) Specify("distribute should eventually succeed when the namespace is created", func() { resource := test.CreateResource(d.sourceClient, d.resource) d.federator.VerifyNoDistribute() - d.federator.FailOnDistribute(nil) - By("Creating namespace") - _, err := k8sClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: transformedNamespace, - }, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + createNamespace(transformedNamespace) resource.SetNamespace(transformedNamespace) d.federator.VerifyDistribute(resource) @@ -1295,6 +1311,40 @@ func testWithMissingNamespace() { d.federator.VerifyNoDistribute() }) }) + + Context("after a namespace is created and distribute succeeds", func() { + const otherNS = "other-ns" + + JustBeforeEach(func() { + createNamespace(transformedNamespace) + createNamespace(otherNS) + }) + + It("should eventually redistribute when the namespace is recreated", func() { + resource := test.CreateResource(d.sourceClient, d.resource) + resource.SetNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + + other := d.resource.DeepCopy() + other.Name = noTransform + test.CreateResource(d.sourceClient, other) + + other = d.resource.DeepCopy() + other.Namespace = otherNS + test.CreateResource(d.config.SourceClient.Resource( + *test.GetGroupVersionResourceFor(d.config.RestMapper, other)).Namespace(otherNS), other) + + By("Deleting namespace") + + err := namespaceClient().Delete(context.TODO(), transformedNamespace, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + By("Recreating namespace") + + createNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + }) + }) } func testEventOrdering() {