From 90c61a53ea921ed9a67950e3d378c4963f5a8988 Mon Sep 17 00:00:00 2001
From: Pierangelo Di Pilato
Date: Wed, 20 Sep 2023 13:07:29 +0200
Subject: [PATCH] [release v1.11] Batch of backports (#834)

* Make SecretSpec field of consumers Auth omitempty (#780)

* Expose init offset and schedule metrics for ConsumerGroup reconciler (#790) (#791)

Signed-off-by: Pierangelo Di Pilato

* Fix channel finalizer logic (#3295) (#795)

Signed-off-by: Calum Murray
Co-authored-by: Calum Murray
Co-authored-by: Pierangelo Di Pilato

* [release-v1.10] SRVKE-958: Cache init offsets results (#817)

* Cache init offsets results

When there is high load and multiple consumer group schedule calls, we get many
`dial tcp 10.130.4.8:9092: i/o timeout` errors when trying to connect to Kafka.
This leads to increased "time to readiness" for consumer groups.

The downside of caching is that, if partitions increase while the result is
cached, we won't initialize the offsets of the new partitions.

Signed-off-by: Pierangelo Di Pilato

* Add autoscaler leader log patch

Signed-off-by: Pierangelo Di Pilato

---------

Signed-off-by: Pierangelo Di Pilato
Co-authored-by: Pierangelo Di Pilato

* Scheduler handle overcommitted pods (#820)

Signed-off-by: Pierangelo Di Pilato
Co-authored-by: Pierangelo Di Pilato

* Set consumer and consumergroups finalizers when creating them (#823)

It is possible that a deleted consumer or consumergroup might be reconciled but
never finalized when it is deleted before the finalizer is set. This happens
because the Knative-generated reconciler uses patch (as opposed to update) for
setting the finalizer, and patch doesn't have any optimistic concurrency
controls.

Signed-off-by: Pierangelo Di Pilato
Co-authored-by: Pierangelo Di Pilato

* Clean up reserved from resources that have been scheduled (#830)

In a recent testing run, we noticed a scheduled `ConsumerGroup` [1] (see
placements) being considered as having reserved replicas in a different pod [2].
That makes the scheduler think that there is no space, while the autoscaler
reports enough space to hold every virtual replica.

[1]
```
$ k describe consumergroups -n ks-multi-ksvc-0 c9ee3490-5b4b-4d11-87af-8cb2219d9fe3
Name:         c9ee3490-5b4b-4d11-87af-8cb2219d9fe3
Namespace:    ks-multi-ksvc-0
...
Status:
  Conditions:
    Last Transition Time:  2023-09-06T19:58:27Z
    Reason:                Autoscaler is disabled
    Status:                True
    Type:                  Autoscaler
    Last Transition Time:  2023-09-06T21:41:13Z
    Status:                True
    Type:                  Consumers
    Last Transition Time:  2023-09-06T19:58:27Z
    Status:                True
    Type:                  ConsumersScheduled
    Last Transition Time:  2023-09-06T21:41:13Z
    Status:                True
    Type:                  Ready
  Observed Generation:     1
  Placements:
    Pod Name:   kafka-source-dispatcher-6
    Vreplicas:  4
    Pod Name:   kafka-source-dispatcher-7
    Vreplicas:  4
  Replicas:        8
  Subscriber Uri:  http://receiver5-2.ks-multi-ksvc-0.svc.cluster.local
Events:
```

[2]
```
"ks-multi-ksvc-0/c9ee3490-5b4b-4d11-87af-8cb2219d9fe3": {
  "kafka-source-dispatcher-3": 8
},
```

Signed-off-by: Pierangelo Di Pilato
Co-authored-by: Pierangelo Di Pilato

* Ignore unknown fields in data plane contract (#3335) (#828)

Signed-off-by: Calum Murray

---------

Signed-off-by: Pierangelo Di Pilato
Signed-off-by: Calum Murray
Co-authored-by: Martin Gencur
Co-authored-by: Matthias Wessendorf
Co-authored-by: Calum Murray
Co-authored-by: OpenShift Cherrypick Robot
---
 .../eventing/v1alpha1/consumer_group_types.go |   5 +-
 .../kafka/eventing/v1alpha1/consumer_types.go |   2 +-
 .../pkg/reconciler/base/reconciler.go         |   6 +-
 .../pkg/reconciler/base/reconciler_test.go    |  66 ++++++++++
 .../pkg/reconciler/channel/channel.go         |   5 +-
 .../pkg/reconciler/consumergroup/auth.go      |   2 +-
 .../consumergroup/autoscaler_config.go        |   3 +-
 .../reconciler/consumergroup/consumergroup.go | 123 ++++++++++++++++--
 .../consumergroup/consumergroup_test.go       |  11 +-
 .../reconciler/consumergroup/controller.go    |  20 ++-
 .../pkg/reconciler/consumergroup/evictor.go   |   9 +-
 .../reconciler/consumergroup/evictor_test.go  |   2 +-
 control-plane/pkg/reconciler/source/source.go |   3 +
 go.mod                                        |   2 +-
 openshift/patches/autoscaler_leader_log.patch |  17 +++
 ...d_from_deleted_and_non_pending_vpods.patch |  25 ++++
 .../patches/handle_overcommitted_pods.patch   |  62 +++++++++
 .../remove_resource_version_check.patch       |  20 +++
 openshift/release/generate-release.sh         |   6 +
 .../k8s.io/code-generator/generate-groups.sh  |   0
 .../eventing/pkg/scheduler/state/state.go     |   2 +-
 .../pkg/scheduler/statefulset/autoscaler.go   |   4 +-
 .../pkg/scheduler/statefulset/scheduler.go    |  57 ++++++--
 .../knative.dev/pkg/hack/generate-knative.sh  |   0
 24 files changed, 408 insertions(+), 44 deletions(-)
 create mode 100644 openshift/patches/autoscaler_leader_log.patch
 create mode 100644 openshift/patches/cleanup_reserved_from_deleted_and_non_pending_vpods.patch
 create mode 100644 openshift/patches/handle_overcommitted_pods.patch
 create mode 100644 openshift/patches/remove_resource_version_check.patch
 mode change 100644 => 100755 vendor/k8s.io/code-generator/generate-groups.sh
 mode change 100644 => 100755 vendor/knative.dev/pkg/hack/generate-knative.sh

diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go
index 57e1475050..a02816515f 100644
--- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go
+++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go
@@ -156,12 +156,13 @@ func (c *ConsumerGroup) GetStatus() *duckv1.Status {
 func (cg *ConsumerGroup) ConsumerFromTemplate(options ...ConsumerOption) *Consumer {
 	// TODO figure out naming strategy, is generateName enough?
c := &Consumer{ - ObjectMeta: cg.Spec.Template.ObjectMeta, - Spec: cg.Spec.Template.Spec, + ObjectMeta: *cg.Spec.Template.ObjectMeta.DeepCopy(), + Spec: *cg.Spec.Template.Spec.DeepCopy(), } ownerRef := metav1.NewControllerRef(cg, ConsumerGroupGroupVersionKind) c.OwnerReferences = append(c.OwnerReferences, *ownerRef) + c.Finalizers = []string{"consumers.internal.kafka.eventing.knative.dev"} for _, opt := range options { opt(c) diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go index c41ba92b71..a9f6f9386e 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go @@ -127,7 +127,7 @@ type Auth struct { NetSpec *bindings.KafkaNetSpec // Deprecated, use secret spec AuthSpec *eventingv1alpha1.Auth `json:"AuthSpec,omitempty"` - SecretSpec *SecretSpec + SecretSpec *SecretSpec `json:"SecretSpec,omitempty"` } type SecretSpec struct { diff --git a/control-plane/pkg/reconciler/base/reconciler.go b/control-plane/pkg/reconciler/base/reconciler.go index b4d94620e7..25f668bd46 100644 --- a/control-plane/pkg/reconciler/base/reconciler.go +++ b/control-plane/pkg/reconciler/base/reconciler.go @@ -70,6 +70,10 @@ const ( Json = "json" ) +var ( + jsonUnmarshalOptions = protojson.UnmarshalOptions{DiscardUnknown: true} +) + // Base reconciler for broker and trigger reconciler. // It contains common logic for both trigger and broker reconciler. type Reconciler struct { @@ -213,7 +217,7 @@ func GetDataPlaneConfigMapData(logger *zap.Logger, dataPlaneConfigMap *corev1.Co case Protobuf: err = proto.Unmarshal(dataPlaneDataRaw, ct) case Json: - err = protojson.Unmarshal(dataPlaneDataRaw, ct) + err = jsonUnmarshalOptions.Unmarshal(dataPlaneDataRaw, ct) } if err != nil { diff --git a/control-plane/pkg/reconciler/base/reconciler_test.go b/control-plane/pkg/reconciler/base/reconciler_test.go index d2ea57b701..9a2318dd60 100644 --- a/control-plane/pkg/reconciler/base/reconciler_test.go +++ b/control-plane/pkg/reconciler/base/reconciler_test.go @@ -200,6 +200,25 @@ func TestGetDataPlaneConfigMapDataCorrupted(t *testing.T) { require.Equal(t, uint64(0), got.Generation) } +func TestGetDataPlaneConfigMapDataUnknownField(t *testing.T) { + ctx, _ := reconcilertesting.SetupFakeContext(t) + + r := &base.Reconciler{ + KubeClient: kubeclient.Get(ctx), + ContractConfigMapFormat: base.Json, + } + + cm := &corev1.ConfigMap{ + BinaryData: map[string][]byte{ + base.ConfigMapDataKey: []byte(dataPlaneContractExtraData), + }, + } + + got, err := r.GetDataPlaneConfigMapData(logging.FromContext(ctx).Desugar(), cm) + require.Nil(t, err) + require.Equal(t, uint64(11), got.Generation) +} + func TestUpdateReceiverPodAnnotation(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) @@ -297,3 +316,50 @@ func addRunningPod(store cache.Store, kc kubernetes.Interface, label string) { panic(err) } } + +const dataPlaneContractExtraData = `{ + "generation": "11", + "resources": [ + { + "uid": "50a30fb7-9710-45f5-9724-9a7ebb677a29", + "topics": [ + "knative-messaging-kafka.eventing-e2e17.sut" + ], + "bootstrapServers": "my-cluster-kafka-bootstrap.kafka:9092", + "ingress": { + "host": "sut-kn-channel.eventing-e2e17.svc.cluster.local" + }, + "egressConfig": { + "retry": 12, + "backoffDelay": "1000" + }, + "egresses": [ + { + "consumerGroup": "kafka.eventing-e2e17.sut.e21ad4f4-bf2d-4fe2-879a-728bb9d5626d", + "destination": 
"http://wathola-receiver.eventing-e2e17.svc.cluster.local", + "discardReply": {}, + "uid": "e21ad4f4-bf2d-4fe2-879a-728bb9d5626d", + "egressConfig": { + "retry": 12, + "backoffDelay": "1000" + }, + "deliveryOrder": "ORDERED", + "reference": { + "uuid": "e21ad4f4-bf2d-4fe2-879a-728bb9d5626d", + "namespace": "eventing-e2e17", + "name": "sut" + } + } + ], + "reference": { + "uuid": "50a30fb7-9710-45f5-9724-9a7ebb677a29", + "namespace": "eventing-e2e17", + "name": "sut", + "kind": "KafkaChannel", + "groupVersion": "messaging.knative.dev/v1beta1" + }, + "extraKey": "extraValue" + } + ] +} +` diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index fd09240de8..01417e5383 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -68,6 +68,9 @@ const ( NewChannelIngressServiceName = "kafka-channel-ingress" kafkaChannelTLSSecretName = "kafka-channel-ingress-server-tls" //nolint:gosec // This is not a hardcoded credential caCertsSecretKey = "ca.crt" + // TopicPrefix is the old Kafka Channel topic prefix - we keep this constant so that deleting channels shortly after upgrading + // does not have issues. See https://github.com/knative-extensions/eventing-kafka-broker/issues/3289 for more info + TopicPrefix = "knative-messaging-kafka" ) type Reconciler struct { @@ -494,7 +497,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 topicName, ok := channel.Status.Annotations[kafka.TopicAnnotation] if !ok { - return fmt.Errorf("no topic annotated on channel") + topicName = kafka.ChannelTopic(TopicPrefix, channel) } topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { diff --git a/control-plane/pkg/reconciler/consumergroup/auth.go b/control-plane/pkg/reconciler/consumergroup/auth.go index 90bea0389c..49908f88ac 100644 --- a/control-plane/pkg/reconciler/consumergroup/auth.go +++ b/control-plane/pkg/reconciler/consumergroup/auth.go @@ -27,7 +27,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/security" ) -func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) { +func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) { var secret *corev1.Secret if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) { diff --git a/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go b/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go index 1338ef4abf..a9f41048d8 100644 --- a/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go +++ b/control-plane/pkg/reconciler/consumergroup/autoscaler_config.go @@ -21,10 +21,11 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler" ) -func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) { +func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) { cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err) diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go 
b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index bab350ae24..8e1a84c77d 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -22,25 +22,33 @@ import ( "fmt" "math" "sort" + "time" "github.com/Shopify/sarama" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/utils/pointer" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/pkg/apis" + "knative.dev/pkg/controller" "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/eventing/pkg/scheduler" @@ -61,8 +69,38 @@ import ( var ( ErrNoSubscriberURI = errors.New("no subscriber URI resolved") ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved") + + scheduleLatencyStat = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds) + // scheduleDistribution defines the bucket boundaries for the histogram of schedule latency metric. + // Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s. + scheduleDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000) + + initializeOffsetsLatencyStat = stats.Int64("initialize_offsets_latency", "Latency of consumer group offsets initialization operations", stats.UnitMilliseconds) + // initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric. + // Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s. + initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000) ) +func init() { + views := []*view.View{ + { + Description: "Latency of consumer group schedule operations", + TagKeys: []tag.Key{controller.NamespaceTagKey}, + Measure: scheduleLatencyStat, + Aggregation: scheduleDistribution, + }, + { + Description: "Latency of consumer group offsets initialization operations", + TagKeys: []tag.Key{controller.NamespaceTagKey}, + Measure: initializeOffsetsLatencyStat, + Aggregation: initializeOffsetsDistribution, + }, + } + if err := view.Register(views...); err != nil { + panic(err) + } +} + type Scheduler struct { scheduler.Scheduler SchedulerConfig @@ -103,9 +141,18 @@ type Reconciler struct { // DeleteConsumerGroupMetadataCounter is an in-memory counter to count how many times we have // tried to delete consumer group metadata from Kafka. DeleteConsumerGroupMetadataCounter *counter.Counter + + // InitOffsetLatestInitialOffsetCache is the cache for consumer group offset initialization. + // + // When there is high load and multiple consumer group schedule calls, we get many + // `dial tcp 10.130.4.8:9092: i/o timeout` errors when trying to connect to Kafka. + // This leads to increased "time to readiness" for consumer groups. 
+ InitOffsetLatestInitialOffsetCache prober.Cache + + EnqueueKey func(key string) } -func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { if err := r.reconcileInitialOffset(ctx, cg); err != nil { return cg.MarkInitializeOffsetFailed("InitializeOffset", err) } @@ -152,7 +199,7 @@ func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consum return nil } -func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { +func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { cg.Spec.Replicas = pointer.Int32(0) err := r.schedule(ctx, cg) //de-schedule placements @@ -161,7 +208,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume cg.Status.Placements = nil // return an error to 1. update the status. 2. not clear the finalizer - return errors.New("placement list was not empty") + return fmt.Errorf("failed to unschedule consumer group: %w", err) } // Get consumers associated with the ConsumerGroup. @@ -184,10 +231,12 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume r.DeleteConsumerGroupMetadataCounter.Del(string(cg.GetUID())) } + r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg)) + return nil } -func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg) if err != nil { return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err) @@ -215,7 +264,7 @@ func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkain return nil } -func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { // Get consumers associated with the ConsumerGroup. 
existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector)) @@ -244,7 +293,7 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C return nil } -func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error { +func (r *Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error { placement := *pc.Placement consumers := pc.Consumers @@ -299,7 +348,7 @@ func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafka return nil } -func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error { +func (r *Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error { c := cg.ConsumerFromTemplate() c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-") @@ -312,7 +361,7 @@ func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.Consu return nil } -func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error { +func (r *Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error { dOpts := metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{UID: &consumer.UID}, } @@ -323,7 +372,10 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna return nil } -func (r Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { + startTime := time.Now() + defer recordScheduleLatency(ctx, cg, startTime) + statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind) // Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing @@ -350,7 +402,7 @@ type ConsumersPerPlacement struct { Consumers []*kafkainternals.Consumer } -func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement { +func (r *Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement { placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers))))) // Group consumers by Pod bind. 
@@ -445,11 +497,18 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con return condition, nil } -func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { + startTime := time.Now() + defer recordInitializeOffsetsLatency(ctx, cg, startTime) + if cg.Spec.Template.Spec.Delivery == nil || cg.Spec.Template.Spec.Delivery.InitialOffset == sources.OffsetEarliest { return nil } + if status := r.InitOffsetLatestInitialOffsetCache.GetStatus(keyOf(cg)); status == prober.StatusReady { + return nil + } + saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg) if err != nil { return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err) @@ -486,10 +545,14 @@ func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainterna return fmt.Errorf("failed to initialize offset: %w", err) } + r.InitOffsetLatestInitialOffsetCache.UpsertStatus(keyOf(cg), prober.StatusReady, nil, func(key string, arg interface{}) { + r.EnqueueKey(key) + }) + return nil } -func (r Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { +func (r *Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { var triggerAuthentication *kedav1alpha1.TriggerAuthentication var secret *corev1.Secret @@ -630,7 +693,7 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1 return nil } -func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error { +func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error { selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName}) pods, err := r.PodLister. Pods(r.SystemNamespace). 
@@ -652,7 +715,7 @@ func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler return nil } -func (r Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error { +func (r *Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error { // Check if ConfigMap exists in lister cache _, err := r.ConfigMapLister.ConfigMaps(r.SystemNamespace).Get(name) // ConfigMap already exists, return @@ -692,3 +755,35 @@ var ( _ consumergroup.Interface = &Reconciler{} _ consumergroup.Finalizer = &Reconciler{} ) + +func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) { + func() { + ctx, err := tag.New( + ctx, + tag.Insert(controller.NamespaceTagKey, cg.Namespace), + ) + if err != nil { + return + } + + metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds())) + }() +} + +func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) { + func() { + ctx, err := tag.New( + ctx, + tag.Insert(controller.NamespaceTagKey, cg.Namespace), + ) + if err != nil { + return + } + + metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds())) + }() +} + +func keyOf(cg metav1.Object) string { + return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String() +} diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index b49c9b120f..f3a085447f 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" @@ -34,6 +35,7 @@ import ( bindings "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -1643,7 +1645,7 @@ func TestReconcileKind(t *testing.T) { _, exampleConfig := cm.ConfigMapsFromTestFile(t, configapis.FlagsConfigName) store.OnConfigChanged(exampleConfig) - r := Reconciler{ + r := &Reconciler{ SchedulerFunc: func(s string) Scheduler { ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ @@ -1675,6 +1677,8 @@ func TestReconcileKind(t *testing.T) { SystemNamespace: systemNamespace, AutoscalerConfig: "", DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second), + EnqueueKey: func(key string) {}, } r.KafkaFeatureFlags = configapis.FromContext(store.ToContext(ctx)) @@ -1786,7 +1790,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { ctx, _ = kedaclient.With(ctx) - r := Reconciler{ + r := &Reconciler{ SchedulerFunc: func(s string) Scheduler { ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ @@ -1814,6 +1818,8 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { }, SystemNamespace: systemNamespace, DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second), + EnqueueKey: func(key string) {}, } r.KafkaFeatureFlags = configapis.DefaultFeaturesConfig() @@ -2198,6 +2204,7 @@ func TestFinalizeKind(t 
*testing.T) { }, KafkaFeatureFlags: configapis.DefaultFeaturesConfig(), DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, time.Second), } return consumergroup.NewReconciler( diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 0da00230ca..427c7733e6 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -48,6 +48,7 @@ import ( "knative.dev/pkg/system" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" + "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" statefulsetscheduler "knative.dev/eventing/pkg/scheduler/statefulset" @@ -128,6 +129,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I KedaClient: kedaclient.Get(ctx), AutoscalerConfig: env.AutoscalerConfigMap, DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx), + InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache(ctx, 20*time.Minute), } consumerInformer := consumer.Get(ctx) @@ -154,6 +156,13 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I }) r.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) + r.EnqueueKey = func(key string) { + parts := strings.SplitN(key, string(types.Separator), 3) + if len(parts) != 2 { + panic(fmt.Sprintf("Expected / format, got %s", key)) + } + impl.EnqueueKey(types.NamespacedName{Namespace: parts[0], Name: parts[1]}) + } configStore := config.NewStore(ctx, func(name string, value *config.KafkaFeatureFlags) { r.KafkaFeatureFlags.Reset(value) @@ -161,7 +170,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I }) configStore.WatchConfigs(watcher) - consumerGroupInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + consumerGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: impl.Enqueue, + UpdateFunc: controller.PassNew(impl.Enqueue), + DeleteFunc: func(obj interface{}) { + impl.Enqueue(obj) + if cg, ok := obj.(metav1.Object); ok && cg != nil { + r.InitOffsetLatestInitialOffsetCache.Expire(keyOf(cg)) + } + }, + }) consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey))) globalResync := func(interface{}) { diff --git a/control-plane/pkg/reconciler/consumergroup/evictor.go b/control-plane/pkg/reconciler/consumergroup/evictor.go index dc61d896c3..57ec602580 100644 --- a/control-plane/pkg/reconciler/consumergroup/evictor.go +++ b/control-plane/pkg/reconciler/consumergroup/evictor.go @@ -48,8 +48,8 @@ type evictor struct { // newEvictor creates a new evictor. // // fields are additional logger fields to be attached to the evictor logger. 
-func newEvictor(ctx context.Context, fields ...zap.Field) evictor { - return evictor{ +func newEvictor(ctx context.Context, fields ...zap.Field) *evictor { + return &evictor{ ctx: ctx, kubeClient: kubeclient.Get(ctx), InternalsClient: kafkainternalsclient.Get(ctx).InternalV1alpha1(), @@ -60,7 +60,7 @@ func newEvictor(ctx context.Context, fields ...zap.Field) evictor { } } -func (e evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error { +func (e *evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error { key := vpod.GetKey() logger := e.logger.With(zap.String("consumergroup", key.String())) @@ -121,6 +121,9 @@ func (e *evictor) disablePodScheduling(logger *zap.Logger, pod *corev1.Pod) erro _, err := e.kubeClient.CoreV1(). Pods(pod.GetNamespace()). Update(e.ctx, pod, metav1.UpdateOptions{}) + if apierrors.IsNotFound(err) { + return nil + } if err != nil { return fmt.Errorf("failed to update pod %s/%s: %w", pod.GetNamespace(), pod.GetName(), err) } diff --git a/control-plane/pkg/reconciler/consumergroup/evictor_test.go b/control-plane/pkg/reconciler/consumergroup/evictor_test.go index 98fa5d1144..a71c935482 100644 --- a/control-plane/pkg/reconciler/consumergroup/evictor_test.go +++ b/control-plane/pkg/reconciler/consumergroup/evictor_test.go @@ -201,7 +201,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) { e := newEvictor(ctx) err := e.evict(pod, cg, placement) - require.NotNil(t, err) + require.Nil(t, err) } func TestEvictorEvictConsumerGroupNotFound(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) diff --git a/control-plane/pkg/reconciler/source/source.go b/control-plane/pkg/reconciler/source/source.go index 7962aa86ec..229b6c5c33 100644 --- a/control-plane/pkg/reconciler/source/source.go +++ b/control-plane/pkg/reconciler/source/source.go @@ -138,6 +138,9 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, ks *sources.Kafk Labels: map[string]string{ internalscg.UserFacingResourceLabelSelector: strings.ToLower(ks.GetGroupVersionKind().Kind), }, + Finalizers: []string{ + "consumergroups.internal.kafka.eventing.knative.dev", + }, }, Spec: internalscg.ConsumerGroupSpec{ Replicas: ks.Spec.Consumers, diff --git a/go.mod b/go.mod index 2f29811f95..74a71df17a 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/google/gofuzz v1.2.0 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/kedacore/keda/v2 v2.8.1 + go.opencensus.io v0.24.0 knative.dev/eventing v0.38.0 knative.dev/hack v0.0.0-20230712131415-ddae80293c43 knative.dev/pkg v0.0.0-20230718152110-aef227e72ead @@ -132,7 +133,6 @@ require ( github.com/wavesoftware/go-ensure v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect - go.opencensus.io v0.24.0 // indirect go.uber.org/automaxprocs v1.4.0 // indirect golang.org/x/crypto v0.7.0 // indirect golang.org/x/mod v0.9.0 // indirect diff --git a/openshift/patches/autoscaler_leader_log.patch b/openshift/patches/autoscaler_leader_log.patch new file mode 100644 index 0000000000..a9f2f7d403 --- /dev/null +++ b/openshift/patches/autoscaler_leader_log.patch @@ -0,0 +1,17 @@ +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +index 96008b849..6c00a231b 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +@@ 
-133,10 +133,10 @@ func (a *autoscaler) Start(ctx context.Context) { + case <-ctx.Done(): + return + case <-time.After(a.refreshPeriod): +- a.logger.Infow("Triggering scale down") ++ a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) + attemptScaleDown = true + case <-a.trigger: +- a.logger.Infow("Triggering scale up") ++ a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) + attemptScaleDown = false + } + diff --git a/openshift/patches/cleanup_reserved_from_deleted_and_non_pending_vpods.patch b/openshift/patches/cleanup_reserved_from_deleted_and_non_pending_vpods.patch new file mode 100644 index 0000000000..9b445750c1 --- /dev/null +++ b/openshift/patches/cleanup_reserved_from_deleted_and_non_pending_vpods.patch @@ -0,0 +1,25 @@ +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +index 083767450..ed1defaa6 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +@@ -253,6 +253,20 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 + return nil, err + } + ++ // Clean up reserved from removed resources that don't appear in the vpod list anymore and have ++ // no pending resources. ++ reserved := make(map[types.NamespacedName]map[string]int32) ++ for k, v := range s.reserved { ++ if pendings, ok := state.Pending[k]; ok { ++ if pendings == 0 { ++ reserved[k] = map[string]int32{} ++ } else { ++ reserved[k] = v ++ } ++ } ++ } ++ s.reserved = reserved ++ + logger.Debugw("scheduling", zap.Any("state", state)) + + existingPlacements := vpod.GetPlacements() diff --git a/openshift/patches/handle_overcommitted_pods.patch b/openshift/patches/handle_overcommitted_pods.patch new file mode 100644 index 0000000000..51dd29bb50 --- /dev/null +++ b/openshift/patches/handle_overcommitted_pods.patch @@ -0,0 +1,62 @@ +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +index 2d5460cf8..65018e7c5 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +@@ -361,7 +361,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri + // Assert the pod is not overcommitted + if free[ordinal] < 0 { + // This should not happen anymore. Log as an error but do not interrupt the current scheduling. 
+- s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) ++ s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + } + + if ordinal > last { +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +index ed1defaa6..d9bcef1f8 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +@@ -272,13 +272,41 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 + existingPlacements := vpod.GetPlacements() + var left int32 + +- // Remove unschedulable pods from placements ++ // Remove unschedulable or adjust overcommitted pods from placements + var placements []duckv1alpha1.Placement + if len(existingPlacements) > 0 { + placements = make([]duckv1alpha1.Placement, 0, len(existingPlacements)) + for _, p := range existingPlacements { +- if state.IsSchedulablePod(st.OrdinalFromPodName(p.PodName)) { +- placements = append(placements, *p.DeepCopy()) ++ p := p.DeepCopy() ++ ordinal := st.OrdinalFromPodName(p.PodName) ++ ++ if !state.IsSchedulablePod(ordinal) { ++ continue ++ } ++ ++ // Handle overcommitted pods. ++ if state.FreeCap[ordinal] < 0 { ++ // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 ++ // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 ++ // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 ++ ++ overcommit := -state.FreeCap[ordinal] ++ ++ if p.VReplicas >= overcommit { ++ state.SetFree(ordinal, 0) ++ state.Pending[vpod.GetKey()] += overcommit ++ ++ p.VReplicas = p.VReplicas - overcommit ++ } else { ++ state.SetFree(ordinal, p.VReplicas-overcommit) ++ state.Pending[vpod.GetKey()] += p.VReplicas ++ ++ p.VReplicas = 0 ++ } ++ } ++ ++ if p.VReplicas > 0 { ++ placements = append(placements, *p) + } + } + } diff --git a/openshift/patches/remove_resource_version_check.patch b/openshift/patches/remove_resource_version_check.patch new file mode 100644 index 0000000000..e282755a37 --- /dev/null +++ b/openshift/patches/remove_resource_version_check.patch @@ -0,0 +1,20 @@ +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +index a95242ee2..083767450 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +@@ -228,15 +228,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla + s.reservedMu.Lock() + defer s.reservedMu.Unlock() + +- vpods, err := s.vpodLister() +- if err != nil { +- return nil, err +- } +- vpodFromLister := st.GetVPod(vpod.GetKey(), vpods) +- if vpodFromLister != nil && vpod.GetResourceVersion() != vpodFromLister.GetResourceVersion() { +- return nil, fmt.Errorf("vpod to schedule has resource version different from one in indexer") +- } +- + placements, err := s.scheduleVPod(vpod) + if placements == nil { + return placements, err diff --git a/openshift/release/generate-release.sh b/openshift/release/generate-release.sh index d34706522e..51be197d99 100755 --- a/openshift/release/generate-release.sh +++ b/openshift/release/generate-release.sh @@ -7,6 +7,12 @@ source $(dirname $0)/resolve.sh GITHUB_ACTIONS=true $(dirname $0)/../../hack/update-codegen.sh git apply 
openshift/patches/disable-ko-publish-rekt.patch git apply openshift/patches/autoscaler_fix.patch +git apply openshift/patches/remove_resource_version_check.patch +git apply openshift/patches/autoscaler_leader_log.patch +git apply openshift/patches/cleanup_reserved_from_deleted_and_non_pending_vpods.patch + +chmod +x $(dirname $0)/../../vendor/k8s.io/code-generator/generate-groups.sh +chmod +x $(dirname $0)/../../vendor/knative.dev/pkg/hack/generate-knative.sh # Eventing core will bring the config tracing ConfigMap, so remove it from heret rm -f control-plane/config/eventing-kafka-broker/200-controller/100-config-tracing.yaml diff --git a/vendor/k8s.io/code-generator/generate-groups.sh b/vendor/k8s.io/code-generator/generate-groups.sh old mode 100644 new mode 100755 diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index 2d5460cf80..65018e7c5d 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -361,7 +361,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go index 96008b8493..6c00a231b7 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go @@ -133,10 +133,10 @@ func (a *autoscaler) Start(ctx context.Context) { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): - a.logger.Infow("Triggering scale down") + a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = true case <-a.trigger: - a.logger.Infow("Triggering scale up") + a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = false } diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go index c7b7bbdae8..4132da0f36 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go @@ -228,15 +228,6 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla s.reservedMu.Lock() defer s.reservedMu.Unlock() - vpods, err := s.vpodLister() - if err != nil { - return nil, err - } - vpodFromLister := st.GetVPod(vpod.GetKey(), vpods) - if vpodFromLister != nil && vpod.GetResourceVersion() != vpodFromLister.GetResourceVersion() { - return nil, fmt.Errorf("vpod to schedule has resource version different from one in indexer") - } - placements, err := s.scheduleVPod(vpod) if placements == nil { return placements, err @@ -262,18 +253,60 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 return nil, err } + // Clean up reserved from removed resources that don't appear in the vpod list anymore and have + // no pending resources. 
+ reserved := make(map[types.NamespacedName]map[string]int32) + for k, v := range s.reserved { + if pendings, ok := state.Pending[k]; ok { + if pendings == 0 { + reserved[k] = map[string]int32{} + } else { + reserved[k] = v + } + } + } + s.reserved = reserved + logger.Debugw("scheduling", zap.Any("state", state)) existingPlacements := vpod.GetPlacements() var left int32 - // Remove unschedulable pods from placements + // Remove unschedulable or adjust overcommitted pods from placements var placements []duckv1alpha1.Placement if len(existingPlacements) > 0 { placements = make([]duckv1alpha1.Placement, 0, len(existingPlacements)) for _, p := range existingPlacements { - if state.IsSchedulablePod(st.OrdinalFromPodName(p.PodName)) { - placements = append(placements, *p.DeepCopy()) + p := p.DeepCopy() + ordinal := st.OrdinalFromPodName(p.PodName) + + if !state.IsSchedulablePod(ordinal) { + continue + } + + // Handle overcommitted pods. + if state.FreeCap[ordinal] < 0 { + // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 + // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 + // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 + + overcommit := -state.FreeCap[ordinal] + + if p.VReplicas >= overcommit { + state.SetFree(ordinal, 0) + state.Pending[vpod.GetKey()] += overcommit + + p.VReplicas = p.VReplicas - overcommit + } else { + state.SetFree(ordinal, p.VReplicas-overcommit) + state.Pending[vpod.GetKey()] += p.VReplicas + + p.VReplicas = 0 + } + } + + if p.VReplicas > 0 { + placements = append(placements, *p) } } } diff --git a/vendor/knative.dev/pkg/hack/generate-knative.sh b/vendor/knative.dev/pkg/hack/generate-knative.sh old mode 100644 new mode 100755
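The reserved-map cleanup in the scheduler hunk above keeps reservations only for vpods that still have pending virtual replicas, so stale entries (like [2] in the commit message) no longer block placement. A minimal standalone sketch of that filtering, with string keys standing in for the real types.NamespacedName keys:

```
package main

import "fmt"

// cleanupReserved mirrors the reserved-map cleanup sketched above: reservations
// are kept only for vpods that still appear in the pending map, and fully
// scheduled vpods (pending == 0) get their reservations cleared. Illustrative
// only; the real scheduler operates on map[types.NamespacedName]map[string]int32.
func cleanupReserved(reserved map[string]map[string]int32, pending map[string]int32) map[string]map[string]int32 {
	out := make(map[string]map[string]int32, len(reserved))
	for key, perPod := range reserved {
		p, ok := pending[key]
		if !ok {
			continue // vpod no longer exists: drop its reservations
		}
		if p == 0 {
			out[key] = map[string]int32{} // fully scheduled: clear reservations
		} else {
			out[key] = perPod // still pending: keep reservations as-is
		}
	}
	return out
}

func main() {
	reserved := map[string]map[string]int32{
		"ns/scheduled": {"kafka-source-dispatcher-3": 8},
		"ns/pending":   {"kafka-source-dispatcher-0": 2},
		"ns/deleted":   {"kafka-source-dispatcher-1": 4},
	}
	pending := map[string]int32{"ns/scheduled": 0, "ns/pending": 3}
	fmt.Println(cleanupReserved(reserved, pending))
}
```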
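The overcommit handling added to scheduleVPod reduces an existing placement by the pod's negative free capacity and accounts for the returned replicas as pending. A small self-contained sketch of that arithmetic (assumed helper names, not the real scheduler state), reproducing the three cases from the patch comments:

```
package main

import "fmt"

// adjustPlacement mirrors the overcommit handling above: when a pod's free
// capacity is negative, the existing placement gives back up to `overcommit`
// virtual replicas and the returned amount is tracked as pending. Names and
// types here are illustrative only.
func adjustPlacement(vreplicas, freeCap int32) (newVReplicas, newFreeCap, pendingDelta int32) {
	if freeCap >= 0 {
		return vreplicas, freeCap, 0 // pod is not overcommitted
	}
	overcommit := -freeCap
	if vreplicas >= overcommit {
		// The placement can absorb the whole overcommit.
		return vreplicas - overcommit, 0, overcommit
	}
	// The placement is smaller than the overcommit: drop it entirely.
	return 0, vreplicas - overcommit, vreplicas
}

func main() {
	// The three cases from the patch comments, with an overcommit of 4.
	for _, vr := range []int32{9, 4, 3} {
		newVR, free, pending := adjustPlacement(vr, -4)
		fmt.Printf("vr=%d -> vr=%d free=%d pending=+%d\n", vr, newVR, free, pending)
	}
}
```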