Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle namespace recreated in resource syncer #981

Merged
merged 4 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 107 additions & 25 deletions pkg/fake/basic_reactors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,6 @@ var _ = Describe("Create", func() {
Expect(err).To(HaveOccurred())
})
})

Context("with namespace verification", func() {
BeforeEach(func() {
fake.AddVerifyNamespaceReactor(&t.client.Fake, "pods")
})

When("the namespace does not exist", func() {
It("should return an error", func() {
_, err := t.doCreate(t.pod)
Expect(resource.IsMissingNamespaceErr(err)).To(BeTrue())
Expect(resource.ExtractMissingNamespaceFromErr(err)).To(Equal(testNamespace))
})
})

When("the namespace does exist", func() {
It("should succeed", func() {
_, err := t.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: testNamespace},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())

t.assertCreateSuccess(t.pod)
})
})
})
})

var _ = Describe("Update", func() {
Expand Down Expand Up @@ -326,6 +301,113 @@ var _ = Describe("DeleteCollection", func() {
})
})

var _ = Describe("Namespace verification", func() {
t := newBasicReactorsTestDriver()

BeforeEach(func() {
fake.AddVerifyNamespaceReactor(&t.client.Fake, "*")
})

assertMissingNamespaceErr := func(err error) {
Expect(resource.IsMissingNamespaceErr(err)).To(BeTrue())
Expect(resource.ExtractMissingNamespaceFromErr(err)).To(Equal(testNamespace))
}

createNamespace := func(name string) {
_, err := t.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: name},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())
}

When("a namespace does not exist", func() {
Specify("requests should return the appropriate error", func() {
_, err := t.doCreate(t.pod)
assertMissingNamespaceErr(err)

_, err = t.doGet(t.pod.Name)
assertMissingNamespaceErr(err)

_, err = t.doUpdate()
assertMissingNamespaceErr(err)

assertMissingNamespaceErr(t.doDelete(metav1.DeleteOptions{}))

_, err = t.client.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{})
assertMissingNamespaceErr(err)
})
})

When("a request has no namespace", func() {
It("should succeed", func() {
_, err := t.client.CoreV1().Pods("").Create(context.Background(), t.pod, metav1.CreateOptions{})
Expect(err).To(Succeed())
})
})

When("a namespace does exist", func() {
Specify("requests should succeed", func() {
createNamespace(testNamespace)

t.pod.Name = "test-pod"
t.assertCreateSuccess(t.pod)

_, err := t.client.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{})
Expect(err).To(Succeed())

Expect(t.doDelete(metav1.DeleteOptions{})).To(Succeed())
})
})

When("a namespace is deleted", func() {
const noDeleteNS = "no-delete"

It("should delete all contained resources", func() {
createNamespace(testNamespace)
createNamespace(noDeleteNS)

t.pod.Name = "test-pod"
t.assertCreateSuccess(t.pod)

_, err := t.client.CoreV1().Services(testNamespace).Create(context.TODO(), &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "should-delete",
},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())

_, err = t.client.CoreV1().Services(noDeleteNS).Create(context.TODO(), &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "should-not-delete",
},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())

// Delete the namespace

Expect(t.client.CoreV1().Namespaces().Delete(context.TODO(), testNamespace, metav1.DeleteOptions{})).To(Succeed())

_, err = t.doGet(t.pod.Name)
assertMissingNamespaceErr(err)

// Recreate the namespace

createNamespace(testNamespace)

_, err = t.doGet(t.pod.Name)
Expect(err).To(HaveOccurred())
Expect(apierrors.IsNotFound(err)).To(BeTrue())

_, err = t.client.CoreV1().Services(testNamespace).Get(context.TODO(), "should-delete", metav1.GetOptions{})
Expect(err).To(HaveOccurred())
Expect(apierrors.IsNotFound(err)).To(BeTrue())

_, err = t.client.CoreV1().Services(noDeleteNS).Get(context.TODO(), "should-not-delete", metav1.GetOptions{})
Expect(err).To(Succeed())
})
})
})

type basicReactorsTestDriver struct {
client *k8sfake.Clientset
pod *corev1.Pod
Expand Down
36 changes: 24 additions & 12 deletions pkg/fake/delete_colection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,12 @@ type DeleteCollectionReactor struct {
}

func AddDeleteCollectionReactor(f *testing.Fake) {
f.Lock()
r := &DeleteCollectionReactor{gvrToGVK: map[schema.GroupVersionResource]schema.GroupVersionKind{}, reactors: f.ReactionChain[0:]}
f.PrependReactor("delete-collection", "*", r.react)
f.Unlock()
r := newDeleteCollectionReactor(f.ReactionChain[0:])

for gvk := range scheme.Scheme.AllKnownTypes() {
if !strings.HasSuffix(gvk.Kind, "List") {
continue
}
f.Lock()
defer f.Unlock()

nonListGVK := gvk.GroupVersion().WithKind(gvk.Kind[:len(gvk.Kind)-4])
plural, _ := meta.UnsafeGuessKindToResource(nonListGVK)
r.gvrToGVK[plural] = nonListGVK
}
f.PrependReactor("delete-collection", "*", r.react)
}

func (r *DeleteCollectionReactor) react(action testing.Action) (bool, runtime.Object, error) {
Expand Down Expand Up @@ -113,3 +105,23 @@ func invokeReactors(action testing.Action, reactors []testing.Reactor) (runtime.

return nil, errors.New("action not handled")
}

func newDeleteCollectionReactor(reactors []testing.Reactor) *DeleteCollectionReactor {
return &DeleteCollectionReactor{gvrToGVK: createGVRToGVKMapping(), reactors: reactors}
}

func createGVRToGVKMapping() map[schema.GroupVersionResource]schema.GroupVersionKind {
gvrToGVK := map[schema.GroupVersionResource]schema.GroupVersionKind{}

for gvk := range scheme.Scheme.AllKnownTypes() {
if !strings.HasSuffix(gvk.Kind, "List") {
continue
}

nonListGVK := gvk.GroupVersion().WithKind(gvk.Kind[:len(gvk.Kind)-4])
plural, _ := meta.UnsafeGuessKindToResource(nonListGVK)
gvrToGVK[plural] = nonListGVK
}

return gvrToGVK
}
44 changes: 36 additions & 8 deletions pkg/fake/verify_namespace_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ limitations under the License.
package fake

import (
"sync"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/testing"
)

Expand All @@ -30,21 +35,44 @@ func AddVerifyNamespaceReactor(f *testing.Fake, resources ...string) {

reactors := f.ReactionChain[0:]

react := func(a testing.Action) (bool, runtime.Object, error) {
action := a.(testing.CreateAction)
seenGVRs := sync.Map{}

namespace := action.GetNamespace()
react := func(a testing.Action) (bool, runtime.Object, error) {
seenGVRs.Store(a.GetResource(), true)

_, err := invokeReactors(testing.NewGetAction(corev1.SchemeGroupVersion.WithResource("namespaces"), "", namespace),
reactors)
if err != nil {
return true, nil, err
namespace := a.GetNamespace()
if namespace != metav1.NamespaceNone {
_, err := invokeReactors(testing.NewGetAction(corev1.SchemeGroupVersion.WithResource("namespaces"), "", namespace),
reactors)
if err != nil {
return true, nil, err
}
}

return false, nil, nil
}

for _, res := range resources {
f.PrependReactor("create", res, react)
f.PrependReactor("*", res, react)
}

deleteCollectionReactor := newDeleteCollectionReactor(reactors)

f.PrependReactor("delete", "namespaces", func(action testing.Action) (bool, runtime.Object, error) {
name := action.(testing.DeleteAction).GetName()

var err error

seenGVRs.Range(func(key, _ any) bool {
gvr := key.(schema.GroupVersionResource)

_, _, err = deleteCollectionReactor.react(testing.NewDeleteCollectionActionWithOptions(gvr, name,
metav1.DeleteOptions{}, metav1.ListOptions{}))
err = errors.Wrapf(err, "VerifyNamespaceReactor: error deleting %q resources in namespace %q", gvr, name)

return true
})

return err != nil, nil, err
})
}
33 changes: 27 additions & 6 deletions pkg/federate/fake/federator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,6 +36,7 @@ type Federator struct {
lock sync.Mutex
distribute chan *unstructured.Unstructured
delete chan *unstructured.Unstructured
delegator federate.Federator
failOnDistribute error
failOnDelete error
ResetOnFailure atomic.Bool
Expand All @@ -50,6 +52,13 @@ func New() *Federator {
return f
}

func (f *Federator) SetDelegator(d federate.Federator) {
f.lock.Lock()
defer f.lock.Unlock()

f.delegator = d
}

func (f *Federator) FailOnDistribute(err error) {
f.lock.Lock()
defer f.lock.Unlock()
Expand All @@ -64,7 +73,7 @@ func (f *Federator) FailOnDelete(err error) {
f.failOnDelete = err
}

func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error {
func (f *Federator) Distribute(ctx context.Context, obj runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()

Expand All @@ -77,12 +86,18 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error {
return err
}

f.distribute <- resource.MustToUnstructured(obj)
if f.delegator != nil {
err = f.delegator.Distribute(ctx, obj)
}

return nil
if err == nil {
f.distribute <- resource.MustToUnstructured(obj)
}

return err
}

func (f *Federator) Delete(_ context.Context, obj runtime.Object) error {
func (f *Federator) Delete(ctx context.Context, obj runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()

Expand All @@ -95,9 +110,15 @@ func (f *Federator) Delete(_ context.Context, obj runtime.Object) error {
return err
}

f.delete <- resource.MustToUnstructured(obj)
if f.delegator != nil {
err = f.delegator.Delete(ctx, obj)
}

if err == nil {
f.delete <- resource.MustToUnstructured(obj)
}

return nil
return err
}

func (f *Federator) VerifyDistribute(expected runtime.Object) {
Expand Down
Loading
Loading