Skip to content

Commit

Permalink
Trigger reconciler tests for ConsumerGroup scaling (#1756)
Browse files Browse the repository at this point in the history
* Trigger reconciler tests

Signed-off-by: aavarghese <avarghese@us.ibm.com>

* Review updates for trigger reconciler tests

Signed-off-by: aavarghese <avarghese@us.ibm.com>

* Review updates for trigger reconciler tests

Signed-off-by: aavarghese <avarghese@us.ibm.com>
  • Loading branch information
aavarghese authored Jan 21, 2022
1 parent 6d27780 commit 7d23f5f
Show file tree
Hide file tree
Showing 8 changed files with 718 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *ConsumerGroup) GetUntypedSpec() interface{} {
return c.Spec
}

// GetStatus retrieves the status of the ConsumerGroupt. Implements the KRShaped interface.
// GetStatus retrieves the status of the ConsumerGroup. Implements the KRShaped interface.
func (c *ConsumerGroup) GetStatus() *duckv1.Status {
return &c.Status.Status
}
Expand Down
51 changes: 51 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
v1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

internals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
)

Expand Down Expand Up @@ -81,6 +84,24 @@ func NewConsumerSpec(opts ...ConsumerSpecOption) kafkainternals.ConsumerSpec {
return *spec
}

func NewConsumerSpecDelivery(order internals.DeliveryOrdering) *kafkainternals.DeliverySpec {
return &kafkainternals.DeliverySpec{
Ordering: order,
}
}

func NewConsumerSpecAuth() *kafkainternals.Auth {
return &kafkainternals.Auth{
NetSpec: &v1beta1.KafkaNetSpec{},
}
}

func NewConsumerSpecFilters() *kafkainternals.Filters {
return &kafkainternals.Filters{
Filter: &v1.TriggerFilter{},
}
}

func ConsumerSpec(spec kafkainternals.ConsumerSpec) ConsumerOption {
return func(cg *kafkainternals.Consumer) {
cg.Spec = spec
Expand Down Expand Up @@ -130,3 +151,33 @@ func ConsumerVReplicas(vreplicas int32) ConsumerSpecOption {
c.VReplicas = &vreplicas
}
}

func ConsumerAuth(auth *kafkainternals.Auth) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Auth = auth
}
}

func ConsumerDelivery(delivery *kafkainternals.DeliverySpec) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Delivery = delivery
}
}

func ConsumerSubscriber(dest duckv1.Destination) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Subscriber = dest
}
}

func ConsumerCloudEventOverrides(ce *duckv1.CloudEventOverrides) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.CloudEventOverrides = ce
}
}

func ConsumerFilters(filters *kafkainternals.Filters) ConsumerSpecOption {
return func(c *kafkainternals.ConsumerSpec) {
c.Filters = filters
}
}
51 changes: 51 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package testing

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
Expand Down Expand Up @@ -68,6 +70,45 @@ func NewConsumerGroup(opts ...ConsumerGroupOption) *kafkainternals.ConsumerGroup
return cg
}

func ConsumerGroupReady(cg *kafkainternals.ConsumerGroup) {
cg.Status.Conditions = duckv1.Conditions{
{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
},
}
}

func WithConsumerGroupFailed(reason string, msg string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(kafkainternals.ConditionConsumerGroupConsumers, reason, msg)
}
}

func WithConsumerGroupName(name string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.ObjectMeta.Name = name
}
}

func WithConsumerGroupNamespace(namespace string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.ObjectMeta.Namespace = namespace
}
}

func WithConsumerGroupOwnerRef(ownerref *metav1.OwnerReference) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref}
}
}

func WithConsumerGroupLabels(labels map[string]string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.Template.Labels = labels
}
}

func ConsumerGroupReplicas(replicas int32) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.Replicas = pointer.Int32Ptr(replicas)
Expand All @@ -85,3 +126,13 @@ func ConsumerGroupConsumerSpec(spec kafkainternals.ConsumerSpec) ConsumerGroupOp
cg.Spec.Template.Spec = spec
}
}

func WithDeadLetterSinkURI(uri string) func(cg *kafkainternals.ConsumerGroup) {
return func(cg *kafkainternals.ConsumerGroup) {
u, err := apis.ParseURL(uri)
if err != nil {
panic(err)
}
cg.Status.DeadLetterSinkURI = u
}
}
9 changes: 7 additions & 2 deletions control-plane/pkg/reconciler/testing/objects_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,13 @@ func InitSourceConditions(obj duckv1.KRShaped) {
func StatusSourceSinkResolved(uri string) KRShapedOption {
return func(obj duckv1.KRShaped) {
ks := obj.(*sources.KafkaSource)
ks.Status.SinkURI, _ = apis.ParseURL(uri)
ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(sources.KafkaConditionSinkProvided)
res, _ := apis.ParseURL(uri)
ks.Status.SinkURI = res
if !res.IsEmpty() {
ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(sources.KafkaConditionSinkProvided)
} else {
ks.GetConditionSet().Manage(ks.GetStatus()).MarkUnknown(sources.KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
FinalizerName = "kafka.triggers.eventing.knative.dev"
)

func NewControllerV2(ctx context.Context, configs *config.Env) *controller.Impl {
func NewController(ctx context.Context, configs *config.Env) *controller.Impl {

logger := logging.FromContext(ctx).Desugar()

Expand All @@ -57,7 +57,7 @@ func NewControllerV2(ctx context.Context, configs *config.Env) *controller.Impl
triggerLister := triggerInformer.Lister()
consumerGroupInformer := consumergroupinformer.Get(ctx)

reconciler := &ReconcilerV2{
reconciler := &Reconciler{
BrokerLister: brokerInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/v2/controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
)

func TestNewControllerV2(t *testing.T) {
func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

controller := NewControllerV2(ctx, &config.Env{})
controller := NewController(ctx, &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
}
Expand Down
24 changes: 16 additions & 8 deletions control-plane/pkg/reconciler/trigger/v2/triggerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ const (
deliveryOrderAnnotation = "kafka.eventing.knative.dev/delivery.order"
)

type ReconcilerV2 struct {
type Reconciler struct {
BrokerLister eventinglisters.BrokerLister
EventingClient eventingclientset.Interface
Env *config.Env
ConsumerGroupLister internalslst.ConsumerGroupLister
InternalsClient internalsclient.Interface
}

func (r *ReconcilerV2) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event {
logger := kafkalogging.CreateReconcileMethodLogger(ctx, trigger)

broker, err := r.BrokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
Expand Down Expand Up @@ -83,23 +83,31 @@ func (r *ReconcilerV2) ReconcileKind(ctx context.Context, trigger *eventing.Trig
return nil
}

consgroup, err := r.reconcileConsumerGroup(ctx, trigger)
cg, err := r.reconcileConsumerGroup(ctx, trigger)
if err != nil {
trigger.Status.MarkDependencyFailed("failed to reconcile consumer group", err.Error())
return err
}
trigger.Status.MarkDependencySucceeded()

trigger.Status.SubscriberURI = consgroup.Status.SubscriberURI
if cg.IsReady() {
trigger.Status.MarkDependencySucceeded()
} else {
topLevelCondition := cg.GetConditionSet().Manage(cg.GetStatus()).GetTopLevelCondition()
if topLevelCondition == nil {
trigger.Status.MarkDependencyUnknown("failed to reconcile consumer group", "consumer group not ready")
} else {
trigger.Status.MarkDependencyFailed(topLevelCondition.Reason, topLevelCondition.Message)
}
}
trigger.Status.SubscriberURI = cg.Status.SubscriberURI
trigger.Status.MarkSubscriberResolvedSucceeded()

trigger.Status.DeadLetterSinkURI = consgroup.Status.DeadLetterSinkURI
trigger.Status.DeadLetterSinkURI = cg.Status.DeadLetterSinkURI
trigger.Status.MarkDeadLetterSinkResolvedSucceeded()

return nil
}

func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (*internalscg.ConsumerGroup, error) {
func (r Reconciler) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (*internalscg.ConsumerGroup, error) {

var deliveryOrdering = internals.Unordered
var err error
Expand Down
Loading

0 comments on commit 7d23f5f

Please sign in to comment.