diff --git a/pkg/federate/fake/federator.go b/pkg/federate/fake/federator.go index 057724a2..b9da275e 100644 --- a/pkg/federate/fake/federator.go +++ b/pkg/federate/fake/federator.go @@ -26,6 +26,7 @@ import ( "time" . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/federate" "github.com/submariner-io/admiral/pkg/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -35,6 +36,7 @@ type Federator struct { lock sync.Mutex distribute chan *unstructured.Unstructured delete chan *unstructured.Unstructured + delegator federate.Federator failOnDistribute error failOnDelete error ResetOnFailure atomic.Bool @@ -50,6 +52,13 @@ func New() *Federator { return f } +func (f *Federator) SetDelegator(d federate.Federator) { + f.lock.Lock() + defer f.lock.Unlock() + + f.delegator = d +} + func (f *Federator) FailOnDistribute(err error) { f.lock.Lock() defer f.lock.Unlock() @@ -64,7 +73,7 @@ func (f *Federator) FailOnDelete(err error) { f.failOnDelete = err } -func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { +func (f *Federator) Distribute(ctx context.Context, obj runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() @@ -77,12 +86,18 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error { return err } - f.distribute <- resource.MustToUnstructured(obj) + if f.delegator != nil { + err = f.delegator.Distribute(ctx, obj) + } - return nil + if err == nil { + f.distribute <- resource.MustToUnstructured(obj) + } + + return err } -func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { +func (f *Federator) Delete(ctx context.Context, obj runtime.Object) error { f.lock.Lock() defer f.lock.Unlock() @@ -95,9 +110,15 @@ func (f *Federator) Delete(_ context.Context, obj runtime.Object) error { return err } - f.delete <- resource.MustToUnstructured(obj) + if f.delegator != nil { + err = f.delegator.Delete(ctx, obj) + } + + if err == nil { + f.delete <- resource.MustToUnstructured(obj) + } - return nil + return err } func (f *Federator) VerifyDistribute(expected runtime.Object) { diff --git a/pkg/syncer/resource_syncer.go b/pkg/syncer/resource_syncer.go index eb3e8bda..72cf929a 100644 --- a/pkg/syncer/resource_syncer.go +++ b/pkg/syncer/resource_syncer.go @@ -48,7 +48,8 @@ import ( const ( OrigNamespaceLabelKey = "submariner-io/originatingNamespace" - namespaceKey = "$namespace$" + namespaceAddedKey = "$namespace-added$" + namespaceDeletedKey = "$namespace-deleted$" ) type SyncDirection int @@ -316,7 +317,13 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) { if config.NamespaceInformer != nil { _, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{ AddFunc: func(obj interface{}, _ bool) { - syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String())) + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceAddedKey, resourceUtil.MustToMeta(obj).GetName()).String())) + }, + DeleteFunc: func(obj interface{}) { + objName, err := cache.DeletionHandlingObjectToName(obj) + utilruntime.Must(err) + + syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceDeletedKey, objName.Name).String())) }, }) if err != nil { @@ -520,11 +527,16 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any } func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) { - if ns == namespaceKey { + if ns == namespaceAddedKey { r.handleNamespaceAdded(name) return false, nil } + if ns == namespaceDeletedKey { + r.handleNamespaceDeleted(name) + return false, nil + } + var ( requeue bool err error @@ -600,6 +612,8 @@ func (r *resourceSyncer) handleCreatedOrUpdated(key string, created *unstructure return true, errors.Wrapf(err, "error distributing resource %q", key) } + r.recordNamespaceSeen(resource.GetNamespace()) + if r.syncCounter != nil { r.syncCounter.With(prometheus.Labels{ DirectionLabel: r.config.Direction.String(), @@ -796,6 +810,13 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr return u } +func (r *resourceSyncer) recordNamespaceSeen(namespace string) { + _, ok := r.missingNamespaces[namespace] + if !ok { + r.missingNamespaces[namespace] = set.New[string]() + } +} + func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace) @@ -803,13 +824,8 @@ func (r *resourceSyncer) handleMissingNamespace(key, namespace string) { return } - keys, ok := r.missingNamespaces[namespace] - if !ok { - keys = set.New[string]() - r.missingNamespaces[namespace] = keys - } - - keys.Insert(key) + r.recordNamespaceSeen(namespace) + r.missingNamespaces[namespace].Insert(key) } func (r *resourceSyncer) handleNamespaceAdded(namespace string) { @@ -826,6 +842,33 @@ func (r *resourceSyncer) handleNamespaceAdded(namespace string) { } } +func (r *resourceSyncer) handleNamespaceDeleted(namespace string) { + keys, ok := r.missingNamespaces[namespace] + if !ok { + return + } + + for _, key := range r.store.ListKeys() { + obj, exists, _ := r.store.GetByKey(key) + if !exists { + continue + } + + resource, _, _ := r.transform(r.assertUnstructured(obj), key, Create) + if resource == nil { + continue + } + + if resource.GetNamespace() == namespace { + keys.Insert(key) + } + } + + if keys.Len() > 0 { + r.log.Infof("Syncer %q: namespace %q deleted - recorded %d missing resources", r.config.Name, namespace, keys.Len()) + } +} + func getClusterIDLabel(resource runtime.Object) (string, bool) { clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey] return clusterID, found diff --git a/pkg/syncer/resource_syncer_test.go b/pkg/syncer/resource_syncer_test.go index 2aca96ba..04676162 100644 --- a/pkg/syncer/resource_syncer_test.go +++ b/pkg/syncer/resource_syncer_test.go @@ -27,6 +27,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" + fakereactor "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/admiral/pkg/federate" "github.com/submariner-io/admiral/pkg/federate/fake" . "github.com/submariner-io/admiral/pkg/gomega" resourceutils "github.com/submariner-io/admiral/pkg/resource" @@ -43,11 +45,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" fakeClient "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - fakeK8s "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -1236,50 +1236,66 @@ func testWithSharedInformer() { } func testWithMissingNamespace() { - const transformedNamespace = "transformed-ns" + const ( + transformedNamespace = "transformed-ns" + noTransform = "no-transform" + ) d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote) - var ( - k8sClient kubernetes.Interface - nsInformerFactory informers.SharedInformerFactory - ) + namespaceClient := func() dynamic.ResourceInterface { + return d.config.SourceClient.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Namespace(metav1.NamespaceNone) + } + + createNamespace := func(name string) { + test.CreateResource(namespaceClient(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }) + } BeforeEach(func() { d.config.Transform = func(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) { - obj = obj.DeepCopyObject() - resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + if resourceutils.MustToMeta(obj).GetName() == noTransform { + return nil, false + } else if resourceutils.MustToMeta(obj).GetNamespace() == test.LocalNamespace { + obj = obj.DeepCopyObject() + resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace) + } return obj, false } - d.federator.FailOnDistribute(apierrors.NewNotFound(schema.GroupResource{ - Resource: "namespaces", - }, transformedNamespace)) - - k8sClient = fakeK8s.NewClientset() - nsInformerFactory = informers.NewSharedInformerFactory(k8sClient, 0) - d.config.NamespaceInformer = nsInformerFactory.Core().V1().Namespaces().Informer() + d.config.NamespaceInformer = cache.NewSharedInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return namespaceClient().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return namespaceClient().Watch(context.TODO(), options) + }, + }, resourceutils.MustToUnstructured(&corev1.Namespace{}), 0) }) JustBeforeEach(func() { - nsInformerFactory.Start(d.stopCh) + d.federator.SetDelegator(federate.NewCreateFederator(d.config.SourceClient, d.config.RestMapper, transformedNamespace)) + + createNamespace(test.LocalNamespace) + + fakereactor.AddVerifyNamespaceReactor(&d.config.SourceClient.(*fakeClient.FakeDynamicClient).Fake, "pods") + + if d.config.NamespaceInformer != nil { + go d.config.NamespaceInformer.Run(d.stopCh) + } }) Specify("distribute should eventually succeed when the namespace is created", func() { resource := test.CreateResource(d.sourceClient, d.resource) d.federator.VerifyNoDistribute() - d.federator.FailOnDistribute(nil) - By("Creating namespace") - _, err := k8sClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: transformedNamespace, - }, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + createNamespace(transformedNamespace) resource.SetNamespace(transformedNamespace) d.federator.VerifyDistribute(resource) @@ -1295,6 +1311,40 @@ func testWithMissingNamespace() { d.federator.VerifyNoDistribute() }) }) + + Context("after a namespace is created and distribute succeeds", func() { + const otherNS = "other-ns" + + JustBeforeEach(func() { + createNamespace(transformedNamespace) + createNamespace(otherNS) + }) + + It("should eventually redistribute when the namespace is recreated", func() { + resource := test.CreateResource(d.sourceClient, d.resource) + resource.SetNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + + other := d.resource.DeepCopy() + other.Name = noTransform + test.CreateResource(d.sourceClient, other) + + other = d.resource.DeepCopy() + other.Namespace = otherNS + test.CreateResource(d.config.SourceClient.Resource( + *test.GetGroupVersionResourceFor(d.config.RestMapper, other)).Namespace(otherNS), other) + + By("Deleting namespace") + + err := namespaceClient().Delete(context.TODO(), transformedNamespace, metav1.DeleteOptions{}) + Expect(err).To(Succeed()) + + By("Recreating namespace") + + createNamespace(transformedNamespace) + d.federator.VerifyDistribute(resource) + }) + }) } func testEventOrdering() {