Skip to content

Commit

Permalink
Remove Trigger finalizer when Broker is removed (#1597)
Browse files Browse the repository at this point in the history
This bug is reproducible in this situation:

- Create Kafka Broker `b1`
- Create a trigger `t1` associated with `b1`
- Remove Kafka Broker `b1`
- Remove Trigger `t1`

`t1` will be in `Terminating` state forever since our
reconciler will not reconcile it since we can't know
if Trigger is our trigger given that there is not `b1`.

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
knative-prow-robot and pierDipi authored Dec 13, 2021
1 parent 67b2cdd commit a893020
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 5 deletions.
13 changes: 13 additions & 0 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool
return false
}

if hasKafkaBrokerTriggerFinalizer(trigger.Finalizers, FinalizerName) {
return true
}

broker, err := lister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
if err != nil {
return false
Expand All @@ -129,6 +133,15 @@ func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool
}
}

func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
for _, f := range finalizers {
if f == finalizerName {
return true
}
}
return false
}

func enqueueTriggers(
logger *zap.Logger,
lister eventinglisters.TriggerLister,
Expand Down
125 changes: 120 additions & 5 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
"knative.dev/pkg/configmap"
Expand All @@ -34,6 +35,7 @@ import (
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka"
)

func TestNewController(t *testing.T) {
Expand All @@ -48,11 +50,124 @@ func TestNewController(t *testing.T) {
func TestFilterTriggers(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

pass := filterTriggers(brokerinformer.Get(ctx).Lister())(&eventing.Trigger{
Spec: eventing.TriggerSpec{
Broker: "not-exists",
tt := []struct {
name string
trigger interface{}
pass bool
brokers []*eventing.Broker
}{
{
name: "unknown type",
trigger: &eventing.Broker{},
pass: false,
},
})
{
name: "non existing broker",
trigger: &eventing.Trigger{
Spec: eventing.TriggerSpec{
Broker: "not-exists",
},
},
pass: false,
},
{
name: "non existing broker and trigger with kafka broker finalizer",
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "not-exists",
},
},
pass: true,
},
{
name: "exiting kafka broker",
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
},
},
pass: true,
},
{
name: "exiting non kafka broker",
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass + "-boh",
},
},
},
},
pass: false,
},
{
name: "exiting non kafka broker - trigger with kafka broker finalizer",
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass + "-boh",
},
},
},
},
pass: true,
},
}

assert.False(t, pass)
for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
brokerInformer := brokerinformer.Get(ctx)
for _, obj := range tc.brokers {
_ = brokerInformer.Informer().GetStore().Add(obj)
}
filter := filterTriggers(brokerInformer.Lister())
pass := filter(tc.trigger)
assert.Equal(t, tc.pass, pass)
})
}
}

0 comments on commit a893020

Please sign in to comment.