From a8363ad6b17eae9dd523aea5b4800c5d23a97263 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 22 Aug 2023 15:05:15 +0200 Subject: [PATCH] Expose init offset and schedule metrics for ConsumerGroup reconciler (#790) Signed-off-by: Pierangelo Di Pilato --- .../pkg/reconciler/consumergroup/auth.go | 2 +- .../consumergroup/autoscaler_config.go | 3 +- .../reconciler/consumergroup/consumergroup.go | 100 +++++++++++++++--- .../consumergroup/consumergroup_test.go | 4 +- .../pkg/reconciler/consumergroup/evictor.go | 9 +- .../reconciler/consumergroup/evictor_test.go | 2 +- go.mod | 1 + 7 files changed, 98 insertions(+), 23 deletions(-) diff --git a/control-plane/pkg/reconciler/consumergroup/auth.go b/control-plane/pkg/reconciler/consumergroup/auth.go index 90bea0389c..49908f88ac 100644 --- a/control-plane/pkg/reconciler/consumergroup/auth.go +++ b/control-plane/pkg/reconciler/consumergroup/auth.go @@ -27,7 +27,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/security" ) -func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) { +func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) { var secret *corev1.Secret if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) { diff --git a/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go b/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go index 1338ef4abf..a9f41048d8 100644 --- a/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go +++ b/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go @@ -21,10 +21,11 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler" ) -func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) { +func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) { cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err) diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index f7cef59a57..d7bfa71baf 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -22,8 +22,12 @@ import ( "fmt" "math" "sort" + "time" "github.com/Shopify/sarama" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -36,7 +40,9 @@ import ( "k8s.io/utils/pointer" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/pkg/apis" + "knative.dev/pkg/controller" "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/reconciler" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" @@ -60,8 +66,38 @@ import ( var ( ErrNoSubscriberURI = errors.New("no subscriber URI resolved") ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved") + + scheduleLatencyStat = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds) + // scheduleDistribution 
defines the bucket boundaries for the histogram of schedule latency metric. + // Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s. + scheduleDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000) + + initializeOffsetsLatencyStat = stats.Int64("initialize_offsets_latency", "Latency of consumer group offsets initialization operations", stats.UnitMilliseconds) + // initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric. + // Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s. + initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000) ) +func init() { + views := []*view.View{ + { + Description: "Latency of consumer group schedule operations", + TagKeys: []tag.Key{controller.NamespaceTagKey}, + Measure: scheduleLatencyStat, + Aggregation: scheduleDistribution, + }, + { + Description: "Latency of consumer group offsets initialization operations", + TagKeys: []tag.Key{controller.NamespaceTagKey}, + Measure: initializeOffsetsLatencyStat, + Aggregation: initializeOffsetsDistribution, + }, + } + if err := view.Register(views...); err != nil { + panic(err) + } +} + type Scheduler struct { scheduler.Scheduler SchedulerConfig @@ -103,7 +139,7 @@ type Reconciler struct { DeleteConsumerGroupMetadataCounter *counter.Counter } -func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { if err := r.reconcileInitialOffset(ctx, cg); err != nil { return cg.MarkInitializeOffsetFailed("InitializeOffset", err) } @@ -150,7 +186,7 @@ func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consum return nil } -func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { +func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { cg.Spec.Replicas = pointer.Int32(0) err := r.schedule(ctx, cg) //de-schedule placements @@ -159,7 +195,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume cg.Status.Placements = nil // return an error to 1. update the status. 2. not clear the finalizer - return errors.New("placement list was not empty") + return fmt.Errorf("failed to unschedule consumer group: %w", err) } // Get consumers associated with the ConsumerGroup. @@ -185,7 +221,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume return nil } -func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg) if err != nil { return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err) @@ -213,7 +249,7 @@ func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkain return nil } -func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { // Get consumers associated with the ConsumerGroup. 
existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector)) @@ -242,7 +278,7 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C return nil } -func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error { +func (r *Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error { placement := *pc.Placement consumers := pc.Consumers @@ -297,7 +333,7 @@ func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafka return nil } -func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error { +func (r *Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error { c := cg.ConsumerFromTemplate() c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-") @@ -310,7 +346,7 @@ func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.Consu return nil } -func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error { +func (r *Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error { dOpts := metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{UID: &consumer.UID}, } @@ -321,7 +357,10 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna return nil } -func (r Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { + startTime := time.Now() + defer recordScheduleLatency(ctx, cg, startTime) + statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind) // Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing @@ -348,7 +387,7 @@ type ConsumersPerPlacement struct { Consumers []*kafkainternals.Consumer } -func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement { +func (r *Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement { placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers))))) // Group consumers by Pod bind. 
@@ -403,7 +442,7 @@ func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.P return placementConsumers } -func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) { +func (r *Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) { consumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector)) if err != nil { return nil, fmt.Errorf("failed to list consumers for selector %+v: %w", cg.Spec.Selector, err) @@ -435,7 +474,10 @@ func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Con return condition, nil } -func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { + startTime := time.Now() + defer recordInitializeOffsetsLatency(ctx, cg, startTime) + if cg.Spec.Template.Spec.Delivery == nil || cg.Spec.Template.Spec.Delivery.InitialOffset == sources.OffsetEarliest { return nil } @@ -479,7 +521,7 @@ func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainterna return nil } -func (r Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { var triggerAuthentication *kedav1alpha1.TriggerAuthentication var secret *corev1.Secret @@ -620,7 +662,7 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1 return nil } -func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error { +func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error { selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName}) pods, err := r.PodLister. Pods(r.SystemNamespace). 
@@ -642,7 +684,7 @@ func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler return nil } -func (r Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error { +func (r *Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error { // Check if ConfigMap exists in lister cache _, err := r.ConfigMapLister.ConfigMaps(r.SystemNamespace).Get(name) // ConfigMap already exists, return @@ -682,3 +724,31 @@ var ( _ consumergroup.Interface = &Reconciler{} _ consumergroup.Finalizer = &Reconciler{} ) + +func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) { + func() { + ctx, err := tag.New( + ctx, + tag.Insert(controller.NamespaceTagKey, cg.Namespace), + ) + if err != nil { + return + } + + metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds())) + }() +} + +func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) { + func() { + ctx, err := tag.New( + ctx, + tag.Insert(controller.NamespaceTagKey, cg.Namespace), + ) + if err != nil { + return + } + + metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds())) + }() +} diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index 97139bc132..f5788ac417 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -1644,7 +1644,7 @@ func TestReconcileKind(t *testing.T) { _, exampleConfig := cm.ConfigMapsFromTestFile(t, configapis.FlagsConfigName) store.OnConfigChanged(exampleConfig) - r := Reconciler{ + r := &Reconciler{ SchedulerFunc: func(s string) Scheduler { ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ @@ -1787,7 +1787,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { ctx, _ = kedaclient.With(ctx) - r := Reconciler{ + r := &Reconciler{ SchedulerFunc: func(s string) Scheduler { ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ diff --git a/control-plane/pkg/reconciler/consumergroup/evictor.go b/control-plane/pkg/reconciler/consumergroup/evictor.go index 185801a420..1d6d2750ce 100644 --- a/control-plane/pkg/reconciler/consumergroup/evictor.go +++ b/control-plane/pkg/reconciler/consumergroup/evictor.go @@ -48,8 +48,8 @@ type evictor struct { // newEvictor creates a new evictor. // // fields are additional logger fields to be attached to the evictor logger. -func newEvictor(ctx context.Context, fields ...zap.Field) evictor { - return evictor{ +func newEvictor(ctx context.Context, fields ...zap.Field) *evictor { + return &evictor{ ctx: ctx, kubeClient: kubeclient.Get(ctx), InternalsClient: kafkainternalsclient.Get(ctx).InternalV1alpha1(), @@ -60,7 +60,7 @@ func newEvictor(ctx context.Context, fields ...zap.Field) evictor { } } -func (e evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error { +func (e *evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error { key := vpod.GetKey() logger := e.logger. @@ -124,6 +124,9 @@ func (e *evictor) disablePodScheduling(logger *zap.Logger, pod *corev1.Pod) erro _, err := e.kubeClient.CoreV1(). Pods(pod.GetNamespace()). 
Update(e.ctx, pod, metav1.UpdateOptions{}) + if apierrors.IsNotFound(err) { + return nil + } if err != nil { return fmt.Errorf("failed to update pod %s/%s: %w", pod.GetNamespace(), pod.GetName(), err) } diff --git a/control-plane/pkg/reconciler/consumergroup/evictor_test.go b/control-plane/pkg/reconciler/consumergroup/evictor_test.go index 98fa5d1144..a71c935482 100644 --- a/control-plane/pkg/reconciler/consumergroup/evictor_test.go +++ b/control-plane/pkg/reconciler/consumergroup/evictor_test.go @@ -201,7 +201,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) { e := newEvictor(ctx) err := e.evict(pod, cg, placement) - require.NotNil(t, err) + require.Nil(t, err) } func TestEvictorEvictConsumerGroupNotFound(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) diff --git a/go.mod b/go.mod index 2552699466..cad5bf158d 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( knative.dev/hack v0.0.0-20230712131415-ddae80293c43 knative.dev/pkg v0.0.0-20230718152110-aef227e72ead knative.dev/reconciler-test v0.0.0-20230726074640-26cee79ad63d + go.opencensus.io v0.24.0 sigs.k8s.io/controller-runtime v0.12.3 )
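
Note (illustration only, not part of the patch): the hunks above define two OpenCensus Int64 measures, register distribution views tagged by namespace, and record elapsed milliseconds from deferred helpers in schedule() and reconcileInitialOffset(). The minimal, self-contained Go sketch below shows the same measure/view/tag pattern using OpenCensus directly. The names namespaceKey and recordLatency and the "namespace_name" key string are hypothetical stand-ins; the patch itself uses controller.NamespaceTagKey and knative.dev/pkg/metrics.Record.

// Standalone sketch of the metric pattern added by this patch.
// Assumptions: namespaceKey and recordLatency are illustrative names,
// not identifiers from the patch.
package main

import (
	"context"
	"log"
	"time"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	// Same unit and bucket layout as the patch: milliseconds, with
	// boundaries at 10ms, 100ms, 1s, 10s, 30s and 60s.
	scheduleLatency = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds)
	namespaceKey    = tag.MustNewKey("namespace_name") // stand-in for controller.NamespaceTagKey
)

func init() {
	if err := view.Register(&view.View{
		Description: "Latency of consumer group schedule operations",
		TagKeys:     []tag.Key{namespaceKey},
		Measure:     scheduleLatency,
		Aggregation: view.Distribution(10, 100, 1000, 10000, 30000, 60000),
	}); err != nil {
		log.Fatal(err)
	}
}

// recordLatency mirrors the deferred recordScheduleLatency helper: tag the
// context with the ConsumerGroup namespace, then record the elapsed time.
func recordLatency(ctx context.Context, namespace string, startTime time.Time) {
	ctx, err := tag.New(ctx, tag.Insert(namespaceKey, namespace))
	if err != nil {
		// Tagging failed; skip recording rather than failing reconciliation.
		return
	}
	stats.Record(ctx, scheduleLatency.M(time.Since(startTime).Milliseconds()))
}

func main() {
	start := time.Now()
	time.Sleep(25 * time.Millisecond) // stand-in for a schedule operation
	recordLatency(context.Background(), "knative-eventing", start)
}

When a stats exporter is attached (for example Prometheus or the OpenCensus agent), each registered view surfaces as a latency histogram with the 10 ms to 60 s buckets above, partitioned by the namespace tag.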