diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/filter_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/filter_types.go index 92f9d5cefe..3ddcc10c36 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/filter_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/filter_types.go @@ -27,6 +27,12 @@ import ( // If not specified, will default to allowing all events. type Filters struct { + // Filter is the filter to apply against all events from the Broker. Only events that pass this + // filter will be sent to the Subscriber. If not specified, will default to allowing all events. + // + // +optional + Filter *eventing.TriggerFilter `json:"filter,omitempty"` + // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions // API. It's an array of filter expressions that evaluate to true or false. // If any filter expression in the array evaluates to false, the event MUST diff --git a/control-plane/pkg/reconciler/testing/objects_common.go b/control-plane/pkg/reconciler/testing/objects_common.go index eeafa5f5ca..9ed7e3793a 100644 --- a/control-plane/pkg/reconciler/testing/objects_common.go +++ b/control-plane/pkg/reconciler/testing/objects_common.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" clientgotesting "k8s.io/client-go/testing" + "k8s.io/utils/pointer" reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -301,7 +302,6 @@ func allocateStatusAnnotations(obj duckv1.KRShaped) { } func NewConsumerGroup() *internalscg.ConsumerGroup { - replicas := int32(1) return &internalscg.ConsumerGroup{ TypeMeta: metav1.TypeMeta{ APIVersion: internalscg.ConsumerGroupGroupVersionKind.String(), @@ -314,7 +314,7 @@ func NewConsumerGroup() *internalscg.ConsumerGroup { Template: internalscg.ConsumerTemplateSpec{ Spec: internalscg.ConsumerSpec{}, }, - Replicas: &replicas, + Replicas: pointer.Int32Ptr(1), }, } } diff --git a/control-plane/pkg/reconciler/trigger/controllerv2.go b/control-plane/pkg/reconciler/trigger/controllerv2.go index 1b6ddc55af..49271d3a7e 100644 --- a/control-plane/pkg/reconciler/trigger/controllerv2.go +++ b/control-plane/pkg/reconciler/trigger/controllerv2.go @@ -19,9 +19,9 @@ package trigger import ( "context" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" - configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" - "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -29,19 +29,20 @@ import ( brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" + eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" + internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" consumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client" consumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup" ) -func NewControllerV2(ctx context.Context, _ configmap.Watcher, configs *config.Env) *controller.Impl { +func NewControllerV2(ctx context.Context, configs *config.Env) *controller.Impl { logger := logging.FromContext(ctx).Desugar() - configmapInformer := configmapinformer.Get(ctx) brokerInformer := brokerinformer.Get(ctx) triggerInformer := triggerinformer.Get(ctx) triggerLister := triggerInformer.Lister() @@ -74,17 +75,33 @@ func NewControllerV2(ctx context.Context, _ configmap.Watcher, configs *config.E Handler: enqueueTriggers(logger, triggerLister, impl.Enqueue), }) - globalResync := func(_ interface{}) { - impl.GlobalResync(triggerInformer.Informer()) - } - - configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.DataPlaneConfigMapName), - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: globalResync, - DeleteFunc: globalResync, - }, + // ConsumerGroup changes and enqueue associated Trigger + consumerGroupInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + Handler: enqueueTrigger(logger, triggerLister, impl.Enqueue), }) return impl } + +func enqueueTrigger( + logger *zap.Logger, + lister eventinglisters.TriggerLister, + enqueue func(obj interface{})) cache.ResourceEventHandler { + + return controller.HandleAll(func(obj interface{}) { + + if cg, ok := obj.(*internalscg.ConsumerGroup); ok { + triggers, err := lister.Triggers(cg.Namespace).List(labels.Everything()) + if err != nil { + logger.Warn("Failed to list triggers", zap.Error(err)) + return + } + + for _, trigger := range triggers { + if cg.Name == string(trigger.UID) { // + enqueue(trigger) + } + } + } + }) +} diff --git a/control-plane/pkg/reconciler/trigger/controllerv2_test.go b/control-plane/pkg/reconciler/trigger/controllerv2_test.go index aee2464936..336375cf58 100644 --- a/control-plane/pkg/reconciler/trigger/controllerv2_test.go +++ b/control-plane/pkg/reconciler/trigger/controllerv2_test.go @@ -19,8 +19,6 @@ package trigger import ( "testing" - "knative.dev/pkg/configmap" - reconcilertesting "knative.dev/pkg/reconciler/testing" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup/fake" @@ -37,7 +35,7 @@ import ( func TestNewControllerV2(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) - controller := NewControllerV2(ctx, configmap.NewStaticWatcher(), &config.Env{}) + controller := NewControllerV2(ctx, &config.Env{}) if controller == nil { t.Error("failed to create controller: ") } diff --git a/control-plane/pkg/reconciler/trigger/triggerv2.go b/control-plane/pkg/reconciler/trigger/triggerv2.go index a5cc6528bd..16938e765b 100644 --- a/control-plane/pkg/reconciler/trigger/triggerv2.go +++ b/control-plane/pkg/reconciler/trigger/triggerv2.go @@ -19,6 +19,7 @@ package trigger import ( "context" "fmt" + "strings" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/equality" @@ -100,12 +101,20 @@ func (r *ReconcilerV2) reconcileKind(ctx context.Context, trigger *eventing.Trig return nil } -func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (consgroup *internalscg.ConsumerGroup, err error) { +func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (*internalscg.ConsumerGroup, error) { + var deliveryOrdering = internals.Unordered + var err error + deliveryOrderingAnnotationValue, ok := trigger.Annotations[deliveryOrderAnnotation] + if ok { + deliveryOrdering, err = deliveryOrderingFromString(deliveryOrderingAnnotationValue) + if err != nil { + return nil, err + } + } + + //todo additionally handle trigger.Spec.Filter newcg := &internalscg.ConsumerGroup{ - TypeMeta: metav1.TypeMeta{ - APIVersion: internalscg.ConsumerGroupGroupVersionKind.String(), - }, ObjectMeta: metav1.ObjectMeta{ Name: string(trigger.UID), Namespace: trigger.Namespace, @@ -116,13 +125,14 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event Spec: internalscg.ConsumerGroupSpec{ Template: internalscg.ConsumerTemplateSpec{ Spec: internalscg.ConsumerSpec{ - //Topics: []string{}, //todo from contract resource - //Configs: internalscg.ConsumerConfigs{Configs: map[string]string{"group.id": ""}}, //todo + Topics: []string{}, //todo from contract resource + Configs: internalscg.ConsumerConfigs{Configs: map[string]string{"group.id": string(trigger.UID)}}, Delivery: &internalscg.DeliverySpec{ Delivery: trigger.Spec.Delivery, - Ordering: internals.Ordered, + Ordering: deliveryOrdering, }, Filters: &internalscg.Filters{ + Filter: trigger.Spec.Filter, Filters: trigger.Spec.Filters, }, Subscriber: trigger.Spec.Subscriber, @@ -131,6 +141,8 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event }, } + //todo validate newcg + cg, err := r.ConsumerGroupLister.ConsumerGroups(trigger.GetNamespace()).Get(string(trigger.UID)) //Get by consumer group name if err != nil && !apierrors.IsNotFound(err) { return nil, err @@ -138,10 +150,12 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event if apierrors.IsNotFound(err) { cg, err := r.InternalsClient.InternalV1alpha1().ConsumerGroups(newcg.GetNamespace()).Create(ctx, newcg, metav1.CreateOptions{}) - if err != nil { - return nil, err + if err != nil && !apierrors.IsAlreadyExists(err) { + return nil, fmt.Errorf("failed to create consumer group %s/%s: %w", newcg.GetNamespace(), newcg.GetName(), err) + } + if apierrors.IsAlreadyExists(err) { + return newcg, nil } - fmt.Printf("created consumer group%s/%s", newcg.GetNamespace(), newcg.GetName()) return cg, nil } @@ -161,3 +175,14 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event return cg, nil } + +func deliveryOrderingFromString(val string) (internals.DeliveryOrdering, error) { + switch strings.ToLower(val) { + case string(internals.Ordered): + return internals.Ordered, nil + case string(internals.Unordered): + return internals.Unordered, nil + default: + return internals.Unordered, fmt.Errorf("invalid annotation %s value: %s. Allowed values [ %q | %q ]", deliveryOrderAnnotation, val, internals.Ordered, internals.Unordered) + } +}