Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v1.9] Expose init offset and schedule metrics for ConsumerGroup reconciler #790

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
)

func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
var secret *corev1.Secret

if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
)

func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err)
Expand Down
100 changes: 85 additions & 15 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import (
"fmt"
"math"
"sort"
"time"

"github.com/Shopify/sarama"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -36,7 +40,9 @@ import (
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/reconciler"

sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
Expand All @@ -60,8 +66,38 @@ import (
var (
ErrNoSubscriberURI = errors.New("no subscriber URI resolved")
ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved")

scheduleLatencyStat = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds)
// scheduleDistribution defines the bucket boundaries for the histogram of schedule latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
scheduleDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)

initializeOffsetsLatencyStat = stats.Int64("initialize_offsets_latency", "Latency of consumer group offsets initialization operations", stats.UnitMilliseconds)
// initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)
)

func init() {
views := []*view.View{
{
Description: "Latency of consumer group schedule operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: scheduleLatencyStat,
Aggregation: scheduleDistribution,
},
{
Description: "Latency of consumer group offsets initialization operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: initializeOffsetsLatencyStat,
Aggregation: initializeOffsetsDistribution,
},
}
if err := view.Register(views...); err != nil {
panic(err)
}
}

type Scheduler struct {
scheduler.Scheduler
SchedulerConfig
Expand Down Expand Up @@ -103,7 +139,7 @@ type Reconciler struct {
DeleteConsumerGroupMetadataCounter *counter.Counter
}

func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
if err := r.reconcileInitialOffset(ctx, cg); err != nil {
return cg.MarkInitializeOffsetFailed("InitializeOffset", err)
}
Expand Down Expand Up @@ -150,7 +186,7 @@ func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consum
return nil
}

func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {

cg.Spec.Replicas = pointer.Int32(0)
err := r.schedule(ctx, cg) //de-schedule placements
Expand All @@ -159,7 +195,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
cg.Status.Placements = nil

// return an error to 1. update the status. 2. not clear the finalizer
return errors.New("placement list was not empty")
return fmt.Errorf("failed to unschedule consumer group: %w", err)
}

// Get consumers associated with the ConsumerGroup.
Expand All @@ -185,7 +221,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
return nil
}

func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err)
Expand Down Expand Up @@ -213,7 +249,7 @@ func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkain
return nil
}

func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

// Get consumers associated with the ConsumerGroup.
existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
Expand Down Expand Up @@ -242,7 +278,7 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C
return nil
}

func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {
func (r *Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {

placement := *pc.Placement
consumers := pc.Consumers
Expand Down Expand Up @@ -297,7 +333,7 @@ func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafka
return nil
}

func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
func (r *Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
c := cg.ConsumerFromTemplate()

c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-")
Expand All @@ -310,7 +346,7 @@ func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.Consu
return nil
}

func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
func (r *Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
dOpts := metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &consumer.UID},
}
Expand All @@ -321,7 +357,10 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna
return nil
}

func (r Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordScheduleLatency(ctx, cg, startTime)

statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind)

// Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing
Expand All @@ -348,7 +387,7 @@ type ConsumersPerPlacement struct {
Consumers []*kafkainternals.Consumer
}

func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
func (r *Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers)))))

// Group consumers by Pod bind.
Expand Down Expand Up @@ -403,7 +442,7 @@ func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.P
return placementConsumers
}

func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
func (r *Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
consumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
if err != nil {
return nil, fmt.Errorf("failed to list consumers for selector %+v: %w", cg.Spec.Selector, err)
Expand Down Expand Up @@ -435,7 +474,10 @@ func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Con
return condition, nil
}

func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordInitializeOffsetsLatency(ctx, cg, startTime)

if cg.Spec.Template.Spec.Delivery == nil || cg.Spec.Template.Spec.Delivery.InitialOffset == sources.OffsetEarliest {
return nil
}
Expand Down Expand Up @@ -479,7 +521,7 @@ func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainterna
return nil
}

func (r Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
var triggerAuthentication *kedav1alpha1.TriggerAuthentication
var secret *corev1.Secret

Expand Down Expand Up @@ -620,7 +662,7 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1
return nil
}

func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName})
pods, err := r.PodLister.
Pods(r.SystemNamespace).
Expand All @@ -642,7 +684,7 @@ func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler
return nil
}

func (r Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
func (r *Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
// Check if ConfigMap exists in lister cache
_, err := r.ConfigMapLister.ConfigMaps(r.SystemNamespace).Get(name)
// ConfigMap already exists, return
Expand Down Expand Up @@ -682,3 +724,31 @@ var (
_ consumergroup.Interface = &Reconciler{}
_ consumergroup.Finalizer = &Reconciler{}
)

func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}

func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@ func TestReconcileKind(t *testing.T) {
_, exampleConfig := cm.ConfigMapsFromTestFile(t, configapis.FlagsConfigName)
store.OnConfigChanged(exampleConfig)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down Expand Up @@ -1786,7 +1786,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {

ctx, _ = kedaclient.With(ctx)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down
9 changes: 6 additions & 3 deletions control-plane/pkg/reconciler/consumergroup/evictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type evictor struct {
// newEvictor creates a new evictor.
//
// fields are additional logger fields to be attached to the evictor logger.
func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
return evictor{
func newEvictor(ctx context.Context, fields ...zap.Field) *evictor {
return &evictor{
ctx: ctx,
kubeClient: kubeclient.Get(ctx),
InternalsClient: kafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand All @@ -60,7 +60,7 @@ func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
}
}

func (e evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
func (e *evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
key := vpod.GetKey()

logger := e.logger.
Expand Down Expand Up @@ -124,6 +124,9 @@ func (e *evictor) disablePodScheduling(logger *zap.Logger, pod *corev1.Pod) erro
_, err := e.kubeClient.CoreV1().
Pods(pod.GetNamespace()).
Update(e.ctx, pod, metav1.UpdateOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to update pod %s/%s: %w", pod.GetNamespace(), pod.GetName(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/evictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) {
e := newEvictor(ctx)
err := e.evict(pod, cg, placement)

require.NotNil(t, err)
require.Nil(t, err)
}
func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
knative.dev/hack v0.0.0-20230417170854-f591fea109b3
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0
knative.dev/reconciler-test v0.0.0-20230420091239-6c21623d2555
go.opencensus.io v0.23.0
sigs.k8s.io/controller-runtime v0.12.3
)

Expand Down Expand Up @@ -130,7 +131,6 @@ require (
github.com/wavesoftware/go-ensure v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
Expand Down