Skip to content

Commit

Permalink
[release-1.14] Refactor NewFiltersFeatureSet to prevent starvation (k…
Browse files Browse the repository at this point in the history
…native-extensions#1086)

* Define InstallKafkaBrokerStepFn for newfilters.NewFiltersFeatureSet

The NewFiltersFeatureSet now requires a different type after upgrading
dependencies.

* Refactor NewFiltersFeatureSet to prevent starvation

* Add patch
  • Loading branch information
mgencur authored May 10, 2024
1 parent 61aac5d commit 7acc3dd
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 46 deletions.
168 changes: 168 additions & 0 deletions openshift/patches/refactor_newfilters.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
commit 33d3d89e28a64d97d615c017b984974cac28cc9f
Author: Martin Gencur <mgencur@redhat.com>
Date: Fri May 10 10:21:41 2024 +0200

Refactor NewFiltersFeatureSet to prevent starvation

diff --git a/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/feature.go b/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/feature.go
index 6da3b0a69..ee95bab4e 100644
--- a/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/feature.go
+++ b/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/feature.go
@@ -17,15 +17,21 @@ limitations under the License.
package new_trigger_filters

import (
+ "context"
"fmt"

. "github.com/cloudevents/sdk-go/v2/test"
+ "knative.dev/eventing/test/rekt/resources/broker"
+ "knative.dev/eventing/test/rekt/resources/trigger"
+ "knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
+ "knative.dev/reconciler-test/pkg/manifest"
+ "knative.dev/reconciler-test/pkg/resources/service"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
)

-type InstallBrokerFunc func(f *feature.Feature) string
+type InstallBrokerFunc func(brokerName string) feature.StepFn

type CloudEventsContext struct {
eventType string
@@ -329,14 +335,40 @@ func MultipleTriggersAndSinksFeature(installBroker InstallBrokerFunc) *feature.F
},
}

- // We need to create the broker here and mock it later so that the test uses the same broker for both filters
- brokerName := installBroker(f)
- fakeInstallBroker := func(_ *feature.Feature) string {
- return brokerName
- }
-
- createNewFiltersFeature(f, eventContextsFirstSink, filtersFirstTrigger, eventingv1.TriggerFilter{}, fakeInstallBroker)
- createNewFiltersFeature(f, eventContextsSecondSink, filtersSecondTrigger, eventingv1.TriggerFilter{}, fakeInstallBroker)
+ subscriberName1 := feature.MakeRandomK8sName("subscriber1")
+ subscriberName2 := feature.MakeRandomK8sName("subscriber2")
+ triggerName1 := feature.MakeRandomK8sName("trigger1")
+ triggerName2 := feature.MakeRandomK8sName("trigger2")
+ brokerName := feature.MakeRandomK8sName("broker")
+
+ f.Setup("Install Broker, Sinks, Triggers", func(ctx context.Context, t feature.T) {
+ installBroker(brokerName)(ctx, t)
+
+ eventshub.Install(subscriberName1, eventshub.StartReceiver)(ctx, t)
+ eventshub.Install(subscriberName2, eventshub.StartReceiver)(ctx, t)
+
+ triggerCfg1 := []manifest.CfgFn{
+ trigger.WithSubscriber(service.AsKReference(subscriberName1), ""),
+ trigger.WithNewFilters(filtersFirstTrigger),
+ trigger.WithFilter(eventingv1.TriggerFilter{}.Attributes),
+ }
+ trigger.Install(triggerName1, brokerName, triggerCfg1...)(ctx, t)
+
+ triggerCfg2 := []manifest.CfgFn{
+ trigger.WithSubscriber(service.AsKReference(subscriberName2), ""),
+ trigger.WithNewFilters(filtersSecondTrigger),
+ trigger.WithFilter(eventingv1.TriggerFilter{}.Attributes),
+ }
+ trigger.Install(triggerName2, brokerName, triggerCfg2...)(ctx, t)
+
+ broker.IsReady(brokerName)(ctx, t)
+ broker.IsAddressable(brokerName)(ctx, t)
+ trigger.IsReady(triggerName1)(ctx, t)
+ trigger.IsReady(triggerName2)(ctx, t)
+ })
+
+ assertDelivery(f, brokerName, subscriberName1, eventContextsFirstSink)
+ assertDelivery(f, brokerName, subscriberName2, eventContextsSecondSink)

return f
}
diff --git a/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/filters.go b/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/filters.go
index f9a83d1f4..b07d66f81 100644
--- a/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/filters.go
+++ b/vendor/knative.dev/eventing/test/rekt/features/new_trigger_filters/filters.go
@@ -17,39 +17,49 @@ limitations under the License.
package new_trigger_filters

import (
+ "context"
"fmt"

+ "github.com/cloudevents/sdk-go/v2/event"
. "github.com/cloudevents/sdk-go/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ "knative.dev/eventing/test/rekt/resources/broker"
+ "knative.dev/eventing/test/rekt/resources/trigger"
"knative.dev/reconciler-test/pkg/eventshub"
. "knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/resources/service"
-
- "github.com/cloudevents/sdk-go/v2/event"
- eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
- "knative.dev/eventing/test/rekt/resources/broker"
- "knative.dev/eventing/test/rekt/resources/trigger"
)

func createNewFiltersFeature(f *feature.Feature, eventContexts []CloudEventsContext, filters []eventingv1.SubscriptionsAPIFilter, filter eventingv1.TriggerFilter, installBroker InstallBrokerFunc) {
subscriberName := feature.MakeRandomK8sName("subscriber")
triggerName := feature.MakeRandomK8sName("trigger")
- brokerName := installBroker(f)
+ brokerName := feature.MakeRandomK8sName("broker")

- f.Setup("Install trigger subscriber", eventshub.Install(subscriberName, eventshub.StartReceiver))
+ f.Setup("Install Sink, Broker, Trigger", func(ctx context.Context, t feature.T) {
+ installBroker(brokerName)(ctx, t)

- cfg := []manifest.CfgFn{
- trigger.WithSubscriber(service.AsKReference(subscriberName), ""),
- trigger.WithNewFilters(filters),
- trigger.WithFilter(filter.Attributes),
- }
+ eventshub.Install(subscriberName, eventshub.StartReceiver)(ctx, t)

- f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...))
- f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName))
+ triggerCfg := []manifest.CfgFn{
+ trigger.WithSubscriber(service.AsKReference(subscriberName), ""),
+ trigger.WithNewFilters(filters),
+ trigger.WithFilter(filter.Attributes),
+ }

- asserter := f.Beta("New filters")
+ trigger.Install(triggerName, brokerName, triggerCfg...)(ctx, t)

+ broker.IsReady(brokerName)(ctx, t)
+ broker.IsAddressable(brokerName)(ctx, t)
+ trigger.IsReady(triggerName)(ctx, t)
+ })
+
+ assertDelivery(f, brokerName, subscriberName, eventContexts)
+}
+
+func assertDelivery(f *feature.Feature, brokerName, subscriberName string, eventContexts []CloudEventsContext) {
+ asserter := f.Beta("New filters")
for _, eventCtx := range eventContexts {
e := newEventFromEventContext(eventCtx)
eventSender := feature.MakeRandomK8sName("sender")
diff --git a/vendor/knative.dev/eventing/test/rekt/resources/broker/broker.go b/vendor/knative.dev/eventing/test/rekt/resources/broker/broker.go
index b3abbda98..d5c618514 100644
--- a/vendor/knative.dev/eventing/test/rekt/resources/broker/broker.go
+++ b/vendor/knative.dev/eventing/test/rekt/resources/broker/broker.go
@@ -278,10 +278,6 @@ func InstallMTBroker(name string) *feature.Feature {
return f
}

-func InstallMTBrokerIntoFeature(f *feature.Feature) string {
- brokerName := feature.MakeRandomK8sName("broker")
- f.Setup(fmt.Sprintf("Install broker %q", brokerName), Install(brokerName, WithEnvConfig()...))
- f.Setup("Broker is ready", IsReady(brokerName))
- f.Setup("Broker is addressable", k8s.IsAddressable(GVR(), brokerName))
- return brokerName
+func InstallMTBrokerStepFn(brokerName string) feature.StepFn {
+ return Install(brokerName, WithEnvConfig()...)
}
1 change: 1 addition & 0 deletions openshift/release/generate-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ set -euo pipefail
source $(dirname $0)/resolve.sh

GITHUB_ACTIONS=true $(dirname $0)/../../hack/update-codegen.sh
git apply openshift/patches/refactor_newfilters.patch

# Eventing core will bring the config tracing ConfigMap, so remove it from heret
rm -f control-plane/config/eventing-kafka-broker/200-controller/100-config-tracing.yaml
Expand Down
28 changes: 12 additions & 16 deletions test/e2e_new/new_trigger_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package e2e_new

import (
"context"
"testing"

"knative.dev/pkg/system"
Expand All @@ -45,22 +46,17 @@ func TestNewTriggerFilters(t *testing.T) {
environment.Managed(t),
)

env.ParallelTestSet(ctx, t, newfilters.NewFiltersFeatureSet(InstallKafkaBroker))
env.ParallelTestSet(ctx, t, newfilters.NewFiltersFeatureSet(InstallKafkaBrokerStepFn))
}

func InstallKafkaBroker(f *feature.Feature) string {
brokerName := feature.MakeRandomK8sName("kafka-broker")

install, cmName := single_partition_config.MakeInstall()

f.Setup("install one partition configuration", install)
f.Setup("install kafka broker", broker.Install(
brokerName,
broker.WithBrokerClass(kafka.BrokerClass),
broker.WithConfig(cmName),
))
f.Setup("kafka broker is ready", broker.IsReady(brokerName))
f.Setup("kafka broker is addressable", broker.IsAddressable(brokerName))

return brokerName
func InstallKafkaBrokerStepFn(brokerName string) feature.StepFn {
return func(ctx context.Context, t feature.T) {
install, cmName := single_partition_config.MakeInstall()
install(ctx, t)
broker.Install(
brokerName,
broker.WithBrokerClass(kafka.BrokerClass),
broker.WithConfig(cmName),
)(ctx, t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ limitations under the License.
package new_trigger_filters

import (
"context"
"fmt"

. "github.com/cloudevents/sdk-go/v2/test"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/resources/service"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
)

type InstallBrokerFunc func(f *feature.Feature) string
type InstallBrokerFunc func(brokerName string) feature.StepFn

type CloudEventsContext struct {
eventType string
Expand Down Expand Up @@ -329,14 +335,40 @@ func MultipleTriggersAndSinksFeature(installBroker InstallBrokerFunc) *feature.F
},
}

// We need to create the broker here and mock it later so that the test uses the same broker for both filters
brokerName := installBroker(f)
fakeInstallBroker := func(_ *feature.Feature) string {
return brokerName
}

createNewFiltersFeature(f, eventContextsFirstSink, filtersFirstTrigger, eventingv1.TriggerFilter{}, fakeInstallBroker)
createNewFiltersFeature(f, eventContextsSecondSink, filtersSecondTrigger, eventingv1.TriggerFilter{}, fakeInstallBroker)
subscriberName1 := feature.MakeRandomK8sName("subscriber1")
subscriberName2 := feature.MakeRandomK8sName("subscriber2")
triggerName1 := feature.MakeRandomK8sName("trigger1")
triggerName2 := feature.MakeRandomK8sName("trigger2")
brokerName := feature.MakeRandomK8sName("broker")

f.Setup("Install Broker, Sinks, Triggers", func(ctx context.Context, t feature.T) {
installBroker(brokerName)(ctx, t)

eventshub.Install(subscriberName1, eventshub.StartReceiver)(ctx, t)
eventshub.Install(subscriberName2, eventshub.StartReceiver)(ctx, t)

triggerCfg1 := []manifest.CfgFn{
trigger.WithSubscriber(service.AsKReference(subscriberName1), ""),
trigger.WithNewFilters(filtersFirstTrigger),
trigger.WithFilter(eventingv1.TriggerFilter{}.Attributes),
}
trigger.Install(triggerName1, brokerName, triggerCfg1...)(ctx, t)

triggerCfg2 := []manifest.CfgFn{
trigger.WithSubscriber(service.AsKReference(subscriberName2), ""),
trigger.WithNewFilters(filtersSecondTrigger),
trigger.WithFilter(eventingv1.TriggerFilter{}.Attributes),
}
trigger.Install(triggerName2, brokerName, triggerCfg2...)(ctx, t)

broker.IsReady(brokerName)(ctx, t)
broker.IsAddressable(brokerName)(ctx, t)
trigger.IsReady(triggerName1)(ctx, t)
trigger.IsReady(triggerName2)(ctx, t)
})

assertDelivery(f, brokerName, subscriberName1, eventContextsFirstSink)
assertDelivery(f, brokerName, subscriberName2, eventContextsSecondSink)

return f
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,49 @@ limitations under the License.
package new_trigger_filters

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/event"
. "github.com/cloudevents/sdk-go/v2/test"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"
"knative.dev/reconciler-test/pkg/eventshub"
. "knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/resources/service"

"github.com/cloudevents/sdk-go/v2/event"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"
)

func createNewFiltersFeature(f *feature.Feature, eventContexts []CloudEventsContext, filters []eventingv1.SubscriptionsAPIFilter, filter eventingv1.TriggerFilter, installBroker InstallBrokerFunc) {
subscriberName := feature.MakeRandomK8sName("subscriber")
triggerName := feature.MakeRandomK8sName("trigger")
brokerName := installBroker(f)
brokerName := feature.MakeRandomK8sName("broker")

f.Setup("Install trigger subscriber", eventshub.Install(subscriberName, eventshub.StartReceiver))
f.Setup("Install Sink, Broker, Trigger", func(ctx context.Context, t feature.T) {
installBroker(brokerName)(ctx, t)

cfg := []manifest.CfgFn{
trigger.WithSubscriber(service.AsKReference(subscriberName), ""),
trigger.WithNewFilters(filters),
trigger.WithFilter(filter.Attributes),
}
eventshub.Install(subscriberName, eventshub.StartReceiver)(ctx, t)

f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...))
f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName))
triggerCfg := []manifest.CfgFn{
trigger.WithSubscriber(service.AsKReference(subscriberName), ""),
trigger.WithNewFilters(filters),
trigger.WithFilter(filter.Attributes),
}

asserter := f.Beta("New filters")
trigger.Install(triggerName, brokerName, triggerCfg...)(ctx, t)

broker.IsReady(brokerName)(ctx, t)
broker.IsAddressable(brokerName)(ctx, t)
trigger.IsReady(triggerName)(ctx, t)
})

assertDelivery(f, brokerName, subscriberName, eventContexts)
}

func assertDelivery(f *feature.Feature, brokerName, subscriberName string, eventContexts []CloudEventsContext) {
asserter := f.Beta("New filters")
for _, eventCtx := range eventContexts {
e := newEventFromEventContext(eventCtx)
eventSender := feature.MakeRandomK8sName("sender")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,6 @@ func InstallMTBroker(name string) *feature.Feature {
return f
}

func InstallMTBrokerIntoFeature(f *feature.Feature) string {
brokerName := feature.MakeRandomK8sName("broker")
f.Setup(fmt.Sprintf("Install broker %q", brokerName), Install(brokerName, WithEnvConfig()...))
f.Setup("Broker is ready", IsReady(brokerName))
f.Setup("Broker is addressable", k8s.IsAddressable(GVR(), brokerName))
return brokerName
func InstallMTBrokerStepFn(brokerName string) feature.StepFn {
return Install(brokerName, WithEnvConfig()...)
}

0 comments on commit 7acc3dd

Please sign in to comment.