diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index bcbe662b096..d8a632aac9f 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -19,17 +19,15 @@ package subscription import ( "context" "fmt" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - - "knative.dev/pkg/apis/duck" + "k8s.io/client-go/util/retry" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kref" "knative.dev/pkg/logging" @@ -153,11 +151,11 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1.Channelable, sub *v1.Subscription) pkgreconciler.Event { // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. - if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { + if updated, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { logging.FromContext(ctx).Warnw("Failed to sync physical Channel", zap.Error(err)) - sub.Status.MarkNotAddedToChannel(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) + sub.Status.MarkChannelFailed(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return pkgreconciler.NewEvent(corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to synchronize to channel %q: %w", channel.Name, err) - } else if patched { + } else if updated { if sub.DeletionTimestamp.IsZero() { sub.Status.MarkAddedToChannel() return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channel.Name) @@ -347,7 +345,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript } // getChannel fetches the Channel as specified by the Subscriptions spec.Channel -// and verifies it's a channelable (so that we can operate on it via patches). +// and verifies it's a channelable (so that we can operate on it via updates). // If the Channel is a channels.messaging type (hence, it's only a factory for // underlying channels), fetch and validate the "backing" channel. func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) { @@ -425,48 +423,66 @@ func isNilOrEmptyDestination(destination *duckv1.Destination) bool { func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1.Channelable, isDeleted bool) (bool, error) { logging.FromContext(ctx).Debugw("Reconciling physical from Channel", zap.Any("sub", sub)) - if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil { - if isDeleted && apierrors.IsNotFound(patchErr) { + if updated, updateErr := r.updateChannelable(ctx, sub.Namespace, channel, sub); updateErr != nil { + if isDeleted && apierrors.IsNotFound(updateErr) { logging.FromContext(ctx).Warnw("Could not find Channel", zap.Any("channel", sub.Spec.Channel)) return false, nil } - return patched, patchErr + return updated, updateErr } else { - return patched, nil + return updated, nil } } -func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) { - after := channel.DeepCopy() - - if sub.DeletionTimestamp.IsZero() { - r.updateChannelAddSubscription(after, sub) - } else { - r.updateChannelRemoveSubscription(after, sub) - } - - patch, err := duck.CreateMergePatch(channel, after) - if err != nil { - return false, err - } - // If there is nothing to patch, we are good, just return. - // Empty patch is {}, hence we check for that. - if len(patch) <= 2 { - return false, nil - } - +func (r *Reconciler) updateChannelable(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) { resourceClient, err := eventingduck.ResourceInterface(r.dynamicClientSet, namespace, channel.GroupVersionKind()) if err != nil { logging.FromContext(ctx).Warnw("Failed to create dynamic resource client", zap.Error(err)) return false, err } - patched, err := resourceClient.Patch(ctx, channel.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}) + + updated := false + + uo := &unstructured.Unstructured{} + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + channel, err = r.getChannel(ctx, sub) + if err != nil { + return fmt.Errorf("failed to get channel for subscription %s/%s: %w", sub.GetNamespace(), sub.GetName(), err) + } + + after := channel.DeepCopy() + + if sub.DeletionTimestamp.IsZero() { + r.updateChannelAddSubscription(after, sub) + } else { + r.updateChannelRemoveSubscription(after, sub) + } + + // If there is nothing to update on subscribers, we are good, just return. + if equality.Semantic.DeepEqual(channel.Spec.Subscribers, after.Spec.Subscribers) { + return nil + } else { + updated = true + } + + after.Status = eventingduckv1.ChannelableStatus{} + output, err := runtime.DefaultUnstructuredConverter.ToUnstructured(after) + if err != nil { + return err + } + + uo.Object = output + if _, err = resourceClient.Update(ctx, uo, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update channel %s/%s: %w", uo.GetNamespace(), uo.GetName(), err) + } + return nil + }) if err != nil { - logging.FromContext(ctx).Warnw("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch)) + logging.FromContext(ctx).Warnw("Failed to update the Channel", zap.Error(err), zap.Any("update", uo)) return false, err } - logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched)) - return true, nil + + return updated, nil } func (r *Reconciler) updateChannelRemoveSubscription(channel *eventingduckv1.Channelable, sub *v1.Subscription) { diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 30d1dc8299d..86c4fa50c65 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -18,8 +18,9 @@ package subscription import ( "context" - "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "knative.dev/eventing/pkg/apis/messaging" "testing" corev1 "k8s.io/api/core/v1" @@ -903,14 +904,18 @@ func TestAllCases(t *testing.T) { WithInitSubscriptionConditions, MarkReferencesResolved, MarkAddedToChannel, - WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: subscriberURI, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI}, - }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -993,10 +998,19 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: subscriberURI, + Delivery: &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: dlsURI, + }, + }}, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI, Delivery: &eventingduck.DeliverySpec{DeadLetterSink: &duckv1.Destination{URI: apis.HTTP("dls.mynamespace.svc.cluster.local")}}}, - }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1043,10 +1057,15 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: subscriberURI, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI}, - }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1132,10 +1151,16 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, ReplyURI: replyURI, SubscriberURI: subscriberURI}, - }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1180,10 +1205,16 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionReply(replyURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI, ReplyURI: replyURI}, - }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1229,11 +1260,15 @@ func TestAllCases(t *testing.T) { WithSubscriptionStatusObservedGeneration(subscriptionGeneration), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, Generation: subscriptionGeneration, SubscriberURI: subscriberURI}, - }), - }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: subscriberURI, + Generation: subscriptionGeneration, + }, + }, nil), + }}, }, { Name: "v1 imc+subscriber as service", Objects: []runtime.Object{ @@ -1267,10 +1302,15 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: subscriptionUID, + SubscriberURI: serviceURI, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: serviceURI}, - }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1316,10 +1356,15 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: "a-" + subscriptionUID, + SubscriberURI: serviceURI, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: "a-" + subscriptionUID, SubscriberURI: serviceURI}, - }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, { @@ -1375,10 +1420,20 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + { + UID: "a-" + subscriptionUID, + SubscriberURI: serviceURI, + Delivery: &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + URI: apis.HTTP("dls.mynamespace.svc.cluster.local"), + }, + }, + }, + }, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, Delivery: &eventingduck.DeliverySpec{DeadLetterSink: &duckv1.Destination{URI: apis.HTTP("dls.mynamespace.svc.cluster.local")}}}, - }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1447,8 +1502,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1461,7 +1516,9 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }), + }, nil), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1517,8 +1574,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlcURI), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1531,7 +1588,21 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, + }, &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlcName, + Namespace: testNS, + }, + }, + Retry: pointer.Int32(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.String("PT1S"), }), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1581,8 +1652,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1591,7 +1662,12 @@ func TestAllCases(t *testing.T) { RetryAfterMax: pointer.String("PT2S"), }, }, + }, &eventingduck.DeliverySpec{ + Timeout: pointer.String("PT1S"), + RetryAfterMax: pointer.String("PT2S"), }), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1632,13 +1708,15 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, }, - }), + }, nil), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1774,8 +1852,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1788,7 +1866,21 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, + }, &eventingduck.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, + Kind: subscriberGVK.Kind, + Name: dlc2Name, + Namespace: testNS, + }, + }, + Retry: pointer.Int32(20), + BackoffPolicy: &linear, + BackoffDelay: pointer.String("PT10S"), }), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1848,8 +1940,8 @@ func TestAllCases(t *testing.T) { }), ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1858,7 +1950,12 @@ func TestAllCases(t *testing.T) { RetryAfterMax: pointer.String("PT2S"), }, }, + }, &eventingduck.DeliverySpec{ + Timeout: pointer.String("PT10S"), + RetryAfterMax: pointer.String("PT20S"), }), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1892,12 +1989,14 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), Eventf(corev1.EventTypeNormal, "SubscriberRemoved", "Subscription was removed from channel \"origin\""), }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, nil, nil), + }}, WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, nil), patchRemoveFinalizers(testNS, subscriptionName), }, }, { - Name: "subscription not deleted - channel patch fails", + Name: "subscription not deleted - channel update fails", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), @@ -1923,10 +2022,10 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + subscriptionName, WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("patch", "inmemorychannels"), + InduceFailure("update", "inmemorychannels"), }, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "PhysicalChannelSyncFailed", fmt.Sprintf("Failed to synchronize to channel %q: %s", channelName, "inducing failure for patch inmemorychannels")), + Eventf(corev1.EventTypeWarning, "PhysicalChannelSyncFailed", fmt.Sprintf("Failed to synchronize to channel %q: %s: %s", channelName, "failed to update channel testnamespace/origin", "inducing failure for update inmemorychannels")), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, @@ -1934,16 +2033,16 @@ func TestAllCases(t *testing.T) { WithSubscriptionChannel(imcV1GVK, channelName), WithInitSubscriptionConditions, MarkSubscriptionReady, - MarkNotAddedToChannel("PhysicalChannelSyncFailed", "Failed to sync physical Channel: inducing failure for patch inmemorychannels"), + MarkChannelFailed("PhysicalChannelSyncFailed", "Failed to sync physical Channel: failed to update channel testnamespace/origin: inducing failure for update inmemorychannels"), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionFinalizers(finalizerName), WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), WithSubscriptionDeleted, ), }}, - WantPatches: []clientgotesting.PatchActionImpl{ - patchSubscribers(testNS, channelName, nil), - }, + WantUpdates: []clientgotesting.UpdateActionImpl{{ + Object: makeUnstructuredChannelable(imcV1GVK, nil, nil), + }}, }, { Name: "subscription deleted - channel does not exist", Objects: []runtime.Object{ @@ -1971,7 +2070,6 @@ func TestAllCases(t *testing.T) { }, }, } - logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = channelable.WithDuck(ctx) @@ -1997,36 +2095,6 @@ func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOpti } } -func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl { - action := clientgotesting.PatchActionImpl{} - action.Name = name - action.Namespace = namespace - - var spec string - if subscribers != nil { - b, err := json.Marshal(subscribers) - if err != nil { - return action - } - ss := make([]map[string]interface{}, 0) - err = json.Unmarshal(b, &ss) - if err != nil { - return action - } - subs, err := json.Marshal(ss) - if err != nil { - return action - } - spec = fmt.Sprintf(`{"subscribers":%s}`, subs) - } else { - spec = `{"subscribers":null}` - } - - patch := `{"spec":` + spec + `}` - action.Patch = []byte(patch) - return action -} - func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name @@ -2044,3 +2112,29 @@ func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionIm action.Patch = []byte(patch) return action } + +func makeUnstructuredChannelable(gvk metav1.GroupVersionKind, subscribers []eventingduck.SubscriberSpec, delivery *eventingduck.DeliverySpec) *unstructured.Unstructured { + ch := &eventingduck.Channelable{ + TypeMeta: metav1.TypeMeta{ + Kind: gvk.Kind, + APIVersion: fmt.Sprintf("%s/%s", gvk.Group, gvk.Version), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: channelName, + Namespace: testNS, + Annotations: map[string]string{messaging.SubscribableDuckVersionAnnotation: "v1"}, + }, + Spec: eventingduck.ChannelableSpec{ + SubscribableSpec: eventingduck.SubscribableSpec{ + Subscribers: subscribers, + }, + Delivery: delivery, + }, + Status: eventingduck.ChannelableStatus{}, + } + uo, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ch) + if err != nil { + return nil + } + return &unstructured.Unstructured{Object: uo} +} diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index b42e759672c..aa3690ec0a0 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -250,6 +250,12 @@ func MarkNotAddedToChannel(reason, msg string) SubscriptionOption { } } +func MarkChannelFailed(reason, msg string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Status.MarkChannelFailed(reason, msg) + } +} + func MarkReferencesResolved(s *v1.Subscription) { s.Status.MarkReferencesResolved() }