Skip to content

Commit

Permalink
Retry reconcile subscriber error (knative-extensions#1115) (#598)
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com>

Co-authored-by: Knative Prow Robot <knative-prow-robot@google.com>
Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
3 people authored Mar 1, 2022
1 parent 0e85783 commit 9745510
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/Shopify/sarama"
"github.com/google/go-cmp/cmp"
"go.uber.org/multierr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -266,6 +267,9 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.Kafka
after := ch.DeepCopy()
after.Status.Subscribers = make([]v1.SubscriberStatus, 0)
logger := logging.FromContext(ctx)

var globalErr error

for _, s := range ch.Spec.Subscribers {
logger.Debugw("Reconciling initial offset for subscription", zap.Any("subscription", s), zap.Any("channel", ch))
err := r.reconcileInitialOffset(ctx, ch, s, kafkaClient, kafkaClusterAdmin)
Expand All @@ -278,6 +282,7 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.Kafka
Ready: corev1.ConditionFalse,
Message: fmt.Sprintf("Initial offset cannot be committed: %v", err),
})
globalErr = multierr.Append(globalErr, err)
} else {
logger.Debugw("Reconciled initial offset for subscription. Marking the subscription ready", zap.String("channel", fmt.Sprintf("%s.%s", ch.Namespace, ch.Name)), zap.Any("subscription", s))
after.Status.Subscribers = append(after.Status.Subscribers, v1.SubscriberStatus{
Expand Down Expand Up @@ -307,7 +312,8 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.Kafka
return fmt.Errorf("Failed patching: %w", err)
}
logger.Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched))
return nil

return globalErr
}

func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, dispatcherNamespace string, kc *v1beta1.KafkaChannel) error {
Expand Down

0 comments on commit 9745510

Please sign in to comment.