[release-v1.10] SRVKE-958: Cache init offsets results #817

Merged
25 changes: 25 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
@@ -34,6 +34,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -46,6 +47,7 @@ import (
"knative.dev/pkg/reconciler"

sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"

"knative.dev/eventing/pkg/scheduler"

@@ -137,6 +139,15 @@ type Reconciler struct {
// DeleteConsumerGroupMetadataCounter is an in-memory counter to count how many times we have
// tried to delete consumer group metadata from Kafka.
DeleteConsumerGroupMetadataCounter *counter.Counter

// InitOffsetLatestInitialOffsetCache caches the result of consumer group
// offset initialization, so that repeated reconciles don't re-contact Kafka.
//
// Under high load, when many consumer group schedule calls arrive at once,
// connections to Kafka frequently fail with errors such as
// `dial tcp 10.130.4.8:9092: i/o timeout`, increasing the "time to readiness"
// of consumer groups.
InitOffsetLatestInitialOffsetCache prober.Cache

EnqueueKey func(key string)
}

func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
@@ -218,6 +229,8 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum
r.DeleteConsumerGroupMetadataCounter.Del(string(cg.GetUID()))
}

r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg))

return nil
}

@@ -482,6 +495,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern
return nil
}

if status := r.InitOffsetLatestInitialOffsetCache.GetStatus(keyOf(cg)); status == prober.StatusReady {
return nil
}

saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err)
@@ -518,6 +535,10 @@
return fmt.Errorf("failed to initialize offset: %w", err)
}

r.InitOffsetLatestInitialOffsetCache.UpsertStatus(keyOf(cg), prober.StatusReady, nil, func(key string, arg interface{}) {
r.EnqueueKey(key)
})

return nil
}

@@ -752,3 +773,7 @@ func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.Cons
metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}

func keyOf(cg metav1.Object) string {
return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String()
}
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"testing"
"time"

"github.com/Shopify/sarama"
corev1 "k8s.io/api/core/v1"

bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"

"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
@@ -1675,6 +1677,8 @@ func TestReconcileKind(t *testing.T) {
SystemNamespace: systemNamespace,
AutoscalerConfig: "",
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second),
EnqueueKey: func(key string) {},
}

r.KafkaFeatureFlags = configapis.FromContext(store.ToContext(ctx))
@@ -1814,6 +1818,8 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
},
SystemNamespace: systemNamespace,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second),
EnqueueKey: func(key string) {},
}

r.KafkaFeatureFlags = configapis.DefaultFeaturesConfig()
@@ -2198,6 +2204,7 @@ func TestFinalizeKind(t *testing.T) {
},
KafkaFeatureFlags: configapis.DefaultFeaturesConfig(),
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second),
}

return consumergroup.NewReconciler(
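A small tuning detail before the controller wiring below: the tests above construct the cache with a one-second TTL, whereas NewController uses twenty minutes. A sketch of that split (hypothetical helper; the rationale is inferred, not stated in the PR):

```go
package sketch

import (
	"context"
	"time"

	"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
)

// newOffsetCaches shows the two TTLs from this PR side by side.
func newOffsetCaches(ctx context.Context) (testCache, prodCache prober.Cache) {
	// Tests: one second, so cached "offsets initialized" results expire
	// within a run and cannot leak state between cases.
	testCache = prober.NewLocalExpiringCache(ctx, time.Second)
	// Controller: twenty minutes, so bursts of reconciles reuse the cached
	// result instead of re-dialing Kafka on every call.
	prodCache = prober.NewLocalExpiringCache(ctx, 20*time.Minute)
	return testCache, prodCache
}
```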
23 changes: 22 additions & 1 deletion control-plane/pkg/reconciler/consumergroup/controller.go
@@ -33,7 +33,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"

"knative.dev/eventing/pkg/scheduler"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
@@ -126,6 +129,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
KedaClient: kedaclient.Get(ctx),
AutoscalerConfig: env.AutoscalerConfigMap,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, 20*time.Minute),
}

consumerInformer := consumer.Get(ctx)
}
})

r.EnqueueKey = func(key string) {
parts := strings.SplitN(key, string(types.Separator), 3)
if len(parts) != 2 {
panic(fmt.Sprintf("Expected <namespace>/<name> format, got %s", key))
}
impl.EnqueueKey(types.NamespacedName{Namespace: parts[0], Name: parts[1]})
}

configStore := config.NewStore(ctx, func(name string, value *config.KafkaFeatureFlags) {
r.KafkaFeatureFlags.Reset(value)
impl.GlobalResync(consumerGroupInformer.Informer())
})
configStore.WatchConfigs(watcher)

- consumerGroupInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
+ consumerGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: impl.Enqueue,
+ UpdateFunc: controller.PassNew(impl.Enqueue),
+ DeleteFunc: func(obj interface{}) {
+ impl.Enqueue(obj)
+ if cg, ok := obj.(metav1.Object); ok && cg != nil {
+ r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg))
+ }
+ },
+ })
consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey)))

globalResync := func(interface{}) {
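For illustration, here is a self-contained sketch of the key round-trip between keyOf (consumergroup.go) and the EnqueueKey closure above — the hypothetical main package below is not part of the PR:

```go
package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// keyOf renders the cache key as "<namespace>/<name>".
	key := types.NamespacedName{Namespace: "default", Name: "my-cg"}.String()
	fmt.Println(key) // default/my-cg

	// EnqueueKey splits it back apart before re-enqueueing. SplitN with a
	// limit of 3 makes a malformed key with extra separators show up as
	// len(parts) != 2 instead of being silently truncated.
	parts := strings.SplitN(key, string(types.Separator), 3)
	if len(parts) != 2 {
		panic(fmt.Sprintf("Expected <namespace>/<name> format, got %s", key))
	}
	fmt.Println(types.NamespacedName{Namespace: parts[0], Name: parts[1]})
}
```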
17 changes: 17 additions & 0 deletions openshift/patches/autoscaler_leader_log.patch
@@ -0,0 +1,17 @@
diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go
index 96008b849..6c00a231b 100644
--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go
+++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go
@@ -133,10 +133,10 @@ func (a *autoscaler) Start(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(a.refreshPeriod):
- a.logger.Infow("Triggering scale down")
+ a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load()))
attemptScaleDown = true
case <-a.trigger:
- a.logger.Infow("Triggering scale up")
+ a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load()))
attemptScaleDown = false
}

20 changes: 20 additions & 0 deletions openshift/patches/remove_resource_version_check.patch
@@ -0,0 +1,20 @@
diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go
index a95242ee2..083767450 100644
--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go
+++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go
@@ -228,15 +228,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla
s.reservedMu.Lock()
defer s.reservedMu.Unlock()

- vpods, err := s.vpodLister()
- if err != nil {
- return nil, err
- }
- vpodFromLister := st.GetVPod(vpod.GetKey(), vpods)
- if vpodFromLister != nil && vpod.GetResourceVersion() != vpodFromLister.GetResourceVersion() {
- return nil, fmt.Errorf("vpod to schedule has resource version different from one in indexer")
- }
-
placements, err := s.scheduleVPod(vpod)
if placements == nil {
return placements, err
2 changes: 2 additions & 0 deletions openshift/release/generate-release.sh
@@ -8,6 +8,8 @@ GITHUB_ACTIONS=true $(dirname $0)/../../hack/update-codegen.sh
git apply openshift/patches/disable-ko-publish-rekt.patch
git apply openshift/patches/override-min-version.patch
git apply openshift/patches/autoscaler_fix.patch
git apply openshift/patches/remove_resource_version_check.patch
git apply openshift/patches/autoscaler_leader_log.patch

# Eventing core will bring the config tracing ConfigMap, so remove it from here
rm -f control-plane/config/eventing-kafka-broker/200-controller/100-config-tracing.yaml
4 changes: 2 additions & 2 deletions vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go
@@ -133,10 +133,10 @@ func (a *autoscaler) Start(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(a.refreshPeriod):
a.logger.Infow("Triggering scale down")
a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load()))
attemptScaleDown = true
case <-a.trigger:
a.logger.Infow("Triggering scale up")
a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load()))
attemptScaleDown = false
}

9 changes: 0 additions & 9 deletions vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go
@@ -228,15 +228,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla
s.reservedMu.Lock()
defer s.reservedMu.Unlock()

- vpods, err := s.vpodLister()
- if err != nil {
- return nil, err
- }
- vpodFromLister := st.GetVPod(vpod.GetKey(), vpods)
- if vpodFromLister != nil && vpod.GetResourceVersion() != vpodFromLister.GetResourceVersion() {
- return nil, fmt.Errorf("vpod to schedule has resource version different from one in indexer")
- }

placements, err := s.scheduleVPod(vpod)
if placements == nil {
return placements, err