Skip to content

Commit

Permalink
Change subscription patch logic to ensure resource version (#6670)
Browse files Browse the repository at this point in the history
Fixes #6636 

<!-- Please include the 'why' behind your changes if no issue exists -->

In `Subscription`'s reconcile loops, physical channel is updated by
`PATCH` logic. It occurs broken sync between the channel and
subscriptions.

For ensuring a resource version which can check whether conflict
occurred or not, change to `Update` with RetryOnConflict.

## Proposed Changes

<!-- Please categorize your changes:
- 🎁 Add new feature
- 🐛 Fix bug
- 🧹 Update or clean up current behavior
- 🗑️ Remove feature or internal logic
-->

- Change Patch to Update
- Use RetryOnConflict
- Change condition when sync failed

### Pre-review Checklist

<!-- If these boxes are not checked, you will be asked to complete these
requirements or explain why they do not apply to your PR. -->

- [ ] **At least 80% unit test coverage**
- [ ] **E2E tests** for any new behavior
- [ ] **Docs PR** for any user-facing impact
- [ ] **Spec PR** for any new API feature
- [ ] **Conformance test** for any change to the spec

**Release Note**

<!--
📄 If this change has user-visible impact, write a release
note in the block
below. Include the string "action required" if additional action is
required of
users switching to the new release, for example in case of a breaking
change.

Write as if you are speaking to users, not other Knative contributors.
If this
change has no user-visible impact, no release note is needed.
-->

```release-note

```


**Docs**

<!--
📖 If this change has user-visible impact, link to an issue or PR in
https://github.com/knative/docs.
-->

# Open Questions
If there are some users who are already affected by the bug related to
this issue, this PR cannot fix them. What should we do?

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
WoWsj and pierDipi authored Jan 25, 2023
1 parent 74e165a commit 4d6e1fc
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 119 deletions.
86 changes: 51 additions & 35 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@ package subscription
import (
"context"
"fmt"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"

"knative.dev/pkg/apis/duck"
"k8s.io/client-go/util/retry"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kref"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -153,11 +151,11 @@ func (r Reconciler) checkChannelStatusForSubscription(ctx context.Context, chann
func (r Reconciler) syncChannel(ctx context.Context, channel *eventingduckv1.Channelable, sub *v1.Subscription) pkgreconciler.Event {
// Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile
// the Channel with this information.
if patched, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil {
if updated, err := r.syncPhysicalChannel(ctx, sub, channel, false); err != nil {
logging.FromContext(ctx).Warnw("Failed to sync physical Channel", zap.Error(err))
sub.Status.MarkNotAddedToChannel(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err)
sub.Status.MarkChannelFailed(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err)
return pkgreconciler.NewEvent(corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to synchronize to channel %q: %w", channel.Name, err)
} else if patched {
} else if updated {
if sub.DeletionTimestamp.IsZero() {
sub.Status.MarkAddedToChannel()
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SubscriberSync", "Subscription was synchronized to channel %q", channel.Name)
Expand Down Expand Up @@ -347,7 +345,7 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript
}

// getChannel fetches the Channel as specified by the Subscriptions spec.Channel
// and verifies it's a channelable (so that we can operate on it via patches).
// and verifies it's a channelable (so that we can operate on it via updates).
// If the Channel is a channels.messaging type (hence, it's only a factory for
// underlying channels), fetch and validate the "backing" channel.
func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) {
Expand Down Expand Up @@ -425,48 +423,66 @@ func isNilOrEmptyDestination(destination *duckv1.Destination) bool {

func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1.Subscription, channel *eventingduckv1.Channelable, isDeleted bool) (bool, error) {
logging.FromContext(ctx).Debugw("Reconciling physical from Channel", zap.Any("sub", sub))
if patched, patchErr := r.patchSubscription(ctx, sub.Namespace, channel, sub); patchErr != nil {
if isDeleted && apierrors.IsNotFound(patchErr) {
if updated, updateErr := r.updateChannelable(ctx, sub.Namespace, channel, sub); updateErr != nil {
if isDeleted && apierrors.IsNotFound(updateErr) {
logging.FromContext(ctx).Warnw("Could not find Channel", zap.Any("channel", sub.Spec.Channel))
return false, nil
}
return patched, patchErr
return updated, updateErr
} else {
return patched, nil
return updated, nil
}
}

func (r *Reconciler) patchSubscription(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) {
after := channel.DeepCopy()

if sub.DeletionTimestamp.IsZero() {
r.updateChannelAddSubscription(after, sub)
} else {
r.updateChannelRemoveSubscription(after, sub)
}

patch, err := duck.CreateMergePatch(channel, after)
if err != nil {
return false, err
}
// If there is nothing to patch, we are good, just return.
// Empty patch is {}, hence we check for that.
if len(patch) <= 2 {
return false, nil
}

func (r *Reconciler) updateChannelable(ctx context.Context, namespace string, channel *eventingduckv1.Channelable, sub *v1.Subscription) (bool, error) {
resourceClient, err := eventingduck.ResourceInterface(r.dynamicClientSet, namespace, channel.GroupVersionKind())
if err != nil {
logging.FromContext(ctx).Warnw("Failed to create dynamic resource client", zap.Error(err))
return false, err
}
patched, err := resourceClient.Patch(ctx, channel.GetName(), types.MergePatchType, patch, metav1.PatchOptions{})

updated := false

uo := &unstructured.Unstructured{}
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
channel, err = r.getChannel(ctx, sub)
if err != nil {
return fmt.Errorf("failed to get channel for subscription %s/%s: %w", sub.GetNamespace(), sub.GetName(), err)
}

after := channel.DeepCopy()

if sub.DeletionTimestamp.IsZero() {
r.updateChannelAddSubscription(after, sub)
} else {
r.updateChannelRemoveSubscription(after, sub)
}

// If there is nothing to update on subscribers, we are good, just return.
if equality.Semantic.DeepEqual(channel.Spec.Subscribers, after.Spec.Subscribers) {
return nil
} else {
updated = true
}

after.Status = eventingduckv1.ChannelableStatus{}
output, err := runtime.DefaultUnstructuredConverter.ToUnstructured(after)
if err != nil {
return err
}

uo.Object = output
if _, err = resourceClient.Update(ctx, uo, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update channel %s/%s: %w", uo.GetNamespace(), uo.GetName(), err)
}
return nil
})
if err != nil {
logging.FromContext(ctx).Warnw("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch))
logging.FromContext(ctx).Warnw("Failed to update the Channel", zap.Error(err), zap.Any("update", uo))
return false, err
}
logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched))
return true, nil

return updated, nil
}

func (r *Reconciler) updateChannelRemoveSubscription(channel *eventingduckv1.Channelable, sub *v1.Subscription) {
Expand Down
Loading

0 comments on commit 4d6e1fc

Please sign in to comment.