Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger Reconciler for ConsumerGroup scaling #1569

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
)

// +genclient
Expand Down Expand Up @@ -82,6 +85,14 @@ type ConsumerGroupStatus struct {

// inherits PlaceableStatus Status
eventingduckv1alpha1.PlaceableStatus `json:",inline"`

// SubscriberURI is the resolved URI of the receiver for this Trigger.
// +optional
SubscriberURI *apis.URL `json:"subscriberUri,omitempty"`

// DeliveryStatus contains a resolved URL to the dead letter sink address, and any other
// resolved delivery options.
eventingduckv1.DeliveryStatus `json:",inline"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestConsumerGroup_Validate(t *testing.T) {
},
},
Delivery: &DeliverySpec{
DeliverySpec: eventingduck.DeliverySpec{},
DeliverySpec: &eventingduck.DeliverySpec{},
},
Subscriber: duckv1.Destination{
URI: &apis.URL{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
Expand Down Expand Up @@ -75,7 +76,7 @@ type ConsumerSpec struct {
type DeliverySpec struct {
// DeliverySpec is the Knative core delivery spec.
// DeliverySpec contains the delivery options for event senders.
eventingduck.DeliverySpec `json:",inline,omitempty"`
*eventingduck.DeliverySpec `json:",inline,omitempty"`

// Ordering is the ordering of the event delivery.
Ordering eventing.DeliveryOrdering `json:"ordering"`
Expand Down Expand Up @@ -111,6 +112,14 @@ type ConsumerStatus struct {
// * ObservedGeneration - the 'Generation' of the Consumer that was last processed by the controller.
// * Conditions - the latest available observations of a resource's current state.
duckv1.Status

// SubscriberURI is the resolved URI of the receiver for this Trigger.
// +optional
SubscriberURI *apis.URL `json:"subscriberUri,omitempty"`

// DeliveryStatus contains a resolved URL to the dead letter sink address, and any other
// resolved delivery options.
eventingduck.DeliveryStatus `json:",inline"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
// If not specified, will default to allowing all events.
type Filters struct {

// Filter is the filter to apply against all events from the Broker. Only events that pass this
// filter will be sent to the Subscriber. If not specified, will default to allowing all events.
//
// +optional
Filter *eventing.TriggerFilter `json:"filter,omitempty"`

// Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
// API. It's an array of filter expressions that evaluate to true or false.
// If any filter expression in the array evaluates to false, the event MUST
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions control-plane/pkg/reconciler/testing/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (

fakeeventingkafkabrokerclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/client/fake"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"

fakeconsumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
)

const (
Expand All @@ -60,6 +62,7 @@ func NewFactory(env *config.Env, ctor Ctor) Factory {
ctx, sourcesKafkaClient := fakeeventingkafkakafkaclient.With(ctx, listers.GetEventingKafkaObjects()...)
ctx, eventingKafkaClient := fakeeventingkafkabrokerclient.With(ctx, listers.GetEventingKafkaBrokerObjects()...)
ctx, kubeClient := fakekubeclient.With(ctx, listers.GetKubeObjects()...)
ctx, consumerGroupClient := fakeconsumergroupclient.With(ctx, listers.GetConsumerGroupObjects()...)

ctx, dynamicClient := fakedynamicclient.With(ctx,
newScheme(),
Expand Down Expand Up @@ -95,6 +98,7 @@ func NewFactory(env *config.Env, ctor Ctor) Factory {
eventingClient.PrependReactor("*", "*", reactor)
eventingKafkaClient.PrependReactor("*", "*", reactor)
sourcesKafkaClient.PrependReactor("*", "*", reactor)
consumerGroupClient.PrependReactor("*", "*", reactor)
}

actionRecorderList := ActionRecorderList{
Expand All @@ -103,6 +107,7 @@ func NewFactory(env *config.Env, ctor Ctor) Factory {
eventingClient,
eventingKafkaClient,
sourcesKafkaClient,
consumerGroupClient,
}

eventList := EventList{
Expand Down
20 changes: 20 additions & 0 deletions control-plane/pkg/reconciler/testing/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
"knative.dev/pkg/reconciler/testing"

eventingkafkabrokerconsumer "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
eventingkafkachannels "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
eventingkafkasources "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
fakeeventingkafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned/fake"
Expand All @@ -36,6 +37,8 @@ import (

eventingkafkabroker "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
fakeeventingkafkabrokerclientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/fake"
fakeeventingkafkaconsumerclientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned/fake"
consumerlisters "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1"
eventingkafkabrokerlisters "knative.dev/eventing-kafka-broker/control-plane/pkg/client/listers/eventing/v1alpha1"
)

Expand All @@ -45,6 +48,7 @@ var clientSetSchemes = []func(*runtime.Scheme) error{
fakeapiextensionsclientset.AddToScheme,
fakeeventingkafkabrokerclientset.AddToScheme,
fakeeventingkafkaclientset.AddToScheme,
fakeeventingkafkaconsumerclientset.AddToScheme,
}

type Listers struct {
Expand Down Expand Up @@ -95,6 +99,14 @@ func (l *Listers) GetEventingKafkaObjects() []runtime.Object {
return l.sorter.ObjectsForSchemeFunc(fakeeventingkafkaclientset.AddToScheme)
}

func (l *Listers) GetConsumerGroupObjects() []runtime.Object {
return l.sorter.ObjectsForSchemeFunc(fakeeventingkafkaconsumerclientset.AddToScheme)
}

func (l *Listers) GetConsumerObjects() []runtime.Object {
return l.sorter.ObjectsForSchemeFunc(fakeeventingkafkaconsumerclientset.AddToScheme)
}

func (l *Listers) GetBrokerLister() eventinglisters.BrokerLister {
return eventinglisters.NewBrokerLister(l.indexerFor(&eventing.Broker{}))
}
Expand Down Expand Up @@ -127,6 +139,14 @@ func (l *Listers) GetKafkaChannelLister() eventingkafkachannelslisters.KafkaChan
return eventingkafkachannelslisters.NewKafkaChannelLister(l.indexerFor(&eventingkafkachannels.KafkaChannel{}))
}

func (l *Listers) GetConsumerGroupLister() consumerlisters.ConsumerGroupLister {
return consumerlisters.NewConsumerGroupLister(l.indexerFor(&eventingkafkabrokerconsumer.ConsumerGroup{}))
}

func (l *Listers) GetConsumerLister() consumerlisters.ConsumerLister {
return consumerlisters.NewConsumerLister(l.indexerFor(&eventingkafkabrokerconsumer.Consumer{}))
}

func (l *Listers) indexerFor(obj runtime.Object) cache.Indexer {
return l.sorter.IndexerForObjectType(obj)
}
21 changes: 21 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

Expand All @@ -37,6 +38,8 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"

"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"

internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
)

const (
Expand Down Expand Up @@ -297,3 +300,21 @@ func allocateStatusAnnotations(obj duckv1.KRShaped) {
obj.GetStatus().Annotations = make(map[string]string, 1)
}
}

func NewConsumerGroup() *internalscg.ConsumerGroup {
return &internalscg.ConsumerGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: internalscg.ConsumerGroupGroupVersionKind.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: TriggerUUID,
Namespace: ServiceNamespace,
},
Spec: internalscg.ConsumerGroupSpec{
Template: internalscg.ConsumerTemplateSpec{
Spec: internalscg.ConsumerSpec{},
},
Replicas: pointer.Int32Ptr(1),
},
}
}
11 changes: 5 additions & 6 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"

internals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
kafkalogging "knative.dev/eventing-kafka-broker/control-plane/pkg/logging"
Expand All @@ -43,8 +44,6 @@ import (

const (
deliveryOrderAnnotation = "kafka.eventing.knative.dev/delivery.order"
deliveryOrderOrdered = "ordered"
deliveryOrderUnordered = "unordered"
)

type Reconciler struct {
Expand Down Expand Up @@ -101,7 +100,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge

logger.Debug("broker deleted", zap.String("finalizeDuringReconcile", "deleted"))

// The associated broker doesn't exist anymore, so clean up Trigger resources.
// The associated broker doesn't exist anymore, so clean up Trigger resources and owning consumer group resource.
return r.FinalizeKind(ctx, trigger)
}

Expand Down Expand Up @@ -326,11 +325,11 @@ func isKnativeKafkaBroker(broker *eventing.Broker) (bool, string) {

func deliveryOrderFromString(val string) (contract.DeliveryOrder, error) {
switch strings.ToLower(val) {
case deliveryOrderOrdered:
case string(internals.Ordered):
return contract.DeliveryOrder_ORDERED, nil
case deliveryOrderUnordered:
case string(internals.Unordered):
return contract.DeliveryOrder_UNORDERED, nil
default:
return contract.DeliveryOrder_UNORDERED, fmt.Errorf("invalid annotation %s value: %s. Allowed values [ %q | %q ]", deliveryOrderAnnotation, val, deliveryOrderOrdered, deliveryOrderUnordered)
return contract.DeliveryOrder_UNORDERED, fmt.Errorf("invalid annotation %s value: %s. Allowed values [ %q | %q ]", deliveryOrderAnnotation, val, internals.Ordered, internals.Unordered)
}
}
12 changes: 6 additions & 6 deletions control-plane/pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"google.golang.org/protobuf/testing/protocmp"
"k8s.io/utils/pointer"
internals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -38,7 +39,6 @@ import (
clientgotesting "k8s.io/client-go/testing"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -305,7 +305,7 @@ func triggerReconciliation(t *testing.T, format string, env config.Env) {
NewBroker(
BrokerReady,
),
newTrigger(reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderOrdered)),
newTrigger(reconcilertesting.WithAnnotation(deliveryOrderAnnotation, string(internals.Ordered))),
NewService(),
NewConfigMapFromContract(&contract.Contract{
Resources: []*contract.Resource{
Expand Down Expand Up @@ -357,7 +357,7 @@ func triggerReconciliation(t *testing.T, format string, env config.Env) {
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_ORDERED),
reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderOrdered),
reconcilertesting.WithAnnotation(deliveryOrderAnnotation, string(internals.Ordered)),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
Expand All @@ -369,7 +369,7 @@ func triggerReconciliation(t *testing.T, format string, env config.Env) {
NewBroker(
BrokerReady,
),
newTrigger(reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderUnordered)),
newTrigger(reconcilertesting.WithAnnotation(deliveryOrderAnnotation, string(internals.Unordered))),
NewService(),
NewConfigMapFromContract(&contract.Contract{
Resources: []*contract.Resource{
Expand Down Expand Up @@ -421,7 +421,7 @@ func triggerReconciliation(t *testing.T, format string, env config.Env) {
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderUnordered),
reconcilertesting.WithAnnotation(deliveryOrderAnnotation, string(internals.Unordered)),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
Expand Down Expand Up @@ -2301,7 +2301,7 @@ func useTable(t *testing.T, table TableTest, env *config.Env) {
return triggerreconciler.NewReconciler(
ctx,
logger,
fakeeventingclient.Get(ctx),
eventingclient.Get(ctx),
listers.GetTriggerLister(),
controller.GetEventRecorder(ctx),
reconciler,
Expand Down
Loading