Skip to content

Commit

Permalink
Create KafkaSources OIDC service account and expose in its status (kn…
Browse files Browse the repository at this point in the history
…ative-extensions#3660)

* Create KafkaSources OIDC service account and expose in its status

* Run goimport
  • Loading branch information
creydr committed Feb 21, 2024
1 parent 9d1a381 commit 7d38e17
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 14 deletions.
20 changes: 20 additions & 0 deletions control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (
// KafkaConditionInitialOffsetsCommitted is True when the KafkaSource has committed the
// initial offset of all claims
KafkaConditionInitialOffsetsCommitted apis.ConditionType = "InitialOffsetsCommitted"

// KafkaConditionOIDCIdentityCreated has status True when the KafkaSource has created an OIDC identity.
KafkaConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

var (
Expand All @@ -54,6 +57,7 @@ var (
KafkaConditionDeployed,
KafkaConditionConnectionEstablished,
KafkaConditionInitialOffsetsCommitted,
KafkaConditionOIDCIdentityCreated,
)

kafkaCondSetLock = sync.RWMutex{}
Expand Down Expand Up @@ -160,6 +164,22 @@ func (s *KafkaSourceStatus) MarkInitialOffsetNotCommitted(reason, messageFormat
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionInitialOffsetsCommitted, reason, messageFormat, messageA...)
}

func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedSucceeded() {
KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionOIDCIdentityCreated)
}

func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkTrueWithReason(KafkaConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *KafkaSourceStatus) UpdateConsumerGroupStatus(status string) {
s.Claims = status
}
44 changes: 38 additions & 6 deletions control-plane/pkg/reconciler/source/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package source
import (
"context"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/logging"

"k8s.io/client-go/tools/cache"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand All @@ -34,29 +37,51 @@ import (

"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup"
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client"

kubeclient "knative.dev/pkg/client/injection/kube/client"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
)

func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl {

kafkaInformer := kafkainformer.Get(ctx)
consumerGroupInformer := consumergroupinformer.Get(ctx)
serviceaccountInformer := serviceaccountinformer.Get(ctx)

sources.RegisterAlternateKafkaConditionSet(conditionSet)

var globalResync func()
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync()
}
})
featureStore.WatchConfigs(watcher)

r := &Reconciler{
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KedaClient: kedaclient.Get(ctx),
KafkaFeatureFlags: config.DefaultFeaturesConfig(),
KubeClient: kubeclient.Get(ctx),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KedaClient: kedaclient.Get(ctx),
KafkaFeatureFlags: config.DefaultFeaturesConfig(),
ServiceAccountLister: serviceaccountInformer.Lister(),
}

impl := kafkasource.NewImpl(ctx, r)
impl := kafkasource.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

globalResync = func() {
impl.GlobalResync(kafkaInformer.Informer())
}

kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

configStore := config.NewStore(ctx, func(name string, value *config.KafkaFeatureFlags) {
r.KafkaFeatureFlags.Reset(value)
impl.GlobalResync(kafkaInformer.Informer())
globalResync()
})
configStore.WatchConfigs(watcher)

Expand All @@ -65,5 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
FilterFunc: consumergroup.Filter("kafkasource"),
Handler: controller.HandleAll(consumergroup.Enqueue("kafkasource", impl.EnqueueKey)),
})

// Reconcile KafkaSource when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&sources.KafkaSource{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/source/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package source
import (
"testing"

"knative.dev/eventing/pkg/apis/feature"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -40,6 +42,7 @@ import (

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
)

func TestNewController(t *testing.T) {
Expand Down Expand Up @@ -69,6 +72,10 @@ func TestNewController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "config-kafka-features",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
}))
if controller == nil {
t.Error("failed to create controller: <nil>")
Expand Down
22 changes: 18 additions & 4 deletions control-plane/pkg/reconciler/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"fmt"
"strings"

"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -57,10 +62,12 @@ var (
)

type Reconciler struct {
ConsumerGroupLister internalslst.ConsumerGroupLister
InternalsClient internalsclient.Interface
KedaClient kedaclientset.Interface
KafkaFeatureFlags *config.KafkaFeatureFlags
KubeClient kubernetes.Interface
ConsumerGroupLister internalslst.ConsumerGroupLister
InternalsClient internalsclient.Interface
KedaClient kedaclientset.Interface
KafkaFeatureFlags *config.KafkaFeatureFlags
ServiceAccountLister corelisters.ServiceAccountLister
}

func (r *Reconciler) ReconcileKind(ctx context.Context, ks *sources.KafkaSource) reconciler.Event {
Expand All @@ -71,6 +78,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, ks *sources.KafkaSource)
}
ks.Status.Selector = selector.String()

err = auth.SetupOIDCServiceAccount(ctx, feature.FromContext(ctx), r.ServiceAccountLister, r.KubeClient, sources.SchemeGroupVersion.WithKind("KafkaSource"), ks.ObjectMeta, &ks.Status, func(as *duckv1.AuthStatus) {
ks.Status.Auth = as
})
if err != nil {
return fmt.Errorf("could not setup OIDC service account for KafkaSource %s/%s: %w", ks.Name, ks.Namespace, err)
}

cg, err := r.reconcileConsumerGroup(ctx, ks)
if err != nil {
ks.GetConditionSet().Manage(&ks.Status).MarkFalse(KafkaConditionConsumerGroup, "failed to reconcile consumer group", err.Error())
Expand Down
Loading

0 comments on commit 7d38e17

Please sign in to comment.