Skip to content

Commit

Permalink
Updated broker and channel controllers to watch kafka feature config …
Browse files Browse the repository at this point in the history
…maps

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Jun 28, 2023
1 parent d7206cd commit 7a5c4a8
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 3 deletions.
2 changes: 1 addition & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
injection.NamedControllerConstructor{
Name: "channel-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return channel.NewController(ctx, channelEnv)
return channel.NewController(ctx, watcher, channelEnv)
},
},

Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E

brokerInformer := brokerinformer.Get(ctx)

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
impl.GlobalResync(brokerInformer.Informer())
})
kafkaConfigStore.WatchConfigs(watcher)

brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: kafka.BrokerClassFilter(),
Handler: controller.HandleAll(impl.Enqueue),
Expand Down
5 changes: 5 additions & 0 deletions control-plane/pkg/reconciler/broker/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
Expand Down Expand Up @@ -84,6 +85,10 @@ func TestNewController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "cm",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: apisconfig.FlagsConfigName,
},
}),
env,
)
Expand Down
9 changes: 8 additions & 1 deletion control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/configmap"

"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -49,7 +50,7 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
)

func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl {

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(base.IngressConditionSet)

Expand Down Expand Up @@ -95,6 +96,12 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {

channelInformer := kafkachannelinformer.Get(ctx)

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
impl.GlobalResync(channelInformer.Informer())
})
kafkaConfigStore.WatchConfigs(watcher)

channelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/channel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
Expand All @@ -34,6 +35,7 @@ import (
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake"
"knative.dev/pkg/configmap"
dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand Down Expand Up @@ -81,6 +83,11 @@ func TestNewController(t *testing.T) {

controller := NewController(
ctx,
configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: apisconfig.FlagsConfigName,
},
}),
configs,
)
if controller == nil {
Expand Down
12 changes: 11 additions & 1 deletion control-plane/pkg/reconciler/channel/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"

Expand All @@ -45,12 +46,13 @@ import (

"knative.dev/pkg/controller"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup"
)

func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl {

configmapInformer := configmapinformer.Get(ctx)
channelInformer := kafkachannelinformer.Get(ctx)
Expand All @@ -75,6 +77,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

logger := logging.FromContext(ctx)
Expand All @@ -88,6 +91,13 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
impl.GlobalResync(channelInformer.Informer())
})
kafkaConfigStore.WatchConfigs(watcher)

IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/channel/v2/controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/types"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
Expand All @@ -35,6 +36,7 @@ import (
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake"
"knative.dev/pkg/configmap"
dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand Down Expand Up @@ -84,6 +86,11 @@ func TestNewController(t *testing.T) {

controller := NewController(
ctx,
configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: apisconfig.FlagsConfigName,
},
}),
configs,
)
if controller == nil {
Expand Down

0 comments on commit 7a5c4a8

Please sign in to comment.