From 5a0ac4c25fb8bfaf24346e6985c97768eaa4d140 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 6 Feb 2023 20:19:44 +0100 Subject: [PATCH] [release-1.9] Revert "Change subscription patch logic to ensure resource version (#6670) (#6725) This reverts commit https://github.com/knative/eventing/commit/4d6e1fc98d1a98c304566fa19843bcbb3fdea658. It has the side effect of dropping channel spec fields, so even immutable fields are dropped, hence channels will fail to get updated. We need to re-evalute the approach to fix the orginal issue: https://github.com/knative/eventing/issues/6636, patch will always have edge cases that will lead the original bug because subscriptions are reconciled independently from each other (and potentially by multiple controller replicas), so update is the only way of having concurrency control at the resource level but we should make sure that we're preserving unknown fields when updating channelables. --- pkg/reconciler/subscription/subscription.go | 85 +++--- .../subscription/subscription_test.go | 263 ++++++------------ pkg/reconciler/testing/v1/subscription.go | 6 - 3 files changed, 118 insertions(+), 236 deletions(-) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 7ff849d00a9..bcbe662b096 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -25,10 +25,11 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "k8s.io/client-go/util/retry" + + "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kref" "knative.dev/pkg/logging" @@ -152,11 +153,11 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1.Channelable, sub *v1.Subscription) pkgreconciler.Event { // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile // the Channel with this information. - if updated, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { + if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil { logging.FromContext(ctx).Warnw("Failed to sync physical Channel", zap.Error(err)) - sub.Status.MarkChannelFailed(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) + sub.Status.MarkNotAddedToChannel(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) return pkgreconciler.NewEvent(corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to synchronize to channel %q: %w", channel.Name, err) - } else if updated { + } else if patched { if sub.DeletionTimestamp.IsZero() { sub.Status.MarkAddedToChannel() return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channel.Name) @@ -346,7 +347,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript } // getChannel fetches the Channel as specified by the Subscriptions spec.Channel -// and verifies it's a channelable (so that we can operate on it via updates). +// and verifies it's a channelable (so that we can operate on it via patches). // If the Channel is a channels.messaging type (hence, it's only a factory for // underlying channels), fetch and validate the "backing" channel. func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) { @@ -424,66 +425,48 @@ func isNilOrEmptyDestination(destination *duckv1.Destination) bool { func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1.Channelable, isDeleted bool) (bool, error) { logging.FromContext(ctx).Debugw("Reconciling physical from Channel", zap.Any("sub", sub)) - if updated, updateErr := r.updateChannelable(ctx, sub.Namespace, channel, sub); updateErr != nil { - if isDeleted && apierrors.IsNotFound(updateErr) { + if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil { + if isDeleted && apierrors.IsNotFound(patchErr) { logging.FromContext(ctx).Warnw("Could not find Channel", zap.Any("channel", sub.Spec.Channel)) return false, nil } - return updated, updateErr + return patched, patchErr } else { - return updated, nil + return patched, nil } } -func (r *Reconciler) updateChannelable(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) { +func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) { + after := channel.DeepCopy() + + if sub.DeletionTimestamp.IsZero() { + r.updateChannelAddSubscription(after, sub) + } else { + r.updateChannelRemoveSubscription(after, sub) + } + + patch, err := duck.CreateMergePatch(channel, after) + if err != nil { + return false, err + } + // If there is nothing to patch, we are good, just return. + // Empty patch is {}, hence we check for that. + if len(patch) <= 2 { + return false, nil + } + resourceClient, err := eventingduck.ResourceInterface(r.dynamicClientSet, namespace, channel.GroupVersionKind()) if err != nil { logging.FromContext(ctx).Warnw("Failed to create dynamic resource client", zap.Error(err)) return false, err } - - updated := false - - uo := &unstructured.Unstructured{} - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - channel, err = r.getChannel(ctx, sub) - if err != nil { - return fmt.Errorf("failed to get channel for subscription %s/%s: %w", sub.GetNamespace(), sub.GetName(), err) - } - - after := channel.DeepCopy() - - if sub.DeletionTimestamp.IsZero() { - r.updateChannelAddSubscription(after, sub) - } else { - r.updateChannelRemoveSubscription(after, sub) - } - - // If there is nothing to update on subscribers, we are good, just return. - if equality.Semantic.DeepEqual(channel.Spec.Subscribers, after.Spec.Subscribers) { - return nil - } else { - updated = true - } - - after.Status = eventingduckv1.ChannelableStatus{} - output, err := runtime.DefaultUnstructuredConverter.ToUnstructured(after) - if err != nil { - return err - } - - uo.Object = output - if _, err = resourceClient.Update(ctx, uo, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update channel %s/%s: %w", uo.GetNamespace(), uo.GetName(), err) - } - return nil - }) + patched, err := resourceClient.Patch(ctx, channel.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}) if err != nil { - logging.FromContext(ctx).Warnw("Failed to update the Channel", zap.Error(err), zap.Any("update", uo)) + logging.FromContext(ctx).Warnw("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch)) return false, err } - - return updated, nil + logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched)) + return true, nil } func (r *Reconciler) updateChannelRemoveSubscription(channel *eventingduckv1.Channelable, sub *v1.Subscription) { diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index c946f5b1843..30d1dc8299d 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -18,12 +18,10 @@ package subscription import ( "context" + "encoding/json" "fmt" "testing" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "knative.dev/eventing/pkg/apis/messaging" - corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -905,18 +903,14 @@ func TestAllCases(t *testing.T) { WithInitSubscriptionConditions, MarkReferencesResolved, MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -999,19 +993,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - Delivery: &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - URI: dlsURI, - }, - }}, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI, Delivery: &eventingduck.DeliverySpec{DeadLetterSink: &duckv1.Destination{URI: apis.HTTP("dls.mynamespace.svc.cluster.local")}}}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1058,15 +1043,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1152,16 +1132,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - ReplyURI: replyURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, ReplyURI: replyURI, SubscriberURI: subscriberURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1206,16 +1180,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionReply(replyURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - ReplyURI: replyURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: subscriberURI, ReplyURI: replyURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1261,15 +1229,11 @@ func TestAllCases(t *testing.T) { WithSubscriptionStatusObservedGeneration(subscriptionGeneration), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: subscriberURI, - Generation: subscriptionGeneration, - }, - }, nil), - }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, Generation: subscriptionGeneration, SubscriberURI: subscriberURI}, + }), + }, }, { Name: "v1 imc+subscriber as service", Objects: []runtime.Object{ @@ -1303,15 +1267,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: subscriptionUID, - SubscriberURI: serviceURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: subscriptionUID, SubscriberURI: serviceURI}, + }), patchFinalizers(testNS, subscriptionName), }, }, { @@ -1357,15 +1316,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: "a-" + subscriptionUID, - SubscriberURI: serviceURI, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: "a-" + subscriptionUID, SubscriberURI: serviceURI}, + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, { @@ -1421,20 +1375,10 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ - { - UID: "a-" + subscriptionUID, - SubscriberURI: serviceURI, - Delivery: &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - URI: apis.HTTP("dls.mynamespace.svc.cluster.local"), - }, - }, - }, - }, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ + {UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, Delivery: &eventingduck.DeliverySpec{DeadLetterSink: &duckv1.Destination{URI: apis.HTTP("dls.mynamespace.svc.cluster.local")}}}, + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1503,8 +1447,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1517,9 +1461,7 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }, nil), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1575,8 +1517,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlcURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1589,21 +1531,7 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }, &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, - Kind: subscriberGVK.Kind, - Name: dlcName, - Namespace: testNS, - }, - }, - Retry: pointer.Int32(10), - BackoffPolicy: &linear, - BackoffDelay: pointer.String("PT1S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1653,8 +1581,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1663,12 +1591,7 @@ func TestAllCases(t *testing.T) { RetryAfterMax: pointer.String("PT2S"), }, }, - }, &eventingduck.DeliverySpec{ - Timeout: pointer.String("PT1S"), - RetryAfterMax: pointer.String("PT2S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1709,15 +1632,13 @@ func TestAllCases(t *testing.T) { WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, }, - }, nil), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ + }), patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1853,8 +1774,8 @@ func TestAllCases(t *testing.T) { WithSubscriptionDeadLetterSinkURI(dlsURI), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1867,21 +1788,7 @@ func TestAllCases(t *testing.T) { BackoffDelay: pointer.String("PT1S"), }, }, - }, &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: subscriberGVK.Group + "/" + subscriberGVK.Version, - Kind: subscriberGVK.Kind, - Name: dlc2Name, - Namespace: testNS, - }, - }, - Retry: pointer.Int32(20), - BackoffPolicy: &linear, - BackoffDelay: pointer.String("PT10S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1941,8 +1848,8 @@ func TestAllCases(t *testing.T) { }), ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, []eventingduck.SubscriberSpec{ + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ { UID: "a-" + subscriptionUID, SubscriberURI: serviceURI, @@ -1951,12 +1858,7 @@ func TestAllCases(t *testing.T) { RetryAfterMax: pointer.String("PT2S"), }, }, - }, &eventingduck.DeliverySpec{ - Timeout: pointer.String("PT10S"), - RetryAfterMax: pointer.String("PT20S"), }), - }}, - WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, "a-"+subscriptionName), }, }, @@ -1990,14 +1892,12 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", subscriptionName), Eventf(corev1.EventTypeNormal, "SubscriberRemoved", "Subscription was removed from channel \"origin\""), }, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, nil, nil), - }}, WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, nil), patchRemoveFinalizers(testNS, subscriptionName), }, }, { - Name: "subscription not deleted - channel update fails", + Name: "subscription not deleted - channel patch fails", Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), @@ -2023,10 +1923,10 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + subscriptionName, WithReactors: []clientgotesting.ReactionFunc{ - InduceFailure("update", "inmemorychannels"), + InduceFailure("patch", "inmemorychannels"), }, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "PhysicalChannelSyncFailed", fmt.Sprintf("Failed to synchronize to channel %q: %s: %s", channelName, "failed to update channel testnamespace/origin", "inducing failure for update inmemorychannels")), + Eventf(corev1.EventTypeWarning, "PhysicalChannelSyncFailed", fmt.Sprintf("Failed to synchronize to channel %q: %s", channelName, "inducing failure for patch inmemorychannels")), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, @@ -2034,16 +1934,16 @@ func TestAllCases(t *testing.T) { WithSubscriptionChannel(imcV1GVK, channelName), WithInitSubscriptionConditions, MarkSubscriptionReady, - MarkChannelFailed("PhysicalChannelSyncFailed", "Failed to sync physical Channel: failed to update channel testnamespace/origin: inducing failure for update inmemorychannels"), + MarkNotAddedToChannel("PhysicalChannelSyncFailed", "Failed to sync physical Channel: inducing failure for patch inmemorychannels"), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), WithSubscriptionFinalizers(finalizerName), WithSubscriptionPhysicalSubscriptionSubscriber(serviceURI), WithSubscriptionDeleted, ), }}, - WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: makeUnstructuredChannelable(imcV1GVK, nil, nil), - }}, + WantPatches: []clientgotesting.PatchActionImpl{ + patchSubscribers(testNS, channelName, nil), + }, }, { Name: "subscription deleted - channel does not exist", Objects: []runtime.Object{ @@ -2071,6 +1971,7 @@ func TestAllCases(t *testing.T) { }, }, } + logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = channelable.WithDuck(ctx) @@ -2096,6 +1997,36 @@ func WithSubscriptionDeliverySpec(d *eventingduck.DeliverySpec) SubscriptionOpti } } +func patchSubscribers(namespace, name string, subscribers []eventingduck.SubscriberSpec) clientgotesting.PatchActionImpl { + action := clientgotesting.PatchActionImpl{} + action.Name = name + action.Namespace = namespace + + var spec string + if subscribers != nil { + b, err := json.Marshal(subscribers) + if err != nil { + return action + } + ss := make([]map[string]interface{}, 0) + err = json.Unmarshal(b, &ss) + if err != nil { + return action + } + subs, err := json.Marshal(ss) + if err != nil { + return action + } + spec = fmt.Sprintf(`{"subscribers":%s}`, subs) + } else { + spec = `{"subscribers":null}` + } + + patch := `{"spec":` + spec + `}` + action.Patch = []byte(patch) + return action +} + func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { action := clientgotesting.PatchActionImpl{} action.Name = name @@ -2113,29 +2044,3 @@ func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionIm action.Patch = []byte(patch) return action } - -func makeUnstructuredChannelable(gvk metav1.GroupVersionKind, subscribers []eventingduck.SubscriberSpec, delivery *eventingduck.DeliverySpec) *unstructured.Unstructured { - ch := &eventingduck.Channelable{ - TypeMeta: metav1.TypeMeta{ - Kind: gvk.Kind, - APIVersion: fmt.Sprintf("%s/%s", gvk.Group, gvk.Version), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: channelName, - Namespace: testNS, - Annotations: map[string]string{messaging.SubscribableDuckVersionAnnotation: "v1"}, - }, - Spec: eventingduck.ChannelableSpec{ - SubscribableSpec: eventingduck.SubscribableSpec{ - Subscribers: subscribers, - }, - Delivery: delivery, - }, - Status: eventingduck.ChannelableStatus{}, - } - uo, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ch) - if err != nil { - return nil - } - return &unstructured.Unstructured{Object: uo} -} diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index aa3690ec0a0..b42e759672c 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -250,12 +250,6 @@ func MarkNotAddedToChannel(reason, msg string) SubscriptionOption { } } -func MarkChannelFailed(reason, msg string) SubscriptionOption { - return func(s *v1.Subscription) { - s.Status.MarkChannelFailed(reason, msg) - } -} - func MarkReferencesResolved(s *v1.Subscription) { s.Status.MarkReferencesResolved() }