From a867e2dce2bec257ea3b609bcdf681ef48c5ee77 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 30 Nov 2021 09:03:24 +0100 Subject: [PATCH 1/3] ConsumerGroup reconciler Signed-off-by: Pierangelo Di Pilato --- .../eventing/v1alpha1/consumer_defaults.go | 45 ++++++ .../v1alpha1/consumer_defaults_test.go | 52 +++++++ .../v1alpha1/consumer_group_defaults.go | 48 ++++++ .../v1alpha1/consumer_group_defaults_test.go | 138 ++++++++++++++++++ .../v1alpha1/consumer_group_lifecycle.go | 24 ++- .../eventing/v1alpha1/consumer_group_types.go | 41 +++++- .../v1alpha1/consumer_group_validation.go | 46 ++++++ .../consumer_group_validation_test.go | 124 ++++++++++++++++ .../kafka/eventing/v1alpha1/consumer_types.go | 23 +++ .../eventing/v1alpha1/consumer_validation.go | 54 +++++++ .../kafka/eventing/v1alpha1/register.go | 6 + .../reconciler/consumergroup/consumergroup.go | 136 +++++++++++++++++ 12 files changed, 734 insertions(+), 3 deletions(-) create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults.go create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults_test.go create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults.go create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults_test.go create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go create mode 100644 control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_validation.go create mode 100644 control-plane/pkg/reconciler/consumergroup/consumergroup.go diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults.go new file mode 100644 index 0000000000..7eeae08e94 --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults.go @@ -0,0 +1,45 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + + "knative.dev/pkg/apis" +) + +var ( + _ apis.Defaultable = &Consumer{} +) + +// SetDefaults implements apis.Defaultable. +func (c *Consumer) SetDefaults(ctx context.Context) { + ctx = apis.WithinParent(ctx, c.ObjectMeta) + c.Spec.SetDefaults(ctx) +} + +func (c *ConsumerSpec) SetDefaults(ctx context.Context) { + c.Delivery.SetDefaults(ctx) + c.Subscriber.SetDefaults(ctx) +} + +func (d *DeliverySpec) SetDefaults(ctx context.Context) { + if d == nil { + return + } + d.DeliverySpec.SetDefaults(ctx) +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults_test.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults_test.go new file mode 100644 index 0000000000..d4f5dcc21b --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_defaults_test.go @@ -0,0 +1,52 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestConsumerSetDefaults(t *testing.T) { + tests := []struct { + name string + ctx context.Context + given *Consumer + want *Consumer + }{ + { + name: "with delivery", + ctx: context.Background(), + given: &Consumer{ + Spec: ConsumerSpec{Delivery: &DeliverySpec{}}, + }, + want: &Consumer{ + Spec: ConsumerSpec{Delivery: &DeliverySpec{}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.given.SetDefaults(tt.ctx) + if diff := cmp.Diff(tt.want, tt.given); diff != "" { + t.Error("(-want, +got)", diff) + } + }) + } +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults.go new file mode 100644 index 0000000000..db6e8b646a --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults.go @@ -0,0 +1,48 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + + "k8s.io/utils/pointer" + "knative.dev/pkg/apis" +) + +var ( + _ apis.Defaultable = &ConsumerGroup{} +) + +// SetDefaults implements apis.Defaultable. +func (cg *ConsumerGroup) SetDefaults(ctx context.Context) { + ctx = apis.WithinParent(ctx, cg.ObjectMeta) + + // Replicas is the number of Consumers for this ConsumerGroup. + // When unset, set it to 1. + if cg.Spec.Replicas == nil { + cg.Spec.Replicas = pointer.Int32Ptr(1) + } + // Selector is a label query over consumers that should match the Replicas count. + // If Selector is empty, it is defaulted to the labels present on the template. + if cg.Spec.Selector == nil || len(cg.Spec.Selector) == 0 { + cg.Spec.Selector = cg.Spec.Template.Labels + } + + // Force template namespace to be set to ConsumerGroup's namespace. + cg.Spec.Template.Namespace = cg.Namespace + cg.Spec.Template.Spec.SetDefaults(ctx) +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults_test.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults_test.go new file mode 100644 index 0000000000..b9c5a01cd9 --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_defaults_test.go @@ -0,0 +1,138 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" +) + +func TestConsumerGroupSetDefaults(t *testing.T) { + tests := []struct { + name string + ctx context.Context + given *ConsumerGroup + want *ConsumerGroup + }{ + { + name: "default replicas", + ctx: context.Background(), + given: &ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "name", + }, + Spec: ConsumerGroupSpec{ + Template: ConsumerTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + }, + }, + }, + }, + want: &ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "name", + }, + Spec: ConsumerGroupSpec{ + Template: ConsumerTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + }, + }, + Replicas: pointer.Int32Ptr(1), + }, + }, + }, + { + name: "default selector", + ctx: context.Background(), + given: &ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "name", + }, + Spec: ConsumerGroupSpec{ + Template: ConsumerTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Labels: map[string]string{"app": "app"}, + }, + }, + Replicas: pointer.Int32Ptr(1), + }, + }, + want: &ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "name", + }, + Spec: ConsumerGroupSpec{ + Template: ConsumerTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Labels: map[string]string{"app": "app"}, + }, + }, + Replicas: pointer.Int32Ptr(1), + Selector: map[string]string{"app": "app"}, + }, + }, + }, + { + name: "default namespace", + ctx: context.Background(), + given: &ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "name", + }, + Spec: ConsumerGroupSpec{ + Replicas: pointer.Int32Ptr(1), + }, + }, + want: &ConsumerGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "name", + }, + Spec: ConsumerGroupSpec{ + Template: ConsumerTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + }, + }, + Replicas: pointer.Int32Ptr(1), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.given.SetDefaults(tt.ctx) + + if diff := cmp.Diff(tt.want, tt.given); diff != "" { + t.Error("(-want, +got)", diff) + } + }) + } +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go index 823954b8db..81afa34bb5 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go @@ -17,9 +17,31 @@ package v1alpha1 import ( + "fmt" + "knative.dev/pkg/apis" ) +const ( + ConditionConsumers apis.ConditionType = "Consumers" +) + +var ( + conditionSet = apis.NewLivingConditionSet( + ConditionConsumers, + ) +) + func (c *ConsumerGroup) GetConditionSet() apis.ConditionSet { - return apis.NewLivingConditionSet() + return conditionSet +} + +func (cg *ConsumerGroup) MarkReconcileConsumersFailed(reason string, err error) error { + err = fmt.Errorf("failed to reconcile consumers: %w", err) + cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumers, reason, err.Error()) + return err +} + +func (cg *ConsumerGroup) MarkReconcileConsumersSucceeded() { + cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumers) } diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go index ca4bf4ebb6..9a27766bb8 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go @@ -19,9 +19,9 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - duckv1 "knative.dev/pkg/apis/duck/v1" - + "k8s.io/apimachinery/pkg/types" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + duckv1 "knative.dev/pkg/apis/duck/v1" ) // +genclient @@ -51,6 +51,18 @@ type ConsumerGroup struct { Status ConsumerGroupStatus `json:"status,omitempty"` } +func (c *ConsumerGroup) GetKey() types.NamespacedName { + return types.NamespacedName{Namespace: c.GetNamespace(), Name: c.GetName()} +} + +func (c *ConsumerGroup) GetVReplicas() int32 { + return *c.Spec.Replicas +} + +func (c *ConsumerGroup) GetPlacements() []eventingduckv1alpha1.Placement { + return c.Status.Placements +} + type ConsumerGroupSpec struct { // Template is the object that describes the consumer that will be created if @@ -65,6 +77,14 @@ type ConsumerGroupSpec struct { // If unspecified, defaults to 1. // +optional Replicas *int32 `json:"replicas,omitempty"` + + // Selector is a label query over consumers that should match the Replicas count. + // If Selector is empty, it is defaulted to the labels present on the template. + // Label keys and values that must match in order to be controlled by this + // controller, if empty defaulted to labels on template. + // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors + // +optional + Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"` } type ConsumerGroupStatus struct { @@ -102,3 +122,20 @@ func (c *ConsumerGroup) GetUntypedSpec() interface{} { func (c *ConsumerGroup) GetStatus() *duckv1.Status { return &c.Status.Status } + +// ConsumerFromTemplate returns a Consumer from the Consumer template in the ConsumerGroup spec. +func (cg *ConsumerGroup) ConsumerFromTemplate(options ...ConsumerOption) *Consumer { + // TODO figure out naming strategy, is generateName enough? + c := &Consumer{ + ObjectMeta: cg.Spec.Template.ObjectMeta, + Spec: cg.Spec.Template.Spec, + } + + ownerRef := metav1.NewControllerRef(cg, ConsumerGroupGroupVersionKind) + c.OwnerReferences = append(c.OwnerReferences, *ownerRef) + + for _, opt := range options { + opt(c) + } + return c +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go new file mode 100644 index 0000000000..6e5441add5 --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go @@ -0,0 +1,46 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + + "knative.dev/pkg/apis" +) + +var ( + _ apis.Validatable = &ConsumerGroup{} +) + +func (c *ConsumerGroup) Validate(ctx context.Context) *apis.FieldError { + ctx = apis.WithinParent(ctx, c.ObjectMeta) + return c.Spec.Validate(ctx).ViaField("spec") +} + +func (cgs *ConsumerGroupSpec) Validate(ctx context.Context) *apis.FieldError { + if cgs.Replicas == nil { + return apis.ErrMissingField("replicas") + } + if cgs.Selector == nil { + return apis.ErrMissingField("selector") + } + return cgs.Template.Validate(ctx).ViaField("template") +} + +func (cts *ConsumerTemplateSpec) Validate(ctx context.Context) *apis.FieldError { + return cts.Spec.Validate(ctx).ViaField("spec") +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go new file mode 100644 index 0000000000..4065666734 --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go @@ -0,0 +1,124 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + "testing" + + "k8s.io/utils/pointer" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func TestConsumerGroup_Validate(t *testing.T) { + tests := []struct { + name string + ctx context.Context + given *ConsumerGroup + wantErr bool + }{ + { + name: "no replicas", + ctx: context.Background(), + given: &ConsumerGroup{}, + wantErr: true, + }, + { + name: "no selector", + ctx: context.Background(), + given: &ConsumerGroup{ + Spec: ConsumerGroupSpec{ + Replicas: pointer.Int32Ptr(1), + }, + }, + wantErr: true, + }, + { + name: "no topics", + ctx: context.Background(), + given: &ConsumerGroup{ + Spec: ConsumerGroupSpec{ + Replicas: pointer.Int32Ptr(1), + Selector: map[string]string{"app": "app"}, + Template: ConsumerTemplateSpec{}, + }, + }, + wantErr: true, + }, + { + name: "missing group.id", + ctx: context.Background(), + given: &ConsumerGroup{ + Spec: ConsumerGroupSpec{ + Replicas: pointer.Int32Ptr(1), + Selector: map[string]string{"app": "app"}, + Template: ConsumerTemplateSpec{ + Spec: ConsumerSpec{ + Topics: []string{"t1"}, + Subscriber: duckv1.Destination{ + URI: &apis.URL{ + Scheme: "http", + Host: "127.0.0.1", + }, + }, + }, + }, + }, + }, + wantErr: true, + }, + { + name: "valid", + ctx: context.Background(), + given: &ConsumerGroup{ + Spec: ConsumerGroupSpec{ + Replicas: pointer.Int32Ptr(1), + Selector: map[string]string{"app": "app"}, + Template: ConsumerTemplateSpec{ + Spec: ConsumerSpec{ + Topics: []string{"t1"}, + Configs: ConsumerConfigs{ + Configs: map[string]string{ + "group.id": "g1", + }, + }, + Delivery: &DeliverySpec{ + DeliverySpec: eventingduck.DeliverySpec{}, + }, + Subscriber: duckv1.Destination{ + URI: &apis.URL{ + Scheme: "http", + Host: "127.0.0.1", + }, + }, + }, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.given.Validate(tt.ctx); (err != nil) != tt.wantErr { + t.Errorf("want err = %v, got err %v", tt.wantErr, err) + } + }) + } +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go index 2166c56e80..9c7936fbf6 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_types.go @@ -138,3 +138,26 @@ func (c *Consumer) GetUntypedSpec() interface{} { func (c *Consumer) GetStatus() *duckv1.Status { return &c.Status.Status } + +// IsLessThan returns true if c is less than other. +// +// if c is less than other, other might be deleted before c. +func (c *Consumer) IsLessThan(other *Consumer) bool { + // Prefer ready instances. + if c.IsReady() { + return true + } + if other.IsReady() { + return false + } + // Prefer older instances. + return c.CreationTimestamp.Time.Before(other.CreationTimestamp.Time) +} + +func (c *Consumer) IsReady() bool { + return c.Generation == c.Status.ObservedGeneration && + c.GetConditionSet().Manage(c.GetStatus()).IsHappy() +} + +// ConsumerOption is a functional option for Consumer. +type ConsumerOption func(consumer *Consumer) diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_validation.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_validation.go new file mode 100644 index 0000000000..a074c6464b --- /dev/null +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_validation.go @@ -0,0 +1,54 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + "context" + + "knative.dev/pkg/apis" +) + +func (cs *ConsumerSpec) Validate(ctx context.Context) *apis.FieldError { + var err *apis.FieldError + if len(cs.Topics) == 0 { + return apis.ErrMissingField("topics") + } + return err.Also( + cs.Delivery.Validate(ctx).ViaField("delivery"), + cs.Configs.Validate(ctx).ViaField("configs"), + cs.Filters.Validate(ctx).ViaField("filters"), + cs.Subscriber.Validate(ctx).ViaField("subscriber"), + ) +} + +func (d *DeliverySpec) Validate(ctx context.Context) *apis.FieldError { + if d == nil { + return nil + } + return d.DeliverySpec.Validate(ctx).ViaField(apis.CurrentField) +} + +func (cc *ConsumerConfigs) Validate(ctx context.Context) *apis.FieldError { + if v, ok := cc.Configs["group.id"]; !ok || v == "" { + return apis.ErrMissingField("group.id") + } + return nil +} + +func (f *Filters) Validate(ctx context.Context) *apis.FieldError { + return nil +} diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go index cbae815a7a..d8b2bf76c2 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go @@ -27,6 +27,12 @@ import ( // SchemeGroupVersion is group version used to register these objects. var SchemeGroupVersion = schema.GroupVersion{Group: kafkaeventing.GroupName, Version: "v1alpha1"} +var ConsumerGroupGroupVersionKind = schema.GroupVersionKind{ + Group: SchemeGroupVersion.Group, + Version: SchemeGroupVersion.Version, + Kind: "ConsumerGroup", +} + // Kind takes an unqualified kind and returns back a Group qualified GroupKind. func Kind(kind string) schema.GroupKind { return SchemeGroupVersion.WithKind(kind).GroupKind() diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go new file mode 100644 index 0000000000..cfe92c7022 --- /dev/null +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -0,0 +1,136 @@ +/* + * Copyright 2021 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consumergroup + +import ( + "context" + "fmt" + "sort" + + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/reconciler" + + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" + internalv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned/typed/eventing/v1alpha1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/reconciler/eventing/v1alpha1/consumergroup" + kafkainternalslisters "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1" +) + +type Reconciler struct { + ConsumerLister kafkainternalslisters.ConsumerLister + InternalsClient internalv1alpha1.InternalV1alpha1Interface +} + +func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event { + if err := r.reconcileConsumers(ctx, cg); err != nil { + return err + } + cg.MarkReconcileConsumersSucceeded() + + return nil +} + +func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { + + // Get consumers associated with the ConsumerGroup. + existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector)) + if err != nil { + return cg.MarkReconcileConsumersFailed("ListConsumers", err) + } + + // Stable sort consumers so that we give consumers different deletion + // priorities based on their state. + // + // Consumers at the tail of the list are deleted first. + sort.SliceStable(existingConsumers, func(i, j int) bool { + return existingConsumers[i].IsLessThan(existingConsumers[j]) + }) + + nConsumers := int32(len(existingConsumers)) + + for i := int32(0); i < *cg.Spec.Replicas && i < nConsumers; i++ { + if err := r.reconcileConsumer(ctx, cg, existingConsumers[i]); err != nil { + return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err) + } + } + + if nConsumers > *cg.Spec.Replicas { + toBeDeleted := existingConsumers[:nConsumers-*cg.Spec.Replicas] + for _, c := range toBeDeleted { + if err := r.finalizeConsumer(ctx, c); err != nil { + return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err) + } + } + return nil + } + if nConsumers < *cg.Spec.Replicas { + for i := int32(0); i < *cg.Spec.Replicas-nConsumers; i++ { + if err := r.reconcileConsumer(ctx, cg, nil); err != nil { + return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err) + } + } + return nil + } + + return nil +} + +func (r Reconciler) reconcileConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, c *kafkainternals.Consumer) error { + if c == nil { + // Consumer doesn't exist, create it. + c = cg.ConsumerFromTemplate() + + if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Create(ctx, c, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err) + } + return nil + } + if equality.Semantic.DeepDerivative(cg.Spec.Template, c.Spec) { + // Consumer is equal to the template. + return nil + } + // Update existing Consumer. + newC := &kafkainternals.Consumer{ + TypeMeta: c.TypeMeta, + ObjectMeta: c.ObjectMeta, + Spec: cg.Spec.Template.Spec, + Status: c.Status, + } + if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Update(ctx, newC, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update consumer %s/%s: %w", newC.GetNamespace(), newC.GetName(), err) + } + + return nil +} + +func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error { + dOpts := metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{UID: &consumer.UID}, + } + err := r.InternalsClient.Consumers(consumer.GetNamespace()).Delete(ctx, consumer.GetName(), dOpts) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to remove consumer %s/%s: %w", consumer.GetNamespace(), consumer.GetName(), err) + } + return nil +} + +var ( + _ consumergroup.Interface = &Reconciler{} +) From 55f4dd6102d9aa70c2f49490e7fe6721dcb28e9b Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 1 Dec 2021 17:31:26 +0100 Subject: [PATCH 2/3] Run hack/update-codegen.sh Signed-off-by: Pierangelo Di Pilato --- .../kafka/eventing/v1alpha1/zz_generated.deepcopy.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go index 63a25c06fc..479b4e94b2 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/zz_generated.deepcopy.go @@ -147,6 +147,13 @@ func (in *ConsumerGroupSpec) DeepCopyInto(out *ConsumerGroupSpec) { *out = new(int32) **out = **in } + if in.Selector != nil { + in, out := &in.Selector, &out.Selector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } From 22319b5bede6dc2109e788c4664434ba77df5e68 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 1 Dec 2021 17:45:58 +0100 Subject: [PATCH 3/3] Remove placements method (for now) Signed-off-by: Pierangelo Di Pilato --- .../kafka/eventing/v1alpha1/consumer_group_types.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go index 9a27766bb8..085c86660c 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go @@ -19,7 +19,6 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -51,18 +50,6 @@ type ConsumerGroup struct { Status ConsumerGroupStatus `json:"status,omitempty"` } -func (c *ConsumerGroup) GetKey() types.NamespacedName { - return types.NamespacedName{Namespace: c.GetNamespace(), Name: c.GetName()} -} - -func (c *ConsumerGroup) GetVReplicas() int32 { - return *c.Spec.Replicas -} - -func (c *ConsumerGroup) GetPlacements() []eventingduckv1alpha1.Placement { - return c.Status.Placements -} - type ConsumerGroupSpec struct { // Template is the object that describes the consumer that will be created if