Skip to content

Commit

Permalink
[release-1.9] Revert "Change subscription patch logic to ensure resou…
Browse files Browse the repository at this point in the history
…rce version (#6670) (#6725)

This reverts commit
4d6e1fc.

It has the side effect of dropping channel spec fields, so even
immutable
fields are dropped, hence channels will fail to get updated.

We need to re-evalute the approach to fix the orginal issue:
#6636, patch will always
have edge cases that will lead the original bug because subscriptions
are reconciled independently from each other (and potentially by
multiple controller replicas), so update is the only way of having
concurrency control at the resource level but we should make sure
that we're preserving unknown fields when updating channelables.
  • Loading branch information
pierDipi authored Feb 6, 2023
1 parent f6cce6b commit 5a0ac4c
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 236 deletions.
85 changes: 34 additions & 51 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/util/retry"

"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kref"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -152,11 +153,11 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann
func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1.Channelable, sub *v1.Subscription) pkgreconciler.Event {
// Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile
// the Channel with this information.
if updated, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil {
if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil {
logging.FromContext(ctx).Warnw("Failed to sync physical Channel", zap.Error(err))
sub.Status.MarkChannelFailed(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err)
sub.Status.MarkNotAddedToChannel(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err)
return pkgreconciler.NewEvent(corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to synchronize to channel %q: %w", channel.Name, err)
} else if updated {
} else if patched {
if sub.DeletionTimestamp.IsZero() {
sub.Status.MarkAddedToChannel()
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channel.Name)
Expand Down Expand Up @@ -346,7 +347,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript
}

// getChannel fetches the Channel as specified by the Subscriptions spec.Channel
// and verifies it's a channelable (so that we can operate on it via updates).
// and verifies it's a channelable (so that we can operate on it via patches).
// If the Channel is a channels.messaging type (hence, it's only a factory for
// underlying channels), fetch and validate the "backing" channel.
func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) {
Expand Down Expand Up @@ -424,66 +425,48 @@ func isNilOrEmptyDestination(destination *duckv1.Destination) bool {

func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1.Channelable, isDeleted bool) (bool, error) {
logging.FromContext(ctx).Debugw("Reconciling physical from Channel", zap.Any("sub", sub))
if updated, updateErr := r.updateChannelable(ctx, sub.Namespace, channel, sub); updateErr != nil {
if isDeleted && apierrors.IsNotFound(updateErr) {
if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil {
if isDeleted && apierrors.IsNotFound(patchErr) {
logging.FromContext(ctx).Warnw("Could not find Channel", zap.Any("channel", sub.Spec.Channel))
return false, nil
}
return updated, updateErr
return patched, patchErr
} else {
return updated, nil
return patched, nil
}
}

func (r *Reconciler) updateChannelable(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) {
func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) {
after := channel.DeepCopy()

if sub.DeletionTimestamp.IsZero() {
r.updateChannelAddSubscription(after, sub)
} else {
r.updateChannelRemoveSubscription(after, sub)
}

patch, err := duck.CreateMergePatch(channel, after)
if err != nil {
return false, err
}
// If there is nothing to patch, we are good, just return.
// Empty patch is {}, hence we check for that.
if len(patch) <= 2 {
return false, nil
}

resourceClient, err := eventingduck.ResourceInterface(r.dynamicClientSet, namespace, channel.GroupVersionKind())
if err != nil {
logging.FromContext(ctx).Warnw("Failed to create dynamic resource client", zap.Error(err))
return false, err
}

updated := false

uo := &unstructured.Unstructured{}
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
channel, err = r.getChannel(ctx, sub)
if err != nil {
return fmt.Errorf("failed to get channel for subscription %s/%s: %w", sub.GetNamespace(), sub.GetName(), err)
}

after := channel.DeepCopy()

if sub.DeletionTimestamp.IsZero() {
r.updateChannelAddSubscription(after, sub)
} else {
r.updateChannelRemoveSubscription(after, sub)
}

// If there is nothing to update on subscribers, we are good, just return.
if equality.Semantic.DeepEqual(channel.Spec.Subscribers, after.Spec.Subscribers) {
return nil
} else {
updated = true
}

after.Status = eventingduckv1.ChannelableStatus{}
output, err := runtime.DefaultUnstructuredConverter.ToUnstructured(after)
if err != nil {
return err
}

uo.Object = output
if _, err = resourceClient.Update(ctx, uo, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update channel %s/%s: %w", uo.GetNamespace(), uo.GetName(), err)
}
return nil
})
patched, err := resourceClient.Patch(ctx, channel.GetName(), types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
logging.FromContext(ctx).Warnw("Failed to update the Channel", zap.Error(err), zap.Any("update", uo))
logging.FromContext(ctx).Warnw("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch))
return false, err
}

return updated, nil
logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched))
return true, nil
}

func (r *Reconciler) updateChannelRemoveSubscription(channel *eventingduckv1.Channelable, sub *v1.Subscription) {
Expand Down
Loading

0 comments on commit 5a0ac4c

Please sign in to comment.