Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger reconciler tests for ConsumerGroup scaling #1756

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

internals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
)

Expand Down Expand Up @@ -81,6 +84,24 @@ func NewConsumerSpec(opts ...ConsumerSpecOption) kafkainternals.ConsumerSpec {
return *spec
}

func NewConsumerSpecDelivery(order internals.DeliveryOrdering) *kafkainternals.DeliverySpec {
return &kafkainternals.DeliverySpec{
Ordering: order,
}
}

func NewConsumerSpecAuth() *kafkainternals.Auth {
return &kafkainternals.Auth{
NetSpec: &v1beta1.KafkaNetSpec{},
}
}

func NewConsumerSpecFilters() *kafkainternals.Filters {
return &kafkainternals.Filters{
Filter: &v1.TriggerFilter{},
}
}

func ConsumerSpec(spec kafkainternals.ConsumerSpec) ConsumerOption {
return func(cg *kafkainternals.Consumer) {
cg.Spec = spec
Expand Down Expand Up @@ -130,3 +151,33 @@ func ConsumerVReplicas(vreplicas int32) ConsumerSpecOption {
c.VReplicas = &vreplicas
}
}

func ConsumerAuth(auth *kafkainternals.Auth) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Auth = auth
}
}

func ConsumerDelivery(delivery *kafkainternals.DeliverySpec) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Delivery = delivery
}
}

func ConsumerSubscriber(dest duckv1.Destination) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Subscriber = dest
}
}

func ConsumerCloudEventOverrides(ce *duckv1.CloudEventOverrides) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.CloudEventOverrides = ce
}
}

func ConsumerFilters(filters *kafkainternals.Filters) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Filters = filters
}
}
45 changes: 45 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package testing

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
Expand Down Expand Up @@ -68,6 +70,39 @@ func NewConsumerGroup(opts ...ConsumerGroupOption) *kafkainternals.ConsumerGroup
return cg
}

func ConsumerGroupReady(cg *kafkainternals.ConsumerGroup) {
cg.Status.Conditions = duckv1.Conditions{
{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
},
}
}

func WithConsumerGroupName(name string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.ObjectMeta.Name = name
}
}

func WithConsumerGroupNamespace(namespace string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.ObjectMeta.Namespace = namespace
}
}

func WithConsumerGroupOwnerRef(ownerref *metav1.OwnerReference) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref}
}
}

func WithConsumerGroupLabels(labels map[string]string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.Template.Labels = labels
}
}

func ConsumerGroupReplicas(replicas int32) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.Replicas = pointer.Int32Ptr(replicas)
Expand All @@ -85,3 +120,13 @@ func ConsumerGroupConsumerSpec(spec kafkainternals.ConsumerSpec) ConsumerGroupOp
cg.Spec.Template.Spec = spec
}
}

func WithDeadLetterSinkURI(uri string) func(cg *kafkainternals.ConsumerGroup) {
return func(cg *kafkainternals.ConsumerGroup) {
u, err := apis.ParseURL(uri)
if err != nil {
panic(err)
}
cg.Status.DeadLetterSinkURI = u
}
}
9 changes: 7 additions & 2 deletions control-plane/pkg/reconciler/testing/objects_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,13 @@ func InitSourceConditions(obj duckv1.KRShaped) {
func StatusSourceSinkResolved(uri string) KRShapedOption {
return func(obj duckv1.KRShaped) {
ks := obj.(*sources.KafkaSource)
ks.Status.SinkURI, _ = apis.ParseURL(uri)
ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(sources.KafkaConditionSinkProvided)
res, _ := apis.ParseURL(uri)
ks.Status.SinkURI = res
if !res.IsEmpty() {
ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(sources.KafkaConditionSinkProvided)
} else {
ks.GetConditionSet().Manage(ks.GetStatus()).MarkUnknown(sources.KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
FinalizerName = "kafka.triggers.eventing.knative.dev"
)

func NewControllerV2(ctx context.Context, configs *config.Env) *controller.Impl {
func NewController(ctx context.Context, configs *config.Env) *controller.Impl {

logger := logging.FromContext(ctx).Desugar()

Expand All @@ -57,7 +57,7 @@ func NewControllerV2(ctx context.Context, configs *config.Env) *controller.Impl
triggerLister := triggerInformer.Lister()
consumerGroupInformer := consumergroupinformer.Get(ctx)

reconciler := &ReconcilerV2{
reconciler := &Reconciler{
BrokerLister: brokerInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/v2/controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
)

func TestNewControllerV2(t *testing.T) {
func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

controller := NewControllerV2(ctx, &config.Env{})
controller := NewController(ctx, &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
}
Expand Down
26 changes: 17 additions & 9 deletions control-plane/pkg/reconciler/trigger/v2/triggerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ const (
deliveryOrderAnnotation = "kafka.eventing.knative.dev/delivery.order"
)

type ReconcilerV2 struct {
type Reconciler struct {
BrokerLister eventinglisters.BrokerLister
EventingClient eventingclientset.Interface
Env *config.Env
ConsumerGroupLister internalslst.ConsumerGroupLister
InternalsClient internalsclient.Interface
}

func (r *ReconcilerV2) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event {
logger := kafkalogging.CreateReconcileMethodLogger(ctx, trigger)

broker, err := r.BrokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
Expand Down Expand Up @@ -83,25 +83,33 @@ func (r *ReconcilerV2) ReconcileKind(ctx context.Context, trigger *eventing.Trig
return nil
}

consgroup, err := r.reconcileConsumerGroup(ctx, trigger)
cg, err := r.reconcileConsumerGroup(ctx, trigger)
if err != nil {
trigger.Status.MarkDependencyFailed("failed to reconcile consumer group", err.Error())
return err
}
trigger.Status.MarkDependencySucceeded()

trigger.Status.SubscriberURI = consgroup.Status.SubscriberURI
if cg.IsReady() {
trigger.Status.MarkDependencySucceeded()
} else {
topLevelCondition := cg.GetConditionSet().Manage(cg.GetStatus()).GetTopLevelCondition()
if topLevelCondition == nil {
trigger.Status.MarkDependencyUnknown("failed to reconcile consumer group", "consumer group not ready")
} else {
trigger.Status.MarkDependencyFailed(topLevelCondition.Reason, topLevelCondition.Message)
aavarghese marked this conversation as resolved.
Show resolved Hide resolved
}
}
trigger.Status.SubscriberURI = cg.Status.SubscriberURI
trigger.Status.MarkSubscriberResolvedSucceeded()

trigger.Status.DeadLetterSinkURI = consgroup.Status.DeadLetterSinkURI
trigger.Status.DeadLetterSinkURI = cg.Status.DeadLetterSinkURI
trigger.Status.MarkDeadLetterSinkResolvedSucceeded()

return nil
}

func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (*internalscg.ConsumerGroup, error) {
func (r Reconciler) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (*internalscg.ConsumerGroup, error) {

var deliveryOrdering = internals.Unordered
var deliveryOrdering = internals.Ordered
aavarghese marked this conversation as resolved.
Show resolved Hide resolved
var err error
deliveryOrderingAnnotationValue, ok := trigger.Annotations[deliveryOrderAnnotation]
if ok {
Expand Down
Loading