Skip to content

Commit

Permalink
ConsumerGroup reconciler
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi committed Dec 1, 2021
1 parent e956d93 commit acca8f1
Show file tree
Hide file tree
Showing 12 changed files with 718 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1alpha1

import (
"context"

"knative.dev/pkg/apis"
)

var (
_ apis.Defaultable = &Consumer{}
)

// SetDefaults implements apis.Defaultable.
func (c *Consumer) SetDefaults(ctx context.Context) {
ctx = apis.WithinParent(ctx, c.ObjectMeta)
c.Spec.SetDefaults(ctx)
}

func (c *ConsumerSpec) SetDefaults(ctx context.Context) {
c.Delivery.SetDefaults(ctx)
c.Subscriber.SetDefaults(ctx)
}

func (d *DeliverySpec) SetDefaults(ctx context.Context) {
if d == nil {
return
}
d.DeliverySpec.SetDefaults(ctx)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1alpha1

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
)

func TestConsumerSetDefaults(t *testing.T) {
tests := []struct {
name string
ctx context.Context
given *Consumer
want *Consumer
}{
{
name: "with delivery",
ctx: context.Background(),
given: &Consumer{
Spec: ConsumerSpec{Delivery: &DeliverySpec{}},
},
want: &Consumer{
Spec: ConsumerSpec{Delivery: &DeliverySpec{}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.given.SetDefaults(tt.ctx)
if diff := cmp.Diff(tt.want, tt.given); diff != "" {
t.Error("(-want, +got)", diff)
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v1alpha1

import (
"context"

"k8s.io/utils/pointer"
"knative.dev/pkg/apis"
)

var (
_ apis.Defaultable = &ConsumerGroup{}
)

// SetDefaults implements apis.Defaultable.
func (cg *ConsumerGroup) SetDefaults(ctx context.Context) {
ctx = apis.WithinParent(ctx, cg.ObjectMeta)

// Replicas is the number of Consumers for this ConsumerGroup.
// When unset, set it to 1.
if cg.Spec.Replicas == nil {
cg.Spec.Replicas = pointer.Int32Ptr(1)
}
// Selector is a label query over consumers that should match the Replicas count.
// If Selector is empty, it is defaulted to the labels present on the template.
if cg.Spec.Selector == nil || len(cg.Spec.Selector) == 0 {
cg.Spec.Selector = cg.Spec.Template.Labels
}

// Force template namespace to be set to ConsumerGroup's namespace.
cg.Spec.Template.Namespace = cg.Namespace
cg.Spec.Template.Spec.SetDefaults(ctx)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package v1alpha1

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
)

func TestConsumerGroupSetDefaults(t *testing.T) {
tests := []struct {
name string
ctx context.Context
given *ConsumerGroup
want *ConsumerGroup
}{
{
name: "default replicas",
ctx: context.Background(),
given: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "name",
},
Spec: ConsumerGroupSpec{
Template: ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
},
},
},
},
want: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "name",
},
Spec: ConsumerGroupSpec{
Template: ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
},
},
Replicas: pointer.Int32Ptr(1),
},
},
},
{
name: "default selector",
ctx: context.Background(),
given: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "name",
},
Spec: ConsumerGroupSpec{
Template: ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Labels: map[string]string{"app": "app"},
},
},
Replicas: pointer.Int32Ptr(1),
},
},
want: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "name",
},
Spec: ConsumerGroupSpec{
Template: ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Labels: map[string]string{"app": "app"},
},
},
Replicas: pointer.Int32Ptr(1),
Selector: map[string]string{"app": "app"},
},
},
},
{
name: "default namespace",
ctx: context.Background(),
given: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "name",
},
Spec: ConsumerGroupSpec{
Replicas: pointer.Int32Ptr(1),
},
},
want: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "name",
},
Spec: ConsumerGroupSpec{
Template: ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
},
},
Replicas: pointer.Int32Ptr(1),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.given.SetDefaults(tt.ctx)

if diff := cmp.Diff(tt.want, tt.given); diff != "" {
t.Error("(-want, +got)", diff)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,31 @@
package v1alpha1

import (
"fmt"

"knative.dev/pkg/apis"
)

const (
ConditionConsumers apis.ConditionType = "Consumers"
)

var (
conditionSet = apis.NewLivingConditionSet(
ConditionConsumers,
)
)

func (c *ConsumerGroup) GetConditionSet() apis.ConditionSet {
return apis.NewLivingConditionSet()
return conditionSet
}

func (cg *ConsumerGroup) MarkReconcileConsumersFailed(reason string, err error) error {
err = fmt.Errorf("failed to reconcile consumers: %w", err)
cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumers, reason, err.Error())
return err
}

func (cg *ConsumerGroup) MarkReconcileConsumersSucceeded() {
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumers)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
duckv1 "knative.dev/pkg/apis/duck/v1"

"k8s.io/apimachinery/pkg/types"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

// +genclient
Expand Down Expand Up @@ -51,6 +51,18 @@ type ConsumerGroup struct {
Status ConsumerGroupStatus `json:"status,omitempty"`
}

func (c *ConsumerGroup) GetKey() types.NamespacedName {
return types.NamespacedName{Namespace: c.GetNamespace(), Name: c.GetName()}
}

func (c *ConsumerGroup) GetVReplicas() int32 {
return *c.Spec.Replicas
}

func (c *ConsumerGroup) GetPlacements() []eventingduckv1alpha1.Placement {
return c.Status.Placements
}

type ConsumerGroupSpec struct {

// Template is the object that describes the consumer that will be created if
Expand All @@ -65,6 +77,14 @@ type ConsumerGroupSpec struct {
// If unspecified, defaults to 1.
// +optional
Replicas *int32 `json:"replicas,omitempty"`

// Selector is a label query over consumers that should match the Replicas count.
// If Selector is empty, it is defaulted to the labels present on the template.
// Label keys and values that must match in order to be controlled by this
// controller, if empty defaulted to labels on template.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
// +optional
Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"`
}

type ConsumerGroupStatus struct {
Expand Down Expand Up @@ -102,3 +122,20 @@ func (c *ConsumerGroup) GetUntypedSpec() interface{} {
func (c *ConsumerGroup) GetStatus() *duckv1.Status {
return &c.Status.Status
}

// ConsumerFromTemplate returns a Consumer from the Consumer template in the ConsumerGroup spec.
func (cg *ConsumerGroup) ConsumerFromTemplate(options ...ConsumerOption) *Consumer {
// TODO figure out naming strategy, is generateName enough?
c := &Consumer{
ObjectMeta: cg.Spec.Template.ObjectMeta,
Spec: cg.Spec.Template.Spec,
}

ownerRef := metav1.NewControllerRef(cg, ConsumerGroupGroupVersionKind)
c.OwnerReferences = append(c.OwnerReferences, *ownerRef)

for _, opt := range options {
opt(c)
}
return c
}
Loading

0 comments on commit acca8f1

Please sign in to comment.