diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 7ff849d00a9..bcbe662b096 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -25,10 +25,11 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "k8s.io/client-go/util/retry" + + "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kref" "knative.dev/pkg/logging" @@ -152,11 +153,11 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1.Channelable, sub *v1.Subscription) pkgreconciler.Event { // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. - if updated, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { + if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { logging.FromContext(ctx).Warnw("Failed to sync physical Channel", zap.Error(err)) - sub.Status.MarkChannelFailed(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) + sub.Status.MarkNotAddedToChannel(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return pkgreconciler.NewEvent(corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to synchronize to channel %q: %w", channel.Name, err) - } else if updated { + } else if patched { if sub.DeletionTimestamp.IsZero() { sub.Status.MarkAddedToChannel() return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channel.Name) @@ -346,7 +347,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript } // getChannel fetches the Channel as specified by the Subscriptions spec.Channel -// and verifies it's a channelable (so that we can operate on it via updates). +// and verifies it's a channelable (so that we can operate on it via patches). // If the Channel is a channels.messaging type (hence, it's only a factory for // underlying channels), fetch and validate the "backing" channel. func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) { @@ -424,66 +425,48 @@ func isNilOrEmptyDestination(destination *duckv1.Destination) bool { func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1.Channelable, isDeleted bool) (bool, error) { logging.FromContext(ctx).Debugw("Reconciling physical from Channel", zap.Any("sub", sub)) - if updated, updateErr := r.updateChannelable(ctx, sub.Namespace, channel, sub); updateErr != nil { - if isDeleted && apierrors.IsNotFound(updateErr) { + if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil { + if isDeleted && apierrors.IsNotFound(patchErr) { logging.FromContext(ctx).Warnw("Could not find Channel", zap.Any("channel", sub.Spec.Channel)) return false, nil } - return updated, updateErr + return patched, patchErr } else { - return updated, nil + return patched, nil } } -func (r *Reconciler) updateChannelable(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) { +func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) { + after := channel.DeepCopy() + + if sub.DeletionTimestamp.IsZero() { + r.updateChannelAddSubscription(after, sub) + } else { + r.updateChannelRemoveSubscription(after, sub) + } + + patch, err := duck.CreateMergePatch(channel, after) + if err != nil { + return false, err + } + // If there is nothing to patch, we are good, just return. + // Empty patch is {}, hence we check for that. + if len(patch) <= 2 { + return false, nil + } + resourceClient, err := eventingduck.ResourceInterface(r.dynamicClientSet, namespace, channel.GroupVersionKind()) if err != nil { logging.FromContext(ctx).Warnw("Failed to create dynamic resource client", zap.Error(err)) return false, err } - - updated := false - - uo := &unstructured.Unstructured{} - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - channel, err = r.getChannel(ctx, sub) - if err != nil { - return fmt.Errorf("failed to get channel for subscription %s/%s: %w", sub.GetNamespace(), sub.GetName(), err) - } - - after := channel.DeepCopy() - - if sub.DeletionTimestamp.IsZero() { - r.updateChannelAddSubscription(after, sub) - } else { - r.updateChannelRemoveSubscription(after, sub) - } - - // If there is nothing to update on subscribers, we are good, just return. - if equality.Semantic.DeepEqual(channel.Spec.Subscribers, after.Spec.Subscribers) { - return nil - } else { - updated = true - } - - after.Status = eventingduckv1.ChannelableStatus{} - output, err := runtime.DefaultUnstructuredConverter.ToUnstructured(after) - if err != nil { - return err - } - - uo.Object = output - if _, err = resourceClient.Update(ctx, uo, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update channel %s/%s: %w", uo.GetNamespace(), uo.GetName(), err) - } - return nil - }) + patched, err := resourceClient.Patch(ctx, channel.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}) if err != nil { - logging.FromContext(ctx).Warnw("Failed to update the Channel", zap.Error(err), zap.Any("update", uo)) + logging.FromContext(ctx).Warnw("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch)) return false, err } - - return updated, nil + logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched)) + return true, nil } func (r *Reconciler) updateChannelRemoveSubscription(channel *eventingduckv1.Channelable, sub *v1.Subscription) { diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index c946f5b1843..30d1dc8299d 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -18,12 +18,10 @@ package subscription import ( "context" + "encoding/json" "fmt" "testing" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "knative.dev/eventing/pkg/apis/messaging" - corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -905,18 +903,14 @@ func TestAllCases(t *testing.T) { WithInitSubscriptionConditions, MarkReferencesResolved, MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -999,19 +993,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - Delivery: &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - URI: dlsURI, - }, - }}, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI, Delivery: &eventingduck.DeliverySpec{DeadLetterSink: &duckv1.Destination{URI: apis.HTTP("dls.mynamespace.svc.cluster.local")}}}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1058,15 +1043,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1152,16 +1132,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - ReplyURI: replyURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, ReplyURI: replyURI, SubscriberURI: subscriberURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1206,16 +1180,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionReply(replyURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - ReplyURI: replyURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI, ReplyURI: replyURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1261,15 +1229,11 @@ func TestAllCases(t *testing.T) { WithSubscriptionStatusObservedGeneration(subscriptionGeneration), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - Generation: subscriptionGeneration, - }, - }, nil), - }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, Generation: subscriptionGeneration, SubscriberURI: subscriberURI}, + }), + }, }, { Name: "v1 imc+subscriber as service", Objects: []runtime.Object{ @@ -1303,15 +1267,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: serviceURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: serviceURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1357,15 +1316,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: "a-" + subscriptionUID, - SubscriberURI: serviceURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: "a-" + subscriptionUID, SubscriberURI: serviceURI}, + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, { @@ -1421,20 +1375,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: "a-" + subscriptionUID, - SubscriberURI: serviceURI, - Delivery: &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - URI: apis.HTTP("dls.mynamespace.svc.cluster.local"), - }, - }, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, Delivery: &eventingduck.DeliverySpec{DeadLetterSink: &duckv1.Destination{URI: apis.HTTP("dls.mynamespace.svc.cluster.local")}}}, + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1503,8 +1447,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1517,9 +1461,7 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }, nil), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1575,8 +1517,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlcURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1589,21 +1531,7 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }, &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, - Kind: subscriberGVK.Kind, - Name: dlcName, - Namespace: testNS, - }, - }, - Retry: pointer.Int32(10), - BackoffPolicy: &linear, - BackoffDelay: pointer.String("PT1S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1653,8 +1581,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1663,12 +1591,7 @@ func TestAllCases(t *testing.T) { RetryAfterMax: pointer.String("PT2S"), }, }, - }, &eventingduck.DeliverySpec{ - Timeout: pointer.String("PT1S"), - RetryAfterMax: pointer.String("PT2S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1709,15 +1632,13 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, }, - }, nil), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1853,8 +1774,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1867,21 +1788,7 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }, &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, - Kind: subscriberGVK.Kind, - Name: dlc2Name, - Namespace: testNS, - }, - }, - Retry: pointer.Int32(20), - BackoffPolicy: &linear, - BackoffDelay: pointer.String("PT10S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1941,8 +1848,8 @@ func TestAllCases(t *testing.T) { }), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1951,12 +1858,7 @@ func TestAllCases(t *testing.T) { RetryAfterMax: pointer.String("PT2S"), }, }, - }, &eventingduck.DeliverySpec{ - Timeout: pointer.String("PT10S"), - RetryAfterMax: pointer.String("PT20S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1990,14 +1892,12 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), Eventf(corev1.EventTypeNormal, "SubscriberRemoved", "Subscription was removed from channel \"origin\""), }, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, nil, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, nil), patchRemoveFinalizers(testNS, subscriptionName), }, }, { - Name: "subscription not deleted - channel update fails", + Name: "subscription not deleted - channel patch fails", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), @@ -2023,10 +1923,10 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + subscriptionName, WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("update", "inmemorychannels"), + InduceFailure("patch", "inmemorychannels"), }, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "PhysicalChannelSyncFailed", fmt.Sprintf("Failed to synchronize to channel %q: %s: %s", channelName, "failed to update channel testnamespace/origin", "inducing failure for update inmemorychannels")), + Eventf(corev1.EventTypeWarning, "PhysicalChannelSyncFailed", fmt.Sprintf("Failed to synchronize to channel %q: %s", channelName, "inducing failure for patch inmemorychannels")), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, @@ -2034,16 +1934,16 @@ func TestAllCases(t *testing.T) { WithSubscriptionChannel(imcV1GVK, channelName), WithInitSubscriptionConditions, MarkSubscriptionReady, - MarkChannelFailed("PhysicalChannelSyncFailed", "Failed to sync physical Channel: failed to update channel testnamespace/origin: inducing failure for update inmemorychannels"), + MarkNotAddedToChannel("PhysicalChannelSyncFailed", "Failed to sync physical Channel: inducing failure for patch inmemorychannels"), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionFinalizers(finalizerName), WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), WithSubscriptionDeleted, ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, nil, nil), - }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, nil), + }, }, { Name: "subscription deleted - channel does not exist", Objects: []runtime.Object{ @@ -2071,6 +1971,7 @@ func TestAllCases(t *testing.T) { }, }, } + logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = channelable.WithDuck(ctx) @@ -2096,6 +1997,36 @@ func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOpti } } +func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl { + action := clientgotesting.PatchActionImpl{} + action.Name = name + action.Namespace = namespace + + var spec string + if subscribers != nil { + b, err := json.Marshal(subscribers) + if err != nil { + return action + } + ss := make([]map[string]interface{}, 0) + err = json.Unmarshal(b, &ss) + if err != nil { + return action + } + subs, err := json.Marshal(ss) + if err != nil { + return action + } + spec = fmt.Sprintf(`{"subscribers":%s}`, subs) + } else { + spec = `{"subscribers":null}` + } + + patch := `{"spec":` + spec + `}` + action.Patch = []byte(patch) + return action +} + func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name @@ -2113,29 +2044,3 @@ func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionIm action.Patch = []byte(patch) return action } - -func makeUnstructuredChannelable(gvk metav1.GroupVersionKind, subscribers []eventingduck.SubscriberSpec, delivery *eventingduck.DeliverySpec) *unstructured.Unstructured { - ch := &eventingduck.Channelable{ - TypeMeta: metav1.TypeMeta{ - Kind: gvk.Kind, - APIVersion: fmt.Sprintf("%s/%s", gvk.Group, gvk.Version), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: channelName, - Namespace: testNS, - Annotations: map[string]string{messaging.SubscribableDuckVersionAnnotation: "v1"}, - }, - Spec: eventingduck.ChannelableSpec{ - SubscribableSpec: eventingduck.SubscribableSpec{ - Subscribers: subscribers, - }, - Delivery: delivery, - }, - Status: eventingduck.ChannelableStatus{}, - } - uo, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ch) - if err != nil { - return nil - } - return &unstructured.Unstructured{Object: uo} -} diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index aa3690ec0a0..b42e759672c 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -250,12 +250,6 @@ func MarkNotAddedToChannel(reason, msg string) SubscriptionOption { } } -func MarkChannelFailed(reason, msg string) SubscriptionOption { - return func(s *v1.Subscription) { - s.Status.MarkChannelFailed(reason, msg) - } -} - func MarkReferencesResolved(s *v1.Subscription) { s.Status.MarkReferencesResolved() }