Skip to content

Commit

Permalink
[Rekt] Add polling on inner channel subscriber status (#5278)
Browse files Browse the repository at this point in the history
* add polling on inner channel subscriber status

* lint

* drop unused function

* poll on the status.
  • Loading branch information
Scott Nichols authored Apr 19, 2021
1 parent 4324aef commit facbd02
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 19 deletions.
29 changes: 28 additions & 1 deletion test/rekt/features/channel/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/messaging"
"knative.dev/eventing/test/rekt/features/knconf"
"knative.dev/eventing/test/rekt/resources/account_role"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
)

Expand Down Expand Up @@ -150,7 +152,32 @@ func channelAllowsSubscribersAndStatus(ctx context.Context, t feature.T) {

patchChannelable(ctx, t, original, ch)

updated := getChannelableWithStatus(ctx, t)
var updated *duckv1.Channelable
interval, timeout := environment.PollTimingsFromContext(ctx)
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
updated = getChannelable(ctx, t)
if updated.Status.ObservedGeneration != updated.Generation {
// keep polling.
return false, nil
}
if len(updated.Status.Subscribers) == len(ch.Spec.Subscribers) {
for _, got := range updated.Status.Subscribers {
// want should be Ready.
if got.UID == want.UID {
if want := corev1.ConditionTrue; got.Ready == want {
// Synced!
return true, nil
}
}
}
}
// keep polling.
return false, nil
})
if err != nil {
t.Fatalf("failed waiting for channel subscribers to sync", err)
}

if len(updated.Spec.Subscribers) <= 0 {
t.Errorf("subscriber was not saved")
}
Expand Down
18 changes: 0 additions & 18 deletions test/rekt/features/channel/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
messagingclientsetv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/messaging/v1"
Expand Down Expand Up @@ -93,23 +92,6 @@ func getChannelable(ctx context.Context, t feature.T) *duckv1.Channelable {
return channel
}

func getChannelableWithStatus(ctx context.Context, t feature.T) *duckv1.Channelable {
interval, timeout := environment.PollTimingsFromContext(ctx)
var ch *duckv1.Channelable
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
ch = getChannelable(ctx, t)
if ch.Status.ObservedGeneration != ch.Generation {
// keep polling.
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
return ch
}

func patchChannelable(ctx context.Context, t feature.T, before, after *duckv1.Channelable) {
patch, err := duck.CreateMergePatch(before, after)
if err != nil {
Expand Down

0 comments on commit facbd02

Please sign in to comment.