Cache init offsets results (knative-extensions#817)
When there is high load and multiple consumer group schedule calls,
we get many `dial tcp 10.130.4.8:9092: i/o timeout` errors when
trying to connect to Kafka.
This leads to increased "time to readiness" for consumer groups.

The downside of caching is that, if partitions increase while the result
is cached, we won't initialize the offsets of the new partitions.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
openshift-cherrypick-robot authored and pierDipi committed Sep 20, 2023
1 parent e7f53ae commit 78dcca3
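
At its core, the change wraps offset initialization in a check-then-cache flow. A minimal sketch of that flow (not part of the diff; `initOffsets` and `enqueue` are placeholders for the reconciler's existing initialization call and re-queue hook, the `prober.Cache` method set is inferred from the calls in the diff below, and imports of `fmt` and the `prober` package are assumed):

	// initOffsetsOnce initializes a consumer group's offsets at most once
	// per cache-expiration window.
	func initOffsetsOnce(cache prober.Cache, key string, enqueue func(key string), initOffsets func() error) error {
		// Skip the Kafka dial entirely if this group's offsets were
		// initialized recently and the cache entry is still live.
		if status := cache.GetStatus(key); status == prober.StatusReady {
			return nil
		}
		if err := initOffsets(); err != nil {
			return fmt.Errorf("failed to initialize offset: %w", err)
		}
		// Record success. When the entry expires (20 minutes in the
		// controller, 1 second in the tests), the callback presumably
		// re-enqueues the key, so offsets for partitions added while the
		// result was cached get initialized on a later reconcile.
		cache.UpsertStatus(key, prober.StatusReady, nil, func(key string, arg interface{}) {
			enqueue(key)
		})
		return nil
	}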
Showing 3 changed files with 53 additions and 1 deletion.
25 changes: 25 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
@@ -34,6 +34,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -47,6 +48,7 @@ import (
"knative.dev/pkg/resolver"

sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"

"knative.dev/eventing/pkg/scheduler"

@@ -139,6 +141,15 @@ type Reconciler struct {
// DeleteConsumerGroupMetadataCounter is an in-memory counter to count how many times we have
// tried to delete consumer group metadata from Kafka.
DeleteConsumerGroupMetadataCounter *counter.Counter

// InitOffsetLatestInitialOffsetCache is the cache for consumer group offset initialization.
//
// When there is high load and multiple consumer group schedule calls, we get many
// `dial tcp 10.130.4.8:9092: i/o timeout` errors when trying to connect to Kafka.
// This leads to increased "time to readiness" for consumer groups.
InitOffsetLatestInitialOffsetCache prober.Cache

EnqueueKey func(key string)
}

func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
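
For reference, the cache methods the reconciler relies on, reconstructed from the calls in this diff (a hedged sketch; the actual `prober.Cache` interface in control-plane/pkg/prober may declare more methods or slightly different signatures):

	type Cache interface {
		// GetStatus returns the cached status for key; a miss (or an
		// expired entry) yields a status other than StatusReady.
		GetStatus(key string) Status
		// UpsertStatus stores status under key and invokes onExpired,
		// with key and arg, once the entry is evicted.
		UpsertStatus(key string, status Status, arg interface{}, onExpired func(key string, arg interface{}))
		// Expire drops the entry for key immediately.
		Expire(key string)
	}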
@@ -220,6 +231,8 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum
r.DeleteConsumerGroupMetadataCounter.Del(string(cg.GetUID()))
}

r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg))

return nil
}

@@ -492,6 +505,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern
return nil
}

if status := r.InitOffsetLatestInitialOffsetCache.GetStatus(keyOf(cg)); status == prober.StatusReady {
return nil
}

saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err)
@@ -528,6 +545,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern
return fmt.Errorf("failed to initialize offset: %w", err)
}

r.InitOffsetLatestInitialOffsetCache.UpsertStatus(keyOf(cg), prober.StatusReady, nil, func(key string, arg interface{}) {
r.EnqueueKey(key)
})

return nil
}

@@ -764,3 +785,7 @@ func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.Cons
metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}

func keyOf(cg metav1.Object) string {
return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String()
}
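
keyOf emits the `<namespace>/<name>` string that the EnqueueKey closure in controller.go (below) splits back apart; for example (hypothetical values, using the k8s.io/apimachinery/pkg/types import added above):

	// types.NamespacedName.String() joins Namespace and Name with
	// types.Separator ('/'), so for a ConsumerGroup "cg-1" in "default":
	key := types.NamespacedName{Namespace: "default", Name: "cg-1"}.String()
	// key == "default/cg-1"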
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"testing"
"time"

"github.com/Shopify/sarama"
corev1 "k8s.io/api/core/v1"
@@ -34,6 +35,7 @@

bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"

"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
@@ -1675,6 +1677,8 @@ func TestReconcileKind(t *testing.T) {
SystemNamespace: systemNamespace,
AutoscalerConfig: "",
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second),
EnqueueKey: func(key string) {},
}

r.KafkaFeatureFlags = configapis.FromContext(store.ToContext(ctx))
@@ -1814,6 +1818,8 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
},
SystemNamespace: systemNamespace,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second),
EnqueueKey: func(key string) {},
}

r.KafkaFeatureFlags = configapis.DefaultFeaturesConfig()
@@ -2198,6 +2204,7 @@ func TestFinalizeKind(t *testing.T) {
},
KafkaFeatureFlags: configapis.DefaultFeaturesConfig(),
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second),
}

return consumergroup.NewReconciler(
22 changes: 21 additions & 1 deletion control-plane/pkg/reconciler/consumergroup/controller.go
@@ -33,7 +33,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"

"knative.dev/eventing/pkg/scheduler"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
@@ -127,6 +130,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
KedaClient: kedaclient.Get(ctx),
AutoscalerConfig: env.AutoscalerConfigMap,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, 20*time.Minute),
}

consumerInformer := consumer.Get(ctx)
@@ -153,14 +157,30 @@
})

r.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
r.EnqueueKey = func(key string) {
parts := strings.SplitN(key, string(types.Separator), 3)
if len(parts) != 2 {
panic(fmt.Sprintf("Expected <namespace>/<name> format, got %s", key))
}
impl.EnqueueKey(types.NamespacedName{Namespace: parts[0], Name: parts[1]})
}

configStore := config.NewStore(ctx, func(name string, value *config.KafkaFeatureFlags) {
r.KafkaFeatureFlags.Reset(value)
impl.GlobalResync(consumerGroupInformer.Informer())
})
configStore.WatchConfigs(watcher)

consumerGroupInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
consumerGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: func(obj interface{}) {
impl.Enqueue(obj)
if cg, ok := obj.(metav1.Object); ok && cg != nil {
r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg))
}
},
})
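	// Replacing controller.HandleAll with explicit handlers lets DeleteFunc
	// also expire the cached init-offsets entry (in addition to the
	// FinalizeKind path), so no stale StatusReady survives for a group
	// recreated under the same namespace/name (inferred rationale, not
	// stated in the commit message).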
consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey)))

globalResync := func(interface{}) {
