From 87933be0eb31e5c415c0bbeab788c2b6b3453edf Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 15 Dec 2021 08:50:09 +0100 Subject: [PATCH] Add ConsumerGroup scheduling Signed-off-by: Pierangelo Di Pilato --- .../v1alpha1/consumer_group_lifecycle.go | 20 +- .../eventing/v1alpha1/consumer_group_types.go | 16 + .../kafka/eventing/v1alpha1/consumer_types.go | 11 + .../kafka/eventing/v1alpha1/register.go | 6 + .../v1alpha1/zz_generated.deepcopy.go | 26 + .../reconciler/consumergroup/consumergroup.go | 193 ++++++-- .../consumergroup/consumergroup_test.go | 468 ++++++++++++++++++ .../pkg/reconciler/testing/factory.go | 8 +- .../pkg/reconciler/testing/listers.go | 15 +- .../pkg/reconciler/testing/objects_common.go | 21 - .../reconciler/testing/objects_consumer.go | 132 +++++ .../testing/objects_consumergroup.go | 87 ++++ .../eventing/pkg/scheduler/README.md | 256 ++++++++++ .../knative.dev/eventing/pkg/scheduler/doc.go | 18 + .../eventing/pkg/scheduler/placement.go | 52 ++ .../eventing/pkg/scheduler/scheduler.go | 109 ++++ vendor/modules.txt | 1 + 17 files changed, 1357 insertions(+), 82 deletions(-) create mode 100644 control-plane/pkg/reconciler/consumergroup/consumergroup_test.go create mode 100644 control-plane/pkg/reconciler/testing/objects_consumer.go create mode 100644 control-plane/pkg/reconciler/testing/objects_consumergroup.go create mode 100644 vendor/knative.dev/eventing/pkg/scheduler/README.md create mode 100644 vendor/knative.dev/eventing/pkg/scheduler/doc.go create mode 100644 vendor/knative.dev/eventing/pkg/scheduler/placement.go create mode 100644 vendor/knative.dev/eventing/pkg/scheduler/scheduler.go diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go index 81afa34bb5..326e86f89d 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go @@ -23,12 +23,14 @@ import ( ) const ( - ConditionConsumers apis.ConditionType = "Consumers" + ConditionConsumerGroupConsumers apis.ConditionType = "Consumers" + ConditionConsumerGroupConsumersScheduled apis.ConditionType = "ConsumersScheduled" ) var ( conditionSet = apis.NewLivingConditionSet( - ConditionConsumers, + ConditionConsumerGroupConsumers, + ConditionConsumerGroupConsumersScheduled, ) ) @@ -38,10 +40,20 @@ func (c *ConsumerGroup) GetConditionSet() apis.ConditionSet { func (cg *ConsumerGroup) MarkReconcileConsumersFailed(reason string, err error) error { err = fmt.Errorf("failed to reconcile consumers: %w", err) - cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumers, reason, err.Error()) + cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumerGroupConsumers, reason, err.Error()) return err } func (cg *ConsumerGroup) MarkReconcileConsumersSucceeded() { - cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumers) + cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumerGroupConsumers) +} + +func (cg *ConsumerGroup) MarkScheduleConsumerFailed(reason string, err error) error { + err = fmt.Errorf("failed to schedule consumers: %w", err) + cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumerGroupConsumers, reason, err.Error()) + return err +} + +func (cg *ConsumerGroup) MarkScheduleSucceeded() { + 
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumerGroupConsumersScheduled) } diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go index 8c9f8b6264..da1665f9b6 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go @@ -19,6 +19,7 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -53,6 +54,21 @@ type ConsumerGroup struct { Status ConsumerGroupStatus `json:"status,omitempty"` } +func (cg *ConsumerGroup) GetKey() types.NamespacedName { + return types.NamespacedName{ + Namespace: cg.GetNamespace(), + Name: cg.GetName(), + } +} + +func (cg *ConsumerGroup) GetVReplicas() int32 { + return *cg.Spec.Replicas +} + +func (cg *ConsumerGroup) GetPlacements() []eventingduckv1alpha1.Placement { + return cg.Status.Placements +} + type ConsumerGroupSpec struct { // Template is the object that describes the consumer that will be created if diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go index 93284aba9c..795ed2c235 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go @@ -51,6 +51,11 @@ type Consumer struct { Status ConsumerStatus `json:"status,omitempty"` } +type PodBind struct { + PodName string `json:"podName"` + PodNamespace string `json:"podNamespace"` +} + type ConsumerSpec struct { // Topics is the list of topics to subscribe to. Topics []string `json:"topics"` @@ -71,6 +76,12 @@ type ConsumerSpec struct { // Subscriber is the addressable that receives events that pass the Filters. Subscriber duckv1.Destination `json:"subscriber"` + + // VReplicas is the number of virtual replicas for a consumer. + VReplicas *int32 + + // PodBind represents a reference to the pod in which the consumer should be placed. + PodBind *PodBind `json:"podBind"` } type DeliverySpec struct { diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go index d8b2bf76c2..0713b287df 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go @@ -33,6 +33,12 @@ var ConsumerGroupGroupVersionKind = schema.GroupVersionKind{ Kind: "ConsumerGroup", } +var ConsumerGroupVersionKind = schema.GroupVersionKind{ + Group: SchemeGroupVersion.Group, + Version: SchemeGroupVersion.Version, + Kind: "Consumer", +} + // Kind takes an unqualified kind and returns back a Group qualified GroupKind. 
func Kind(kind string) schema.GroupKind { return SchemeGroupVersion.WithKind(kind).GroupKind() diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go index d2d8c9a78b..c469c8deb8 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go @@ -246,6 +246,16 @@ func (in *ConsumerSpec) DeepCopyInto(out *ConsumerSpec) { (*in).DeepCopyInto(*out) } in.Subscriber.DeepCopyInto(&out.Subscriber) + if in.VReplicas != nil { + in, out := &in.VReplicas, &out.VReplicas + *out = new(int32) + **out = **in + } + if in.PodBind != nil { + in, out := &in.PodBind, &out.PodBind + *out = new(PodBind) + **out = **in + } return } @@ -348,3 +358,19 @@ func (in *Filters) DeepCopy() *Filters { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodBind) DeepCopyInto(out *PodBind) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodBind. +func (in *PodBind) DeepCopy() *PodBind { + if in == nil { + return nil + } + out := new(PodBind) + in.DeepCopyInto(out) + return out +} diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index cfe92c7022..fe246dae71 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -19,14 +19,20 @@ package consumergroup import ( "context" "fmt" + "math" "sort" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apiserver/pkg/storage/names" + "k8s.io/utils/pointer" + eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/pkg/reconciler" + "knative.dev/eventing/pkg/scheduler" + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" internalv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned/typed/eventing/v1alpha1" "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/reconciler/eventing/v1alpha1/consumergroup" @@ -34,11 +40,21 @@ import ( ) type Reconciler struct { + Scheduler scheduler.Scheduler ConsumerLister kafkainternalslisters.ConsumerLister InternalsClient internalv1alpha1.InternalV1alpha1Interface + + NameGenerator names.NameGenerator + + SystemNamespace string } func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { + if err := r.schedule(cg); err != nil { + return err + } + cg.MarkScheduleSucceeded() + if err := r.reconcileConsumers(ctx, cg); err != nil { return err } @@ -55,71 +71,87 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C return cg.MarkReconcileConsumersFailed("ListConsumers", err) } - // Stable sort consumers so that we give consumers different deletion - // priorities based on their state. - // - // Consumers at the tail of the list are deleted first. 
- sort.SliceStable(existingConsumers, func(i, j int) bool { - return existingConsumers[i].IsLessThan(existingConsumers[j]) - }) + consumersByPlacement := r.joinConsumersByPlacement(cg.Status.Placements, existingConsumers) - nConsumers := int32(len(existingConsumers)) - - for i := int32(0); i < *cg.Spec.Replicas && i < nConsumers; i++ { - if err := r.reconcileConsumer(ctx, cg, existingConsumers[i]); err != nil { - return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err) - } - } - - if nConsumers > *cg.Spec.Replicas { - toBeDeleted := existingConsumers[:nConsumers-*cg.Spec.Replicas] - for _, c := range toBeDeleted { - if err := r.finalizeConsumer(ctx, c); err != nil { - return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err) + for _, pc := range consumersByPlacement { + if pc.Placement == nil { + for _, c := range pc.Consumers { + if err := r.finalizeConsumer(ctx, c); err != nil { + return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err) + } } + continue } - return nil - } - if nConsumers < *cg.Spec.Replicas { - for i := int32(0); i < *cg.Spec.Replicas-nConsumers; i++ { - if err := r.reconcileConsumer(ctx, cg, nil); err != nil { - return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err) - } + + if err := r.reconcileConsumersInPlacement(ctx, cg, *pc.Placement, pc.Consumers); err != nil { + return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err) } - return nil } return nil } -func (r Reconciler) reconcileConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, c *kafkainternals.Consumer) error { - if c == nil { - // Consumer doesn't exist, create it. - c = cg.ConsumerFromTemplate() +func (r Reconciler) reconcileConsumersInPlacement( + ctx context.Context, + cg *kafkainternals.ConsumerGroup, + placement eventingduckv1alpha1.Placement, + consumers []*kafkainternals.Consumer) error { - if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Create(ctx, c, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("failed to create consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err) + // Check if there is a consumer for the given placement. + if len(consumers) == 0 { + return r.createConsumer(ctx, cg, placement) + } + + // Stable sort consumers so that we give consumers different deletion + // priorities based on their state. + // + // Consumers at the tail of the list are deleted. + sort.SliceStable(consumers, func(i, j int) bool { return consumers[i].IsLessThan(consumers[j]) }) + + for _, c := range consumers[1:] { + if err := r.finalizeConsumer(ctx, c); err != nil { + return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err) } - return nil } - if equality.Semantic.DeepDerivative(cg.Spec.Template, c.Spec) { + + // Do not modify informer copy. + c := &kafkainternals.Consumer{} + consumers[0].DeepCopyInto(c) + + expectedSpec := kafkainternals.ConsumerSpec{} + cg.Spec.Template.Spec.DeepCopyInto(&expectedSpec) + + expectedSpec.VReplicas = pointer.Int32Ptr(placement.VReplicas) + + if equality.Semantic.DeepDerivative(expectedSpec, c.Spec) { // Consumer is equal to the template. return nil } + + c.Spec.PodBind = &kafkainternals.PodBind{PodName: placement.PodName, PodNamespace: r.SystemNamespace} + c.Spec.VReplicas = pointer.Int32Ptr(*expectedSpec.VReplicas) + // Update existing Consumer. 
- newC := &kafkainternals.Consumer{ - TypeMeta: c.TypeMeta, - ObjectMeta: c.ObjectMeta, - Spec: cg.Spec.Template.Spec, - Status: c.Status, - } - if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Update(ctx, newC, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update consumer %s/%s: %w", newC.GetNamespace(), newC.GetName(), err) + if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Update(ctx, c, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err) } return nil } +func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error { + c := cg.ConsumerFromTemplate() + + c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-") + c.Spec.VReplicas = pointer.Int32Ptr(placement.VReplicas) + c.Spec.PodBind = &kafkainternals.PodBind{PodName: placement.PodName, PodNamespace: r.SystemNamespace} + + if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Create(ctx, c, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err) + } + return nil +} + func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error { dOpts := metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{UID: &consumer.UID}, @@ -131,6 +163,79 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna return nil } +func (r Reconciler) schedule(cg *kafkainternals.ConsumerGroup) error { + placements, err := r.Scheduler.Schedule(cg) + if err != nil { + return cg.MarkScheduleConsumerFailed("Schedule", err) + } + // Sort placements by pod name. + sort.SliceStable(placements, func(i, j int) bool { return placements[i].PodName < placements[j].PodName }) + + cg.Status.Placements = placements + + return nil +} + +type ConsumersPerPlacement struct { + Placement *eventingduckv1alpha1.Placement + Consumers []*kafkainternals.Consumer +} + +func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement { + placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers))))) + + // Group consumers by Pod bind. + consumersByPod := make(map[kafkainternals.PodBind][]*kafkainternals.Consumer, len(consumers)) + for i := range consumers { + consumersByPod[*consumers[i].Spec.PodBind] = append(consumersByPod[*consumers[i].Spec.PodBind], consumers[i]) + } + + // Group placements by Pod bind. 
+ placementsByPod := make(map[kafkainternals.PodBind]eventingduckv1alpha1.Placement, len(placements)) + for i := range placements { + pb := kafkainternals.PodBind{ + PodName: placements[i].PodName, + PodNamespace: r.SystemNamespace, + } + + v := placementsByPod[pb] + placementsByPod[pb] = eventingduckv1alpha1.Placement{ + PodName: pb.PodName, + VReplicas: v.VReplicas + placements[i].VReplicas, + } + } + + for k := range placementsByPod { + if _, ok := consumersByPod[k]; !ok { + consumersByPod[k] = nil + } + } + + for pb := range consumersByPod { + + var p *eventingduckv1alpha1.Placement + if v, ok := placementsByPod[pb]; ok { + p = &v + } + + c := consumersByPod[pb] + + placementConsumers = append(placementConsumers, ConsumersPerPlacement{Placement: p, Consumers: c}) + } + + sort.Slice(placementConsumers, func(i, j int) bool { + if placementConsumers[i].Placement == nil { + return true + } + if placementConsumers[j].Placement == nil { + return false + } + return placementConsumers[i].Placement.PodName < placementConsumers[j].Placement.PodName + }) + + return placementConsumers +} + var ( _ consumergroup.Interface = &Reconciler{} ) diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go new file mode 100644 index 0000000000..15f57d27ed --- /dev/null +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -0,0 +1,468 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consumergroup + +import ( + "context" + "fmt" + "io" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + clientgotesting "k8s.io/client-go/testing" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + . "knative.dev/pkg/reconciler/testing" + + eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/eventing/pkg/scheduler" + + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" + fakekafkainternalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" + "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/reconciler/eventing/v1alpha1/consumergroup" + "knative.dev/eventing-kafka-broker/control-plane/pkg/config" + . 
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" +) + +type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) + +func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return f(vpod) +} + +const ( + testSchedulerKey = "scheduler" + + systemNamespace = "knative-eventing" +) + +func TestReconcileKind(t *testing.T) { + + tt := TableTest{ + { + Name: "Consumers in multiple pods", + Objects: []runtime.Object{ + NewService(), + NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ), + }, + Key: ConsumerGroupTestKey, + OtherTestData: map[string]interface{}{ + testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 1}, + }, nil + }), + }, + WantCreates: []runtime.Object{ + NewConsumer(1, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p1", PodNamespace: systemNamespace}), + )), + ), + NewConsumer(2, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p2", PodNamespace: systemNamespace}), + )), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: func() runtime.Object { + cg := NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ) + cg.Status.Placements = []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 1}, + } + cg.MarkReconcileConsumersSucceeded() + cg.MarkScheduleSucceeded() + return cg + }(), + }, + }, + }, + { + Name: "Consumers in multiple pods, one exists", + Objects: []runtime.Object{ + NewConsumer(2, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{ + PodName: "p2", + PodNamespace: systemNamespace, + }), + )), + ), + NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ), + }, + Key: ConsumerGroupTestKey, + OtherTestData: map[string]interface{}{ + testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 1}, + }, nil + }), + }, + WantCreates: []runtime.Object{ + NewConsumer(1, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + 
ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p1", PodNamespace: systemNamespace}), + )), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: func() runtime.Object { + cg := NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ) + cg.Status.Placements = []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 1}, + } + cg.MarkReconcileConsumersSucceeded() + cg.MarkScheduleSucceeded() + return cg + }(), + }, + }, + }, + { + Name: "Consumers in multiple pods, one exists, increase replicas", + Objects: []runtime.Object{ + NewConsumer(2, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{ + PodName: "p2", + PodNamespace: systemNamespace, + }), + )), + ), + NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(3), + ), + }, + Key: ConsumerGroupTestKey, + OtherTestData: map[string]interface{}{ + testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 2}, + }, nil + }), + }, + WantCreates: []runtime.Object{ + NewConsumer(1, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p1", PodNamespace: systemNamespace}), + )), + ), + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + clientgotesting.NewUpdateAction( + schema.GroupVersionResource{ + Group: kafkainternals.SchemeGroupVersion.Group, + Version: kafkainternals.SchemeGroupVersion.Version, + Resource: "consumers", + }, + ConsumerNamespace, + NewConsumer(2, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(2), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p2", PodNamespace: systemNamespace}), + )), + ), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: func() runtime.Object { + cg := NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(3), + ) + cg.Status.Placements = []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 2}, + } + cg.MarkReconcileConsumersSucceeded() + cg.MarkScheduleSucceeded() + return cg + }(), + }, + }, + }, + { + Name: "Consumers in multiple pods, one exists, different placement", + Objects: []runtime.Object{ + NewService(), + NewConsumer(1, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + 
ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p3", PodNamespace: systemNamespace}), + )), + ), + NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ), + }, + Key: ConsumerGroupTestKey, + OtherTestData: map[string]interface{}{ + testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 1}, + }, nil + }), + }, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: ConsumerNamespace, + Resource: schema.GroupVersionResource{ + Group: kafkainternals.SchemeGroupVersion.Group, + Version: kafkainternals.SchemeGroupVersion.Version, + Resource: "consumers", + }, + }, + Name: NewConsumer(1).Name, + }, + }, + WantCreates: []runtime.Object{ + NewConsumer(1, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p1", PodNamespace: systemNamespace}), + )), + ), + NewConsumer(2, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p2", PodNamespace: systemNamespace}), + )), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: func() runtime.Object { + cg := NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ) + cg.Status.Placements = []eventingduckv1alpha1.Placement{ + {PodName: "p1", VReplicas: 1}, + {PodName: "p2", VReplicas: 1}, + } + cg.MarkReconcileConsumersSucceeded() + cg.MarkScheduleSucceeded() + return cg + }(), + }, + }, + }, + { + Name: "Scheduler failed", + Objects: []runtime.Object{ + NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ), + }, + Key: ConsumerGroupTestKey, + OtherTestData: map[string]interface{}{ + testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) { + return nil, io.EOF + }), + }, + WantErr: true, + WantEvents: []string{ + "Warning InternalError failed to schedule consumers: EOF", + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: func() runtime.Object { + cg := NewConsumerGroup( + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ConsumerGroupReplicas(2), + ) + cg.GetConditionSet().Manage(cg.GetStatus()).InitializeConditions() + _ = cg.MarkScheduleConsumerFailed("Schedule", io.EOF) + return cg + 
}(), + }, + }, + }, + } + + tt.Test(t, NewFactory(nil, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { + + r := Reconciler{ + Scheduler: row.OtherTestData[testSchedulerKey].(scheduler.Scheduler), + ConsumerLister: listers.GetConsumerLister(), + InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(), + NameGenerator: &CounterGenerator{}, + SystemNamespace: systemNamespace, + } + + return consumergroup.NewReconciler( + ctx, + logging.FromContext(ctx), + fakekafkainternalsclient.Get(ctx), + listers.GetConsumerGroupLister(), + controller.GetEventRecorder(ctx), + r, + ) + })) +} + +type CounterGenerator struct { + counter int +} + +func (c *CounterGenerator) GenerateName(base string) string { + c.counter++ + return fmt.Sprintf("%s%d", base, c.counter) +} diff --git a/control-plane/pkg/reconciler/testing/factory.go b/control-plane/pkg/reconciler/testing/factory.go index 2e3b3508c1..4e70616384 100644 --- a/control-plane/pkg/reconciler/testing/factory.go +++ b/control-plane/pkg/reconciler/testing/factory.go @@ -40,7 +40,7 @@ import ( fakeeventingkafkabrokerclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client/fake" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" - fakeconsumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" + fakekafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" ) const ( @@ -62,7 +62,7 @@ func NewFactory(env *config.Env, ctor Ctor) Factory { ctx, sourcesKafkaClient := fakeeventingkafkakafkaclient.With(ctx, listers.GetEventingKafkaObjects()...) ctx, eventingKafkaClient := fakeeventingkafkabrokerclient.With(ctx, listers.GetEventingKafkaBrokerObjects()...) ctx, kubeClient := fakekubeclient.With(ctx, listers.GetKubeObjects()...) - ctx, consumerGroupClient := fakeconsumergroupclient.With(ctx, listers.GetConsumerGroupObjects()...) + ctx, kafkaInternalsClient := fakekafkainternals.With(ctx, listers.GetKafkaInternalsObjects()...) 
ctx, dynamicClient := fakedynamicclient.With(ctx, newScheme(), @@ -98,7 +98,7 @@ func NewFactory(env *config.Env, ctor Ctor) Factory { eventingClient.PrependReactor("*", "*", reactor) eventingKafkaClient.PrependReactor("*", "*", reactor) sourcesKafkaClient.PrependReactor("*", "*", reactor) - consumerGroupClient.PrependReactor("*", "*", reactor) + kafkaInternalsClient.PrependReactor("*", "*", reactor) } actionRecorderList := ActionRecorderList{ @@ -107,7 +107,7 @@ func NewFactory(env *config.Env, ctor Ctor) Factory { eventingClient, eventingKafkaClient, sourcesKafkaClient, - consumerGroupClient, + kafkaInternalsClient, } eventList := EventList{ diff --git a/control-plane/pkg/reconciler/testing/listers.go b/control-plane/pkg/reconciler/testing/listers.go index dee137ef42..779496b297 100644 --- a/control-plane/pkg/reconciler/testing/listers.go +++ b/control-plane/pkg/reconciler/testing/listers.go @@ -28,16 +28,17 @@ import ( eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" "knative.dev/pkg/reconciler/testing" - eventingkafkabrokerconsumer "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" eventingkafkachannels "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" eventingkafkasources "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1" fakeeventingkafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned/fake" eventingkafkachannelslisters "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1" eventingkafkasourceslisters "knative.dev/eventing-kafka/pkg/client/listers/sources/v1beta1" + eventingkafkabrokerconsumer "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" + eventingkafkabroker "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" fakeeventingkafkabrokerclientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/fake" - fakeeventingkafkaconsumerclientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned/fake" + fakekafkainternalsclientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned/fake" consumerlisters "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1" eventingkafkabrokerlisters "knative.dev/eventing-kafka-broker/control-plane/pkg/client/listers/eventing/v1alpha1" ) @@ -48,7 +49,7 @@ var clientSetSchemes = []func(*runtime.Scheme) error{ fakeapiextensionsclientset.AddToScheme, fakeeventingkafkabrokerclientset.AddToScheme, fakeeventingkafkaclientset.AddToScheme, - fakeeventingkafkaconsumerclientset.AddToScheme, + fakekafkainternalsclientset.AddToScheme, } type Listers struct { @@ -99,12 +100,8 @@ func (l *Listers) GetEventingKafkaObjects() []runtime.Object { return l.sorter.ObjectsForSchemeFunc(fakeeventingkafkaclientset.AddToScheme) } -func (l *Listers) GetConsumerGroupObjects() []runtime.Object { - return l.sorter.ObjectsForSchemeFunc(fakeeventingkafkaconsumerclientset.AddToScheme) -} - -func (l *Listers) GetConsumerObjects() []runtime.Object { - return l.sorter.ObjectsForSchemeFunc(fakeeventingkafkaconsumerclientset.AddToScheme) +func (l *Listers) GetKafkaInternalsObjects() []runtime.Object { + return l.sorter.ObjectsForSchemeFunc(fakekafkainternalsclientset.AddToScheme) } func (l *Listers) GetBrokerLister() eventinglisters.BrokerLister { diff --git a/control-plane/pkg/reconciler/testing/objects_common.go 
b/control-plane/pkg/reconciler/testing/objects_common.go index 9ed7e3793a..54960b4c12 100644 --- a/control-plane/pkg/reconciler/testing/objects_common.go +++ b/control-plane/pkg/reconciler/testing/objects_common.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" clientgotesting "k8s.io/client-go/testing" - "k8s.io/utils/pointer" reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -38,8 +37,6 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/security" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" - - internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" ) const ( @@ -300,21 +297,3 @@ func allocateStatusAnnotations(obj duckv1.KRShaped) { obj.GetStatus().Annotations = make(map[string]string, 1) } } - -func NewConsumerGroup() *internalscg.ConsumerGroup { - return &internalscg.ConsumerGroup{ - TypeMeta: metav1.TypeMeta{ - APIVersion: internalscg.ConsumerGroupGroupVersionKind.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: TriggerUUID, - Namespace: ServiceNamespace, - }, - Spec: internalscg.ConsumerGroupSpec{ - Template: internalscg.ConsumerTemplateSpec{ - Spec: internalscg.ConsumerSpec{}, - }, - Replicas: pointer.Int32Ptr(1), - }, - } -} diff --git a/control-plane/pkg/reconciler/testing/objects_consumer.go b/control-plane/pkg/reconciler/testing/objects_consumer.go new file mode 100644 index 0000000000..e6f9d1e9a6 --- /dev/null +++ b/control-plane/pkg/reconciler/testing/objects_consumer.go @@ -0,0 +1,132 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package testing + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + duckv1 "knative.dev/pkg/apis/duck/v1" + + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" +) + +const ( + ConsumerNamePrefix = "test-cg" + ConsumerNamespace = "test-cg-ns" +) + +type ConsumerOption func(cg *kafkainternals.Consumer) + +type ConsumerSpecOption func(c *kafkainternals.ConsumerSpec) + +func NewConsumer(ordinal int, opts ...ConsumerOption) *kafkainternals.Consumer { + + c := &kafkainternals.Consumer{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", ConsumerNamePrefix, ordinal), + Namespace: ConsumerNamespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: kafkainternals.SchemeGroupVersion.String(), + Kind: kafkainternals.ConsumerGroupGroupVersionKind.Kind, + Name: ConsumerGroupName, + Controller: pointer.BoolPtr(true), + BlockOwnerDeletion: pointer.BoolPtr(true), + }, + }, + Labels: ConsumerLabels, + }, + Spec: kafkainternals.ConsumerSpec{ + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: ServiceNamespace, + Name: ServiceName, + APIVersion: "v1", + }, + }, + }, + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +func NewConsumerSpec(opts ...ConsumerSpecOption) kafkainternals.ConsumerSpec { + spec := &kafkainternals.ConsumerSpec{} + + for _, opt := range opts { + opt(spec) + } + + return *spec +} + +func ConsumerSpec(spec kafkainternals.ConsumerSpec) ConsumerOption { + return func(cg *kafkainternals.Consumer) { + cg.Spec = spec + } +} + +func ConsumerTopics(topics ...string) ConsumerSpecOption { + return func(c *kafkainternals.ConsumerSpec) { + c.Topics = topics + } +} + +func ConsumerPlacement(pb kafkainternals.PodBind) ConsumerSpecOption { + return func(c *kafkainternals.ConsumerSpec) { + c.PodBind = &pb + } +} + +type ConsumerConfigsOption func(configs *kafkainternals.ConsumerConfigs) + +func ConsumerConfigs(opts ...ConsumerConfigsOption) ConsumerSpecOption { + return func(c *kafkainternals.ConsumerSpec) { + configs := &kafkainternals.ConsumerConfigs{Configs: map[string]string{}} + + for _, opt := range opts { + opt(configs) + } + + c.Configs = *configs + } +} + +func ConsumerBootstrapServersConfig(s string) ConsumerConfigsOption { + return func(configs *kafkainternals.ConsumerConfigs) { + configs.Configs["bootstrap.servers"] = s + } +} + +func ConsumerGroupIdConfig(s string) ConsumerConfigsOption { + return func(configs *kafkainternals.ConsumerConfigs) { + configs.Configs["group.id"] = s + } +} + +func ConsumerVReplicas(vreplicas int32) ConsumerSpecOption { + return func(c *kafkainternals.ConsumerSpec) { + c.VReplicas = &vreplicas + } +} diff --git a/control-plane/pkg/reconciler/testing/objects_consumergroup.go b/control-plane/pkg/reconciler/testing/objects_consumergroup.go new file mode 100644 index 0000000000..a62463d2aa --- /dev/null +++ b/control-plane/pkg/reconciler/testing/objects_consumergroup.go @@ -0,0 +1,87 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package testing + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + duckv1 "knative.dev/pkg/apis/duck/v1" + + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" +) + +const ( + ConsumerGroupName = "test-cg" + ConsumerGroupNamespace = "test-cg-ns" + + ConsumerGroupTestKey = ConsumerGroupNamespace + "/" + ConsumerGroupName +) + +var ( + ConsumerLabels = map[string]string{"c": "C"} +) + +type ConsumerGroupOption func(cg *kafkainternals.ConsumerGroup) + +func NewConsumerGroup(opts ...ConsumerGroupOption) *kafkainternals.ConsumerGroup { + + cg := &kafkainternals.ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: ConsumerGroupName, + Namespace: ConsumerGroupNamespace, + }, + Spec: kafkainternals.ConsumerGroupSpec{ + Template: kafkainternals.ConsumerTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: ConsumerLabels}, + Spec: kafkainternals.ConsumerSpec{ + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "Service", + Namespace: ServiceNamespace, + Name: ServiceName, + APIVersion: "v1", + }, + }, + }, + }, + }, + } + + for _, opt := range opts { + opt(cg) + } + + return cg +} + +func ConsumerGroupReplicas(replicas int32) ConsumerGroupOption { + return func(cg *kafkainternals.ConsumerGroup) { + cg.Spec.Replicas = pointer.Int32Ptr(replicas) + } +} + +func ConsumerGroupSelector(selector map[string]string) ConsumerGroupOption { + return func(cg *kafkainternals.ConsumerGroup) { + cg.Spec.Selector = selector + } +} + +func ConsumerGroupConsumerSpec(spec kafkainternals.ConsumerSpec) ConsumerGroupOption { + return func(cg *kafkainternals.ConsumerGroup) { + cg.Spec.Template.Spec = spec + } +} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/README.md b/vendor/knative.dev/eventing/pkg/scheduler/README.md new file mode 100644 index 0000000000..1b805f6494 --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/scheduler/README.md @@ -0,0 +1,256 @@ +# Knative Eventing Multi-Tenant Scheduler with High-Availability + +An eventing source instance (for example, [KafkaSource](https://github.com/knative-sandbox/eventing-kafka/tree/main/pkg/source), [RedisStreamSource](https://github.com/knative-sandbox/eventing-redis/tree/main/source), etc) gets materialized as a virtual pod (**vpod**) and can be scaled up and down by increasing or decreasing the number of virtual pod replicas (**vreplicas**). A vreplica corresponds to a resource in the source that can replicated for maximum distributed processing (for example, number of consumers running in a consumer group). + +The vpod multi-tenant [scheduler](#1scheduler) is responsible for placing vreplicas onto real Kubernetes pods. Each pod is limited in capacity and can hold a maximum number of vreplicas. The scheduler takes a list of (source, # of vreplicas) tuples and computes a set of Placements. Placement info are added to the source status. + +Scheduling strategies rely on pods having a sticky identity (StatefulSet replicas) and the current [State](#4state-collector) of the cluster. + +When a vreplica cannot be scheduled it is added to the list of pending vreplicas. The [Autoscaler](#3autoscaler) monitors this list and allocates more pods for placing it. + +To support high-availability the scheduler distributes vreplicas uniformly across failure domains such as zones/nodes/pods containing replicas from a StatefulSet. 
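The `ConsumerGroup` added by this patch plugs into exactly this contract: it implements the `scheduler.VPod` interface (`GetKey`, `GetVReplicas`, `GetPlacements`) and the reconciler hands it to a `scheduler.Scheduler`, copying the returned placements into its status. The sketch below is a minimal, self-contained illustration of that flow; `demoVPod`, `fixedScheduler`, and the literal pod names are hypothetical stand-ins, not code from this patch.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"

	duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
	"knative.dev/eventing/pkg/scheduler"
)

// demoVPod is a hypothetical VPod implementation; ConsumerGroup plays this
// role in the patch via GetKey, GetVReplicas and GetPlacements.
type demoVPod struct {
	key        types.NamespacedName
	vreplicas  int32
	placements []duckv1alpha1.Placement
}

func (d *demoVPod) GetKey() types.NamespacedName            { return d.key }
func (d *demoVPod) GetVReplicas() int32                     { return d.vreplicas }
func (d *demoVPod) GetPlacements() []duckv1alpha1.Placement { return d.placements }
func (d *demoVPod) GetResourceVersion() string              { return "1" }

// fixedScheduler is a stand-in for a real scheduler implementation: it simply
// spreads the requested vreplicas over two fixed pods.
type fixedScheduler struct{}

func (fixedScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) {
	half := vpod.GetVReplicas() / 2
	return []duckv1alpha1.Placement{
		{PodName: "dispatcher-0", VReplicas: vpod.GetVReplicas() - half},
		{PodName: "dispatcher-1", VReplicas: half},
	}, nil
}

func main() {
	var s scheduler.Scheduler = fixedScheduler{}
	vpod := &demoVPod{key: types.NamespacedName{Namespace: "ns", Name: "cg"}, vreplicas: 3}

	placements, err := s.Schedule(vpod)
	if err != nil {
		panic(err)
	}

	// A reconciler would store these in the resource status, as
	// Reconciler.schedule does with ConsumerGroup.Status.Placements.
	fmt.Println(placements, "total:", scheduler.GetTotalVReplicas(placements))
}
```

A real scheduler (such as the statefulset scheduler) would consult the collected cluster state and the configured predicates and priorities instead of splitting the count in half.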
+ +## General Scheduler Requirements + +1. High Availability: Vreplicas for a source must be evenly spread across domains to reduce the impact of failure when a zone/node/pod becomes unavailable for scheduling.* + +2. Equal event consumption: Vreplicas for a source must be evenly spread across adapter pods to provide an equal rate of processing events. For example, the Kafka broker spreads partitions equally across pods, so if vreplicas aren't equally spread, pods with fewer vreplicas will consume events more slowly than others. + +3. Pod spread not more than available resources: Vreplicas for a source must be evenly spread across pods such that the total number of pods with placements does not exceed the number of resources available from the source (for example, the number of Kafka partitions for the topic it's consuming from). Otherwise, the additional pods have no resources (Kafka partitions) to consume events from and could waste Kubernetes resources. + +* Note: StatefulSet anti-affinity rules guarantee that new pods are scheduled on a new zone and node. + +## Components: + +### 1.Scheduler +The scheduling framework has a pluggable architecture where plugins are registered and compiled into the scheduler. It allows many scheduling features to be implemented as plugins, while keeping the scheduling "core" simple and maintainable. + +Scheduling happens in a series of stages: + + 1. **Filter**: These plugins (predicates) are used to filter out pods where a vreplica cannot be placed. If any filter plugin marks the pod as infeasible, the remaining plugins will not be called for that pod. A vreplica is marked as unschedulable if no pods pass all the filters. + + 2. **Score**: These plugins (priorities) provide a score to each pod that has passed the filtering phase. The scheduler then selects the pod with the highest weighted score sum. + +The scheduler must stay generic to Knative, with its core functionality implemented as core plugins. Anything specific to an eventing source will be implemented as separate plugins (for example, the number of Kafka partitions). + +It allocates one vreplica at a time by filtering and scoring schedulable pods. + +A vreplica can be unschedulable for several reasons, such as pods not having enough capacity, constraints that cannot be fulfilled, etc. + +### 2.Descheduler + +Similar to the scheduler, but it has its own set of priorities (no predicates today). + +### 3.Autoscaler + +The autoscaler scales up pod replicas of the statefulset adapter when there are vreplicas pending to be scheduled, and scales down if there are unused pods. It takes into consideration a scaling factor that is based on the number of domains for HA. + +### 4.State Collector + +Current state information about the cluster is collected after placing each vreplica and at regular intervals. Cluster information includes the free capacity of each pod, the list of schedulable pods (unschedulable pods are pods that are marked for eviction for compacting, and pods that are on unschedulable nodes, i.e. cordoned or unreachable nodes), the number of pods (StatefulSet replicas), the number of available nodes, the number of zones, a node-to-zone map, the total number of vreplicas in each pod for each vpod (spread), the total number of vreplicas in each node for each vpod (spread), the total number of vreplicas in each zone for each vpod (spread), etc. + +### 5.Reservation + +The scheduler also tracks vreplicas that have been placed (i.e. scheduled) but haven't been committed yet to its vpod status.
These reserved vreplicas are taken into consideration when computing the cluster's state for scheduling the next vreplica. + +### 6.Evictor + +The autoscaler periodically attempts to compact vreplicas into a smaller number of free replicas with lower ordinals. Vreplicas placed on higher-ordinal pods are evicted and rescheduled to pods with a lower ordinal using the same scheduling strategies. + +## Scheduler Profile + +### Predicates: + +1. **PodFitsResources**: check if a pod has enough capacity [CORE] + +2. **NoMaxResourceCount**: check if the total number of placement pods exceeds the available resources [KAFKA]. It has an argument `NumPartitions` to configure the plugin with the total number of Kafka partitions. + +3. **EvenPodSpread**: check if resources are evenly spread across pods [CORE]. It has an argument `MaxSkew` to configure the plugin with an allowed skew factor. + +### Priorities: + +1. **AvailabilityNodePriority**: make sure resources are evenly spread across nodes [CORE]. It has an argument `MaxSkew` to configure the plugin with an allowed skew factor. + +2. **AvailabilityZonePriority**: make sure resources are evenly spread across zones [CORE]. It has an argument `MaxSkew` to configure the plugin with an allowed skew factor. + +3. **LowestOrdinalPriority**: make sure vreplicas are placed on free lower-ordinal pods to minimize resource usage [CORE] + +**Example ConfigMap for config-scheduler:** + +``` +data: + predicates: |+ + [ + {"Name": "PodFitsResources"}, + {"Name": "NoMaxResourceCount", + "Args": "{\"NumPartitions\": 100}"}, + {"Name": "EvenPodSpread", + "Args": "{\"MaxSkew\": 2}"} + ] + priorities: |+ + [ + {"Name": "AvailabilityZonePriority", + "Weight": 10, + "Args": "{\"MaxSkew\": 2}"}, + {"Name": "LowestOrdinalPriority", + "Weight": 2} + ] +``` + +## Descheduler Profile: + +### Priorities: + +1. **RemoveWithAvailabilityNodePriority**: make sure resources are evenly spread across nodes [CORE] + +2. **RemoveWithAvailabilityZonePriority**: make sure resources are evenly spread across zones [CORE] + +3. **HighestOrdinalPriority**: make sure vreplicas are removed from higher-ordinal pods to minimize resource usage [CORE] + +**Example ConfigMap for config-descheduler:** + +``` +data: + priorities: |+ + [ + {"Name": "RemoveWithEvenPodSpreadPriority", + "Weight": 10, + "Args": "{\"MaxSkew\": 2}"}, + {"Name": "RemoveWithAvailabilityZonePriority", + "Weight": 10, + "Args": "{\"MaxSkew\": 2}"}, + {"Name": "RemoveWithHighestOrdinalPriority", + "Weight": 2} + ] +``` + +## Normal Operation + +1. **Busy scheduler**: + +The scheduler can be very busy allocating the best placements for multiple eventing sources at a time using the configured scheduler predicates and priorities. During this time, the cluster could see statefulset replicas increasing, as the autoscaler computes how many more pods are needed to complete scheduling successfully. The replicas could also decrease during idle time, caused either by fewer events flowing through the system, the evictor compacting vreplica placements into a smaller number of pods, or the deletion of event sources. The current placements are stored in the eventing source's status field for observability. + +2. **Software upgrades**: + +We can expect periodic software version upgrades or fixes to be performed on the Kubernetes cluster running the scheduler or on the installed Knative framework. Either of these scenarios could involve graceful rebooting of nodes and/or reapplying of controllers, adapters and other resources.
+ +All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. +(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.) + +TODO: Measure latencies in event processing using a performance tool (KPerf eventing). + +3. **No more cluster resources**: + +When there are no resources available on existing nodes in the cluster to schedule more pods and the autoscaler continues to scale up replicas, the new pods are left in a Pending state until the cluster size is increased. There is nothing for the scheduler to do until then. + +## Disaster Recovery + +Some failure scenarios are described below: + +1. **Pod failure**: + +When a pod/replica in a StatefulSet goes down for some reason (but its node and zone are healthy), a new replica is spun up by the StatefulSet with the same pod identity (the pod can come up on a different node) almost immediately. + +All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. +(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.) + +TODO: Measure latencies in event processing using a performance tool (KPerf eventing). + +2. **Node failure (graceful)**: + +When a node is rebooted for upgrades, etc., running pods on the node will be evicted (drained), gracefully terminated, and rescheduled on a different node. The drained node will be marked as unschedulable by Kubernetes (`node.Spec.Unschedulable` = True) after its cordoning. + +``` +k describe node knative-worker4 +Name: knative-worker4 +CreationTimestamp: Mon, 30 Aug 2021 11:13:11 -0400 +Taints: none +Unschedulable: true +``` + +All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. +(For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.) + +TODO: Measure latencies in event processing using a performance tool (KPerf eventing). + +New vreplicas will not be scheduled on pods running on this cordoned node. + +3.
**Node failure (abrupt)**: + +When a node goes down unexpectedly due to a physical machine failure (network isolation/loss, CPU issue, power loss, etc.), the node controller takes the following steps. + +Pods running on the failed node receive a NodeNotReady Warning event: + +``` +k describe pod kafkasource-mt-adapter-5 -n knative-eventing +Name: kafkasource-mt-adapter-5 +Namespace: knative-eventing +Priority: 0 +Node: knative-worker4/172.18.0.3 +Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s node.kubernetes.io/unreachable:NoExecute op=Exists for 300s + +Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal Scheduled 11m default-scheduler Successfully assigned knative-eventing/kafkasource-mt-adapter-5 to knative-worker4 + Normal Pulled 11m kubelet Container image + Normal Created 11m kubelet Created container receive-adapter + Normal Started 11m kubelet Started container receive-adapter + Warning NodeNotReady 3m48s node-controller Node is not ready +``` + +The failing node is tainted by the node controller with the following taints if the node controller has not heard from the node in the last node-monitor-grace-period (40 seconds by default): + +``` +k describe node knative-worker4 +Name: knative-worker4 +Taints: node.kubernetes.io/unreachable:NoExecute + node.kubernetes.io/unreachable:NoSchedule +Unschedulable: false + Events: + Type Reason Age From Message + ---- ------ ---- ---- ------- + Normal NodeNotSchedulable 5m42s kubelet Node knative-worker4 status is now: NodeNotSchedulable + Normal NodeSchedulable 2m31s kubelet Node knative-worker4 status is now: NodeSchedulable +``` + +``` +k get nodes +NAME STATUS ROLES AGE VERSION +knative-control-plane Ready control-plane,master 7h23m v1.21.1 +knative-worker Ready 7h23m v1.21.1 +knative-worker2 Ready 7h23m v1.21.1 +knative-worker3 Ready 7h23m v1.21.1 +knative-worker4 NotReady 7h23m v1.21.1 +``` + +After a timeout period (`pod-eviction-timeout`, 5 minutes by default), the pods move to the Terminating state. + +Since the StatefulSet now has a `terminationGracePeriodSeconds: 0` setting, the terminating pods are immediately restarted on another functioning node. A new replica is spun up with the same ordinal. + +During the time the failing node is unreachable (~5 minutes), vreplicas placed on that pod aren't available to process work from the eventing source. (Theory) The consumption rate goes down and Kafka eventually triggers a rebalancing of partitions. Also, KEDA will scale up the number of consumers to resolve the processing lag. A scale-up will cause the Eventing scheduler to rebalance the total vreplicas for that source on the available running pods. + +4. **Zone failure**: + +All nodes running in the failing zone will be unavailable for scheduling. Nodes will either be tainted with `unreachable` or have `Unschedulable` set in their spec. +See the node failure scenarios above for what happens to vreplica placements.
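From the state collector's point of view, all of the failure scenarios above reduce to one question: can a given pod still receive placements? Below is a minimal sketch of such a check, using only the `scheduler.PodAnnotationKey` eviction annotation introduced by this patch and the node's `Spec.Unschedulable` flag; the helper name and the exact rules are illustrative, not the actual state-collector implementation.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"knative.dev/eventing/pkg/scheduler"
)

// isPodSchedulable is a hypothetical helper: a pod can receive new vreplicas
// only if it is not marked for eviction and its node is not cordoned.
func isPodSchedulable(pod *corev1.Pod, node *corev1.Node) bool {
	// Pods annotated for eviction (compaction) must not get new placements.
	if _, evicted := pod.Annotations[scheduler.PodAnnotationKey]; evicted {
		return false
	}
	// Cordoned or otherwise unschedulable nodes are excluded as well.
	if node.Spec.Unschedulable {
		return false
	}
	return true
}

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:        "kafkasource-mt-adapter-5",
		Annotations: map[string]string{scheduler.PodAnnotationKey: "true"},
	}}
	node := &corev1.Node{Spec: corev1.NodeSpec{Unschedulable: false}}

	fmt.Println(isPodSchedulable(pod, node)) // false: the pod is marked for eviction
}
```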
+ +## References: + +* https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/ +* https://github.com/kubernetes-sigs/descheduler +* https://kubernetes.io/docs/reference/scheduling/policies/ +* https://kubernetes.io/docs/reference/config-api/kube-scheduler-policy-config.v1 +* https://github.com/virtual-kubelet/virtual-kubelet#how-it-works +* https://github.com/kubernetes/enhancements/tree/master/keps/sig-scheduling/624-scheduling-framework +* https://medium.com/tailwinds-navigator/kubernetes-tip-how-statefulsets-behave-differently-than-deployments-when-node-fails-d29e36bca7d5 +* https://kubernetes.io/docs/concepts/architecture/nodes/#node-controller + + +--- + +To learn more about Knative, please visit the +[/docs](https://github.com/knative/docs) repository. + +This repo falls under the +[Knative Code of Conduct](https://github.com/knative/community/blob/master/CODE-OF-CONDUCT.md) diff --git a/vendor/knative.dev/eventing/pkg/scheduler/doc.go b/vendor/knative.dev/eventing/pkg/scheduler/doc.go new file mode 100644 index 0000000000..57ebf7e894 --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/scheduler/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The scheduler is responsible for placing virtual pod (VPod) replicas within real pods. +package scheduler diff --git a/vendor/knative.dev/eventing/pkg/scheduler/placement.go b/vendor/knative.dev/eventing/pkg/scheduler/placement.go new file mode 100644 index 0000000000..e0aaab0da2 --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/scheduler/placement.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "k8s.io/apimachinery/pkg/util/sets" + duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" +) + +// GetTotalVReplicas returns the total number of placed virtual replicas +func GetTotalVReplicas(placements []duckv1alpha1.Placement) int32 { + r := int32(0) + for _, p := range placements { + r += p.VReplicas + } + return r +} + +// GetPlacementForPod returns the placement corresponding to podName +func GetPlacementForPod(placements []duckv1alpha1.Placement, podName string) *duckv1alpha1.Placement { + for i := 0; i < len(placements); i++ { + if placements[i].PodName == podName { + return &placements[i] + } + } + return nil +} + +// GetPodCount returns the number of pods with the given placements +func GetPodCount(placements []duckv1alpha1.Placement) int { + set := sets.NewString() + for _, p := range placements { + if p.VReplicas > 0 { + set.Insert(p.PodName) + } + } + return set.Len() +} diff --git a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go new file mode 100644 index 0000000000..1e63719862 --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go @@ -0,0 +1,109 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "errors" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + + duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" +) + +var ( + ErrNotEnoughReplicas = errors.New("scheduling failed (not enough pod replicas)") +) + +type SchedulerPolicyType string + +const ( + // MAXFILLUP policy type adds vreplicas to existing pods to fill them up before adding to new pods + MAXFILLUP SchedulerPolicyType = "MAXFILLUP" + + // PodAnnotationKey is an annotation used by the scheduler to be informed of pods + // being evicted and not use it for placing vreplicas + PodAnnotationKey = "eventing.knative.dev/unschedulable" +) + +const ( + ZoneLabel = "topology.kubernetes.io/zone" +) + +const ( + // MaxWeight is the maximum weight that can be assigned for a priority. + MaxWeight uint64 = 10 + // MinWeight is the minimum weight that can be assigned for a priority. + MinWeight uint64 = 0 +) + +// Policy describes a struct of a policy resource. +type SchedulerPolicy struct { + // Holds the information to configure the fit predicate functions. + Predicates []PredicatePolicy + // Holds the information to configure the priority functions. + Priorities []PriorityPolicy +} + +// PredicatePolicy describes a struct of a predicate policy. +type PredicatePolicy struct { + // Identifier of the predicate policy + Name string + // Holds the parameters to configure the given predicate + Args interface{} +} + +// PriorityPolicy describes a struct of a priority policy. 
+type PriorityPolicy struct { + // Identifier of the priority policy + Name string + // The numeric multiplier for the pod scores that the priority function generates + // The weight should be a positive integer + Weight uint64 + // Holds the parameters to configure the given priority function + Args interface{} +} + +// VPodLister is the function signature for returning a list of VPods +type VPodLister func() ([]VPod, error) + +// Evictor allows for vreplicas to be evicted. +// For instance, the evictor is used by the statefulset scheduler to +// move vreplicas to pod with a lower ordinal. +type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error + +// Scheduler is responsible for placing VPods into real Kubernetes pods +type Scheduler interface { + // Schedule computes the new set of placements for vpod. + Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) +} + +// VPod represents virtual replicas placed into real Kubernetes pods +// The scheduler is responsible for placing VPods +type VPod interface { + // GetKey returns the VPod key (namespace/name). + GetKey() types.NamespacedName + + // GetVReplicas returns the number of expected virtual replicas + GetVReplicas() int32 + + // GetPlacements returns the current list of placements + // Do not mutate! + GetPlacements() []duckv1alpha1.Placement + + GetResourceVersion() string +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d00a43ae5c..d65e049f53 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1180,6 +1180,7 @@ knative.dev/eventing/pkg/reconciler/sugar knative.dev/eventing/pkg/reconciler/testing knative.dev/eventing/pkg/reconciler/testing/scheme knative.dev/eventing/pkg/reconciler/testing/v1 +knative.dev/eventing/pkg/scheduler knative.dev/eventing/pkg/utils knative.dev/eventing/test/conformance/helpers knative.dev/eventing/test/conformance/helpers/tracing
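As a usage sketch of the vendored scheduler API (not part of this patch; the `staticVPod` type and `pendingVReplicas` helper are hypothetical names used only for illustration), the following shows how a type can satisfy the `VPod` interface above and how the `placement.go` helpers can be combined with it:

```go
package example

import (
	"k8s.io/apimachinery/pkg/types"

	duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
	"knative.dev/eventing/pkg/scheduler"
)

// staticVPod is a minimal, illustration-only implementation of scheduler.VPod.
type staticVPod struct {
	key             types.NamespacedName
	vreplicas       int32
	placements      []duckv1alpha1.Placement
	resourceVersion string
}

func (v *staticVPod) GetKey() types.NamespacedName            { return v.key }
func (v *staticVPod) GetVReplicas() int32                     { return v.vreplicas }
func (v *staticVPod) GetPlacements() []duckv1alpha1.Placement { return v.placements }
func (v *staticVPod) GetResourceVersion() string              { return v.resourceVersion }

// Compile-time check that staticVPod satisfies scheduler.VPod.
var _ scheduler.VPod = (*staticVPod)(nil)

// pendingVReplicas returns how many vreplicas of the given VPod are not yet
// placed, using the GetTotalVReplicas helper from placement.go.
func pendingVReplicas(v scheduler.VPod) int32 {
	return v.GetVReplicas() - scheduler.GetTotalVReplicas(v.GetPlacements())
}
```

The compile-time assertion is a common Go idiom for keeping an implementation in sync with the interface it is meant to satisfy.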