Skip to content

Commit

Permalink
Trigger updates
Browse files Browse the repository at this point in the history
Signed-off-by: aavarghese <avarghese@us.ibm.com>
  • Loading branch information
aavarghese committed Dec 9, 2021
1 parent 07af0c9 commit d9301fe
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
// If not specified, will default to allowing all events.
type Filters struct {

// Filter is the filter to apply against all events from the Broker. Only events that pass this
// filter will be sent to the Subscriber. If not specified, will default to allowing all events.
//
// +optional
Filter *eventing.TriggerFilter `json:"filter,omitempty"`

// Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
// API. It's an array of filter expressions that evaluate to true or false.
// If any filter expression in the array evaluates to false, the event MUST
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/testing/objects_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

Expand Down Expand Up @@ -301,7 +302,6 @@ func allocateStatusAnnotations(obj duckv1.KRShaped) {
}

func NewConsumerGroup() *internalscg.ConsumerGroup {
replicas := int32(1)
return &internalscg.ConsumerGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: internalscg.ConsumerGroupGroupVersionKind.String(),
Expand All @@ -314,7 +314,7 @@ func NewConsumerGroup() *internalscg.ConsumerGroup {
Template: internalscg.ConsumerTemplateSpec{
Spec: internalscg.ConsumerSpec{},
},
Replicas: &replicas,
Replicas: pointer.Int32Ptr(1),
},
}
}
45 changes: 31 additions & 14 deletions control-plane/pkg/reconciler/trigger/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,30 @@ package trigger
import (
"context"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka"

internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
consumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client"
consumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup"
)

func NewControllerV2(ctx context.Context, _ configmap.Watcher, configs *config.Env) *controller.Impl {
func NewControllerV2(ctx context.Context, configs *config.Env) *controller.Impl {

logger := logging.FromContext(ctx).Desugar()

configmapInformer := configmapinformer.Get(ctx)
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
Expand Down Expand Up @@ -74,17 +75,33 @@ func NewControllerV2(ctx context.Context, _ configmap.Watcher, configs *config.E
Handler: enqueueTriggers(logger, triggerLister, impl.Enqueue),
})

globalResync := func(_ interface{}) {
impl.GlobalResync(triggerInformer.Informer())
}

configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.DataPlaneConfigMapName),
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: globalResync,
DeleteFunc: globalResync,
},
// ConsumerGroup changes and enqueue associated Trigger
consumerGroupInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
Handler: enqueueTrigger(logger, triggerLister, impl.Enqueue),
})

return impl
}

func enqueueTrigger(
logger *zap.Logger,
lister eventinglisters.TriggerLister,
enqueue func(obj interface{})) cache.ResourceEventHandler {

return controller.HandleAll(func(obj interface{}) {

if cg, ok := obj.(*internalscg.ConsumerGroup); ok {
triggers, err := lister.Triggers(cg.Namespace).List(labels.Everything())
if err != nil {
logger.Warn("Failed to list triggers", zap.Error(err))
return
}

for _, trigger := range triggers {
if cg.Name == string(trigger.UID) { //
enqueue(trigger)
}
}
}
})
}
4 changes: 1 addition & 3 deletions control-plane/pkg/reconciler/trigger/controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package trigger
import (
"testing"

"knative.dev/pkg/configmap"

reconcilertesting "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup/fake"
Expand All @@ -37,7 +35,7 @@ import (
func TestNewControllerV2(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

controller := NewControllerV2(ctx, configmap.NewStaticWatcher(), &config.Env{})
controller := NewControllerV2(ctx, &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
}
Expand Down
45 changes: 35 additions & 10 deletions control-plane/pkg/reconciler/trigger/triggerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package trigger
import (
"context"
"fmt"
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -100,12 +101,20 @@ func (r *ReconcilerV2) reconcileKind(ctx context.Context, trigger *eventing.Trig
return nil
}

func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (consgroup *internalscg.ConsumerGroup, err error) {
func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *eventing.Trigger) (*internalscg.ConsumerGroup, error) {

var deliveryOrdering = internals.Unordered
var err error
deliveryOrderingAnnotationValue, ok := trigger.Annotations[deliveryOrderAnnotation]
if ok {
deliveryOrdering, err = deliveryOrderingFromString(deliveryOrderingAnnotationValue)
if err != nil {
return nil, err
}
}

//todo additionally handle trigger.Spec.Filter
newcg := &internalscg.ConsumerGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: internalscg.ConsumerGroupGroupVersionKind.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: string(trigger.UID),
Namespace: trigger.Namespace,
Expand All @@ -116,13 +125,14 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event
Spec: internalscg.ConsumerGroupSpec{
Template: internalscg.ConsumerTemplateSpec{
Spec: internalscg.ConsumerSpec{
//Topics: []string{}, //todo from contract resource
//Configs: internalscg.ConsumerConfigs{Configs: map[string]string{"group.id": ""}}, //todo
Topics: []string{}, //todo from contract resource
Configs: internalscg.ConsumerConfigs{Configs: map[string]string{"group.id": string(trigger.UID)}},
Delivery: &internalscg.DeliverySpec{
Delivery: trigger.Spec.Delivery,
Ordering: internals.Ordered,
Ordering: deliveryOrdering,
},
Filters: &internalscg.Filters{
Filter: trigger.Spec.Filter,
Filters: trigger.Spec.Filters,
},
Subscriber: trigger.Spec.Subscriber,
Expand All @@ -131,17 +141,21 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event
},
}

//todo validate newcg

cg, err := r.ConsumerGroupLister.ConsumerGroups(trigger.GetNamespace()).Get(string(trigger.UID)) //Get by consumer group name
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}

if apierrors.IsNotFound(err) {
cg, err := r.InternalsClient.InternalV1alpha1().ConsumerGroups(newcg.GetNamespace()).Create(ctx, newcg, metav1.CreateOptions{})
if err != nil {
return nil, err
if err != nil && !apierrors.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to create consumer group %s/%s: %w", newcg.GetNamespace(), newcg.GetName(), err)
}
if apierrors.IsAlreadyExists(err) {
return newcg, nil
}
fmt.Printf("created consumer group%s/%s", newcg.GetNamespace(), newcg.GetName())
return cg, nil
}

Expand All @@ -161,3 +175,14 @@ func (r ReconcilerV2) reconcileConsumerGroup(ctx context.Context, trigger *event

return cg, nil
}

func deliveryOrderingFromString(val string) (internals.DeliveryOrdering, error) {
switch strings.ToLower(val) {
case string(internals.Ordered):
return internals.Ordered, nil
case string(internals.Unordered):
return internals.Unordered, nil
default:
return internals.Unordered, fmt.Errorf("invalid annotation %s value: %s. Allowed values [ %q | %q ]", deliveryOrderAnnotation, val, internals.Ordered, internals.Unordered)
}
}

0 comments on commit d9301fe

Please sign in to comment.