Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Watch kafka config in consolidated channel controller (#423)
Browse files Browse the repository at this point in the history
* Rename some import aliases before creating another common pkg

* Move some structs to common package

* Fix build

* Add missing license header

* Move kafka-config map watcher to common pkg

* Move code from channel/distr/common/testing to /common/testing

* Move more from channel/distr/common/testing to /common/testing

* Rename InitializeConfigWatcher->InitializeKafkaConfigMapWatcher

* Use common method to watch config-kafka in consolidated contrlr

* Use the already existing CM watcher, or create it otherwise

* Use the already existing CM watcher, or create it otherwise

* Don't start the cm watcher again
  • Loading branch information
aliok authored Mar 4, 2021
1 parent 68000ce commit ee7edfd
Show file tree
Hide file tree
Showing 34 changed files with 243 additions and 181 deletions.
23 changes: 17 additions & 6 deletions cmd/channel/distributed/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import (
"strconv"
"strings"

commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
distributedcommonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
commonk8s "knative.dev/eventing-kafka/pkg/channel/distributed/common/k8s"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/metrics"
Expand Down Expand Up @@ -88,7 +90,7 @@ func main() {
}

// Update The Sarama Config - Username/Password Overrides (Values From Secret Take Precedence Over ConfigMap)
kafkaAuthCfg, err := commonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace)
kafkaAuthCfg, err := distributedcommonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace)
if err != nil {
logger.Fatal("Failed To Load Auth Config", zap.Error(err))
}
Expand All @@ -103,13 +105,13 @@ func main() {
sarama.EnableSaramaLogging(ekConfig.Kafka.EnableSaramaLogging)

// Initialize Tracing (Watches config-tracing ConfigMap, Assumes Context Came From LoggingContext With Embedded K8S Client Key)
err = commonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace)
err = distributedcommonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace)
if err != nil {
logger.Fatal("Failed To Initialize Tracing - Terminating", zap.Error(err))
}

// Initialize Observability (Watches config-observability ConfigMap And Starts Profiling Server)
err = commonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace)
err = distributedcommonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace)
if err != nil {
logger.Fatal("Failed To Initialize Observability - Terminating", zap.Error(err))
}
Expand Down Expand Up @@ -143,14 +145,23 @@ func main() {
}
dispatcher = dispatch.NewDispatcher(dispatcherConfig)

// Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration
// Since this is designed to be called by the main() function, the default KNative package behavior here
// is a fatal exit if the watch cannot be set up.
cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger.Sugar())

// Watch The Settings ConfigMap For Changes
err = commonconfig.InitializeConfigWatcher(ctx, logger.Sugar(), configMapObserver, environment.SystemNamespace)
err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger.Sugar(), configMapObserver, environment.SystemNamespace)
if err != nil {
logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err))
}

if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatal("Failed to start configmap watcher", zap.Error(err))
}

// Watch The Secret For Changes
err = commonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver)
err = distributedcommonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver)
if err != nil {
logger.Fatal("Failed To Start Secret Watcher", zap.Error(err))
}
Expand Down
23 changes: 17 additions & 6 deletions cmd/channel/distributed/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
distributedcommonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
commonk8s "knative.dev/eventing-kafka/pkg/channel/distributed/common/k8s"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama"
kafkautil "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/util"
Expand All @@ -38,9 +38,11 @@ import (
channelhealth "knative.dev/eventing-kafka/pkg/channel/distributed/receiver/health"
"knative.dev/eventing-kafka/pkg/channel/distributed/receiver/producer"
kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
eventingchannel "knative.dev/eventing/pkg/channel"
injectionclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
eventingmetrics "knative.dev/pkg/metrics"
Expand Down Expand Up @@ -87,7 +89,7 @@ func main() {
}

// Update The Sarama Config - Username/Password Overrides (Values From Secret Take Precedence Over ConfigMap)
kafkaAuthCfg, err := commonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace)
kafkaAuthCfg, err := distributedcommonconfig.GetAuthConfigFromKubernetes(ctx, environment.KafkaSecretName, environment.KafkaSecretNamespace)
if err != nil {
logger.Fatal("Failed To Load Auth Config", zap.Error(err))
}
Expand All @@ -102,13 +104,13 @@ func main() {
sarama.EnableSaramaLogging(ekConfig.Kafka.EnableSaramaLogging)

// Initialize Tracing (Watches config-tracing ConfigMap, Assumes Context Came From LoggingContext With Embedded K8S Client Key)
err = commonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace)
err = distributedcommonconfig.InitializeTracing(logger.Sugar(), ctx, environment.ServiceName, environment.SystemNamespace)
if err != nil {
logger.Fatal("Could Not Initialize Tracing - Terminating", zap.Error(err))
}

// Initialize Observability (Watches config-observability ConfigMap And Starts Profiling Server)
err = commonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace)
err = distributedcommonconfig.InitializeObservability(ctx, logger.Sugar(), environment.MetricsDomain, environment.MetricsPort, environment.SystemNamespace)
if err != nil {
logger.Fatal("Could Not Initialize Observability - Terminating", zap.Error(err))
}
Expand All @@ -133,14 +135,23 @@ func main() {
// Create A New Stats StatsReporter
statsReporter := metrics.NewStatsReporter(logger)

// Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration
// Since this is designed to be called by the main() function, the default KNative package behavior here
// is a fatal exit if the watch cannot be set up.
cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger.Sugar())

// Watch The Settings ConfigMap For Changes
err = commonconfig.InitializeConfigWatcher(ctx, logger.Sugar(), configMapObserver, environment.SystemNamespace)
err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger.Sugar(), configMapObserver, environment.SystemNamespace)
if err != nil {
logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err))
}

if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatal("Failed to start configmap watcher", zap.Error(err))
}

// Watch The Secret For Changes
err = commonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver)
err = distributedcommonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, secretObserver)
if err != nil {
logger.Fatal("Failed To Start Secret Watcher", zap.Error(err))
}
Expand Down
23 changes: 9 additions & 14 deletions pkg/channel/consolidated/reconciler/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ import (
"context"

"github.com/kelseyhightower/envconfig"
"knative.dev/eventing-kafka/pkg/common/constants"

"go.uber.org/zap"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
Expand Down Expand Up @@ -74,29 +70,28 @@ func NewController(
roleBindingLister: roleBindingInformer.Lister(),
}

logger := logging.FromContext(ctx)

env := &envConfig{}
if err := envconfig.Process("", env); err != nil {
logging.FromContext(ctx).Panicf("unable to process Kafka channel's required environment variables: %v", err)
logger.Panicf("unable to process Kafka channel's required environment variables: %v", err)
}

if env.Image == "" {
logging.FromContext(ctx).Panic("unable to process Kafka channel's required environment variables (missing DISPATCHER_IMAGE)")
logger.Panic("unable to process Kafka channel's required environment variables (missing DISPATCHER_IMAGE)")
}

r.dispatcherImage = env.Image

impl := kafkaChannelReconciler.NewImpl(ctx, r)

// Get and Watch the Kakfa config map and dynamically update Kafka configuration.
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, constants.SettingsConfigMapName, metav1.GetOptions{}); err == nil {
cmw.Watch(constants.SettingsConfigMapName, func(configMap *v1.ConfigMap) {
r.updateKafkaConfig(ctx, configMap)
})
} else if !apierrors.IsNotFound(err) {
logging.FromContext(ctx).With(zap.Error(err)).Fatalf("Error reading ConfigMap '%s'", constants.SettingsConfigMapName)
err := commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, r.updateKafkaConfig, system.Namespace())
if err != nil {
logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err))
}

logging.FromContext(ctx).Info("Setting up event handlers")
logger.Info("Setting up event handlers")
kafkaChannelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

// Set up watches for dispatcher resources we care about, since any changes to these
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
injectionclient "knative.dev/pkg/client/injection/kube/client"
configmap "knative.dev/pkg/configmap/informer"
logtesting "knative.dev/pkg/logging/testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
kafkaconstants "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/constants"
commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing"
"knative.dev/eventing-kafka/pkg/common/constants"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
injectionclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/system"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/distributed/common/config/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"testing"

commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/distributed/common/k8s/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"
Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/distributed/common/kafka/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/ghodss/yaml"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
"knative.dev/eventing-kafka/pkg/common/client"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/system"
Expand Down
4 changes: 2 additions & 2 deletions pkg/channel/distributed/common/kafka/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/kubernetes/fake"
commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
injectionclient "knative.dev/pkg/client/injection/kube/client"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/channel/distributed/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package config
import (
"strings"

"knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
"knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
)

// ConfigurationError is the type of error returned from VerifyConfiguration
Expand All @@ -33,7 +33,7 @@ func (err ControllerConfigurationError) Error() string {

// VerifyConfiguration returns an error if mandatory fields in the EventingKafkaConfig have not been set either
// via the external configmap or the internal variables.
func VerifyConfiguration(configuration *config.EventingKafkaConfig) error {
func VerifyConfiguration(configuration *commonconfig.EventingKafkaConfig) error {

// Verify & Lowercase The Kafka AdminType
lowercaseKafkaAdminType := strings.ToLower(configuration.Kafka.AdminType)
Expand Down
4 changes: 2 additions & 2 deletions pkg/channel/distributed/controller/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
)

// Test Constants
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestVerifyConfiguration(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testConfig := &config.EventingKafkaConfig{}
testConfig := &commonconfig.EventingKafkaConfig{}
testConfig.Kafka.Topic.DefaultNumPartitions = testCase.kafkaTopicDefaultNumPartitions
testConfig.Kafka.Topic.DefaultReplicationFactor = testCase.kafkaTopicDefaultReplicationFactor
testConfig.Kafka.Topic.DefaultRetentionMillis = testCase.kafkaTopicDefaultRetentionMillis
Expand Down
6 changes: 3 additions & 3 deletions pkg/channel/distributed/controller/kafkachannel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
kafkachannelv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
commonconfig "knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin/types"
clientconstants "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/constants"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama"
Expand All @@ -35,6 +34,7 @@ import (
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
commonclient "knative.dev/eventing-kafka/pkg/common/client"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/client/injection/kube/informers/core/v1/service"
Expand All @@ -47,7 +47,7 @@ import (
var rec *Reconciler

// Create A New KafkaChannel Controller
func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl {
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {

// Get A Logger
logger := logging.FromContext(ctx).Desugar()
Expand Down Expand Up @@ -140,7 +140,7 @@ func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl {
}

// Watch The Settings ConfigMap For Changes
err = commonconfig.InitializeConfigWatcher(ctx, logger.Sugar(), rec.configMapObserver, environment.SystemNamespace)
err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger.Sugar(), rec.configMapObserver, environment.SystemNamespace)
if err != nil {
logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ import (
"k8s.io/client-go/rest"
kafkachannelv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
commonenv "knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
commontesting "knative.dev/eventing-kafka/pkg/channel/distributed/common/testing"
"knative.dev/eventing-kafka/pkg/channel/distributed/controller/constants"
controllerenv "knative.dev/eventing-kafka/pkg/channel/distributed/controller/env"
controllertesting "knative.dev/eventing-kafka/pkg/channel/distributed/controller/testing"
fakeKafkaClient "knative.dev/eventing-kafka/pkg/client/injection/client/fake"
_ "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" // Knative Fake Informer Injection
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
"knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake" // Knative Fake Informer Injection
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" // Knative Fake Informer Injection
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
logtesting "knative.dev/pkg/logging/testing"
)
Expand Down Expand Up @@ -67,11 +68,14 @@ func TestNewController(t *testing.T) {
ctx, fakeKafkaClientset := fakeKafkaClient.With(ctx)
assert.NotNil(t, fakeKafkaClientset)

// Create A Watcher On The Configuration Settings ConfigMap & Dynamically Update Configuration
cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger)

// Perform The Test (Create The KafkaChannel Controller)
environment, err := controllerenv.GetEnvironment(logger.Desugar())
assert.Nil(t, err)
ctx = context.WithValue(ctx, controllerenv.Key{}, environment)
controller := NewController(ctx, nil)
controller := NewController(ctx, cmw)

// Verify The Results
assert.NotNil(t, controller)
Expand Down
6 changes: 3 additions & 3 deletions pkg/channel/distributed/controller/kafkachannel/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
kafkav1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/config"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/admin/types"
kafkasarama "knative.dev/eventing-kafka/pkg/channel/distributed/common/kafka/sarama"
Expand All @@ -42,6 +41,7 @@ import (
"knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
kafkalisters "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/common/client"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
commonconstants "knative.dev/eventing-kafka/pkg/common/constants"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
Expand All @@ -55,13 +55,13 @@ type Reconciler struct {
adminClientType types.AdminClientType
adminClient types.AdminClientInterface
environment *env.Environment
config *config.EventingKafkaConfig
config *commonconfig.EventingKafkaConfig
saramaConfig *sarama.Config
kafkachannelLister kafkalisters.KafkaChannelLister
kafkachannelInformer cache.SharedIndexInformer
deploymentLister appsv1listers.DeploymentLister
serviceLister corev1listers.ServiceLister
configObserver config.LoggingObserver
configObserver commonconfig.LoggingObserver
adminMutex *sync.Mutex
kafkaSecret string
kafkaBrokers string
Expand Down
Loading

0 comments on commit ee7edfd

Please sign in to comment.