From facbd02ba367c872bb60eccc611bf037215bbdf4 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 19 Apr 2021 16:15:00 -0700 Subject: [PATCH] [Rekt] Add polling on inner channel subscriber status (#5278) * add polling on inner channel subscriber status * lint * drop unused function * poll on the status. --- test/rekt/features/channel/control_plane.go | 29 ++++++++++++++++++++- test/rekt/features/channel/helpers.go | 18 ------------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/test/rekt/features/channel/control_plane.go b/test/rekt/features/channel/control_plane.go index f100d7947d..79053e4c7c 100644 --- a/test/rekt/features/channel/control_plane.go +++ b/test/rekt/features/channel/control_plane.go @@ -21,6 +21,7 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/messaging" "knative.dev/eventing/test/rekt/features/knconf" @@ -28,6 +29,7 @@ import ( "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/pkg/apis" "knative.dev/pkg/apis/duck" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" ) @@ -150,7 +152,32 @@ func channelAllowsSubscribersAndStatus(ctx context.Context, t feature.T) { patchChannelable(ctx, t, original, ch) - updated := getChannelableWithStatus(ctx, t) + var updated *duckv1.Channelable + interval, timeout := environment.PollTimingsFromContext(ctx) + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + updated = getChannelable(ctx, t) + if updated.Status.ObservedGeneration != updated.Generation { + // keep polling. + return false, nil + } + if len(updated.Status.Subscribers) == len(ch.Spec.Subscribers) { + for _, got := range updated.Status.Subscribers { + // want should be Ready. + if got.UID == want.UID { + if want := corev1.ConditionTrue; got.Ready == want { + // Synced! + return true, nil + } + } + } + } + // keep polling. + return false, nil + }) + if err != nil { + t.Fatalf("failed waiting for channel subscribers to sync", err) + } + if len(updated.Spec.Subscribers) <= 0 { t.Errorf("subscriber was not saved") } diff --git a/test/rekt/features/channel/helpers.go b/test/rekt/features/channel/helpers.go index 8ad6a24a52..13151a3741 100644 --- a/test/rekt/features/channel/helpers.go +++ b/test/rekt/features/channel/helpers.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" messagingclientsetv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/messaging/v1" @@ -93,23 +92,6 @@ func getChannelable(ctx context.Context, t feature.T) *duckv1.Channelable { return channel } -func getChannelableWithStatus(ctx context.Context, t feature.T) *duckv1.Channelable { - interval, timeout := environment.PollTimingsFromContext(ctx) - var ch *duckv1.Channelable - err := wait.PollImmediate(interval, timeout, func() (bool, error) { - ch = getChannelable(ctx, t) - if ch.Status.ObservedGeneration != ch.Generation { - // keep polling. - return false, nil - } - return true, nil - }) - if err != nil { - t.Fatal(err) - } - return ch -} - func patchChannelable(ctx context.Context, t feature.T, before, after *duckv1.Channelable) { patch, err := duck.CreateMergePatch(before, after) if err != nil {