diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 6034bda26d..d68b1b1ee1 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -34,6 +34,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" @@ -47,6 +48,7 @@ import ( "knative.dev/pkg/resolver" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/eventing/pkg/scheduler" @@ -139,6 +141,15 @@ type Reconciler struct { // DeleteConsumerGroupMetadataCounter is an in-memory counter to count how many times we have // tried to delete consumer group metadata from Kafka. DeleteConsumerGroupMetadataCounter *counter.Counter + + // InitOffsetLatestInitialOffsetCache is the cache for consumer group offset initialization. + // + // When there is high load and multiple consumer group schedule calls, we get many + // `dial tcp 10.130.4.8:9092: i/o timeout` errors when trying to connect to Kafka. + // This leads to increased "time to readiness" for consumer groups. + InitOffsetLatestInitialOffsetCache prober.Cache + + EnqueueKey func(key string) } func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { @@ -220,6 +231,8 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum r.DeleteConsumerGroupMetadataCounter.Del(string(cg.GetUID())) } + r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg)) + return nil } @@ -492,6 +505,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern return nil } + if status := r.InitOffsetLatestInitialOffsetCache.GetStatus(keyOf(cg)); status == prober.StatusReady { + return nil + } + saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg) if err != nil { return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err) @@ -528,6 +545,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern return fmt.Errorf("failed to initialize offset: %w", err) } + r.InitOffsetLatestInitialOffsetCache.UpsertStatus(keyOf(cg), prober.StatusReady, nil, func(key string, arg interface{}) { + r.EnqueueKey(key) + }) + return nil } @@ -764,3 +785,7 @@ func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.Cons metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds())) }() } + +func keyOf(cg metav1.Object) string { + return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String() +} diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index 8bacd72d0c..f3a085447f 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" @@ -34,6 +35,7 @@ import ( bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -1675,6 +1677,8 @@ func TestReconcileKind(t *testing.T) { SystemNamespace: systemNamespace, AutoscalerConfig: "", DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second), + EnqueueKey: func(key string) {}, } r.KafkaFeatureFlags = configapis.FromContext(store.ToContext(ctx)) @@ -1814,6 +1818,8 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { }, SystemNamespace: systemNamespace, DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second), + EnqueueKey: func(key string) {}, } r.KafkaFeatureFlags = configapis.DefaultFeaturesConfig() @@ -2198,6 +2204,7 @@ func TestFinalizeKind(t *testing.T) { }, KafkaFeatureFlags: configapis.DefaultFeaturesConfig(), DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second), } return consumergroup.NewReconciler( diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index ac6b2ef8af..37c2ad1f40 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -33,7 +33,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/tools/cache" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" + "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" + "knative.dev/eventing/pkg/scheduler" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" @@ -127,6 +130,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I KedaClient: kedaclient.Get(ctx), AutoscalerConfig: env.AutoscalerConfigMap, DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, 20*time.Minute), } consumerInformer := consumer.Get(ctx) @@ -153,6 +157,13 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I }) r.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) + r.EnqueueKey = func(key string) { + parts := strings.SplitN(key, string(types.Separator), 3) + if len(parts) != 2 { + panic(fmt.Sprintf("Expected <namespace>/<name> format, got %s", key)) + } + impl.EnqueueKey(types.NamespacedName{Namespace: parts[0], Name: parts[1]}) + } configStore := config.NewStore(ctx, func(name string, value *config.KafkaFeatureFlags) { r.KafkaFeatureFlags.Reset(value) @@ -160,7 +171,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I }) configStore.WatchConfigs(watcher) - consumerGroupInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + consumerGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: impl.Enqueue, + UpdateFunc: controller.PassNew(impl.Enqueue), + DeleteFunc: func(obj interface{}) { + impl.Enqueue(obj) + if cg, ok := obj.(metav1.Object); ok && cg != nil { + r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg)) + } + }, + }) consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey))) globalResync := func(interface{}) {