Skip to content

Commit

Permalink
Expose init offset and schedule metrics for ConsumerGroup reconciler (#…
Browse files Browse the repository at this point in the history
…790)

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Aug 22, 2023
1 parent fdc5140 commit a8363ad
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 23 deletions.
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
)

func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
var secret *corev1.Secret

if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
)

func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err)
Expand Down
100 changes: 85 additions & 15 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import (
"fmt"
"math"
"sort"
"time"

"github.com/Shopify/sarama"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -36,7 +40,9 @@ import (
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/reconciler"

sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
Expand All @@ -60,8 +66,38 @@ import (
var (
ErrNoSubscriberURI = errors.New("no subscriber URI resolved")
ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved")

scheduleLatencyStat = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds)
// scheduleDistribution defines the bucket boundaries for the histogram of schedule latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
scheduleDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)

initializeOffsetsLatencyStat = stats.Int64("initialize_offsets_latency", "Latency of consumer group offsets initialization operations", stats.UnitMilliseconds)
// initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)
)

func init() {
views := []*view.View{
{
Description: "Latency of consumer group schedule operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: scheduleLatencyStat,
Aggregation: scheduleDistribution,
},
{
Description: "Latency of consumer group offsets initialization operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: initializeOffsetsLatencyStat,
Aggregation: initializeOffsetsDistribution,
},
}
if err := view.Register(views...); err != nil {
panic(err)
}
}

type Scheduler struct {
scheduler.Scheduler
SchedulerConfig
Expand Down Expand Up @@ -103,7 +139,7 @@ type Reconciler struct {
DeleteConsumerGroupMetadataCounter *counter.Counter
}

func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
if err := r.reconcileInitialOffset(ctx, cg); err != nil {
return cg.MarkInitializeOffsetFailed("InitializeOffset", err)
}
Expand Down Expand Up @@ -150,7 +186,7 @@ func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consum
return nil
}

func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {

cg.Spec.Replicas = pointer.Int32(0)
err := r.schedule(ctx, cg) //de-schedule placements
Expand All @@ -159,7 +195,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
cg.Status.Placements = nil

// return an error to 1. update the status. 2. not clear the finalizer
return errors.New("placement list was not empty")
return fmt.Errorf("failed to unschedule consumer group: %w", err)
}

// Get consumers associated with the ConsumerGroup.
Expand All @@ -185,7 +221,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
return nil
}

func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err)
Expand Down Expand Up @@ -213,7 +249,7 @@ func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkain
return nil
}

func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

// Get consumers associated with the ConsumerGroup.
existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
Expand Down Expand Up @@ -242,7 +278,7 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C
return nil
}

func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {
func (r *Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {

placement := *pc.Placement
consumers := pc.Consumers
Expand Down Expand Up @@ -297,7 +333,7 @@ func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafka
return nil
}

func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
func (r *Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
c := cg.ConsumerFromTemplate()

c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-")
Expand All @@ -310,7 +346,7 @@ func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.Consu
return nil
}

func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
func (r *Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
dOpts := metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &consumer.UID},
}
Expand All @@ -321,7 +357,10 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna
return nil
}

func (r Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordScheduleLatency(ctx, cg, startTime)

statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind)

// Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing
Expand All @@ -348,7 +387,7 @@ type ConsumersPerPlacement struct {
Consumers []*kafkainternals.Consumer
}

func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
func (r *Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers)))))

// Group consumers by Pod bind.
Expand Down Expand Up @@ -403,7 +442,7 @@ func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.P
return placementConsumers
}

func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
func (r *Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
consumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
if err != nil {
return nil, fmt.Errorf("failed to list consumers for selector %+v: %w", cg.Spec.Selector, err)
Expand Down Expand Up @@ -435,7 +474,10 @@ func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Con
return condition, nil
}

func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordInitializeOffsetsLatency(ctx, cg, startTime)

if cg.Spec.Template.Spec.Delivery == nil || cg.Spec.Template.Spec.Delivery.InitialOffset == sources.OffsetEarliest {
return nil
}
Expand Down Expand Up @@ -479,7 +521,7 @@ func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainterna
return nil
}

func (r Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
var triggerAuthentication *kedav1alpha1.TriggerAuthentication
var secret *corev1.Secret

Expand Down Expand Up @@ -620,7 +662,7 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1
return nil
}

func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName})
pods, err := r.PodLister.
Pods(r.SystemNamespace).
Expand All @@ -642,7 +684,7 @@ func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler
return nil
}

func (r Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
func (r *Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
// Check if ConfigMap exists in lister cache
_, err := r.ConfigMapLister.ConfigMaps(r.SystemNamespace).Get(name)
// ConfigMap already exists, return
Expand Down Expand Up @@ -682,3 +724,31 @@ var (
_ consumergroup.Interface = &Reconciler{}
_ consumergroup.Finalizer = &Reconciler{}
)

func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}

func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ func TestReconcileKind(t *testing.T) {
_, exampleConfig := cm.ConfigMapsFromTestFile(t, configapis.FlagsConfigName)
store.OnConfigChanged(exampleConfig)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down Expand Up @@ -1787,7 +1787,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {

ctx, _ = kedaclient.With(ctx)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down
9 changes: 6 additions & 3 deletions control-plane/pkg/reconciler/consumergroup/evictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type evictor struct {
// newEvictor creates a new evictor.
//
// fields are additional logger fields to be attached to the evictor logger.
func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
return evictor{
func newEvictor(ctx context.Context, fields ...zap.Field) *evictor {
return &evictor{
ctx: ctx,
kubeClient: kubeclient.Get(ctx),
InternalsClient: kafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand All @@ -60,7 +60,7 @@ func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
}
}

func (e evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
func (e *evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
key := vpod.GetKey()

logger := e.logger.
Expand Down Expand Up @@ -124,6 +124,9 @@ func (e *evictor) disablePodScheduling(logger *zap.Logger, pod *corev1.Pod) erro
_, err := e.kubeClient.CoreV1().
Pods(pod.GetNamespace()).
Update(e.ctx, pod, metav1.UpdateOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to update pod %s/%s: %w", pod.GetNamespace(), pod.GetName(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/evictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) {
e := newEvictor(ctx)
err := e.evict(pod, cg, placement)

require.NotNil(t, err)
require.Nil(t, err)
}
func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
knative.dev/hack v0.0.0-20230712131415-ddae80293c43
knative.dev/pkg v0.0.0-20230718152110-aef227e72ead
knative.dev/reconciler-test v0.0.0-20230726074640-26cee79ad63d
go.opencensus.io v0.24.0
sigs.k8s.io/controller-runtime v0.12.3
)

Expand Down

0 comments on commit a8363ad

Please sign in to comment.