diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 29b7035e47..a34ea47e80 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -364,12 +364,12 @@ func (r *Reconciler) topicConfig(logger *zap.Logger, broker *eventing.Broker) (* return nil, nil, fmt.Errorf("failed to get configmap %s/%s: %w", namespace, broker.Spec.Config.Name, err) } - brokerConfig, err := configFromConfigMap(logger, cm) + topicConfig, err := kafka.TopicConfigFromConfigMap(logger, cm) if err != nil { - return nil, cm, err + return nil, cm, fmt.Errorf("unable to build topic config from configmap: %w - ConfigMap data: %v", err, cm.Data) } - return brokerConfig, cm, nil + return topicConfig, cm, nil } func (r *Reconciler) defaultTopicDetail() sarama.TopicDetail { @@ -431,8 +431,13 @@ func (r *Reconciler) ConfigMapUpdated(ctx context.Context) func(configMap *corev return func(configMap *corev1.ConfigMap) { - topicConfig, err := configFromConfigMap(logger, configMap) + topicConfig, err := kafka.TopicConfigFromConfigMap(logger, configMap) if err != nil { + logger.Error("failed to build broker config from configmap", + zap.String("configMap.Namespace", configMap.Namespace), + zap.String("configMap.Name", configMap.Name), + zap.Any("configMap.Data", configMap.Data), + zap.Error(err)) return } @@ -472,7 +477,7 @@ func (r *Reconciler) getDefaultBootstrapServersOrFail() ([]string, error) { defer r.bootstrapServersLock.RUnlock() if len(r.bootstrapServers) == 0 { - return nil, fmt.Errorf("no %s provided", BootstrapServersConfigMapKey) + return nil, fmt.Errorf("no %s provided", kafka.BootstrapServersConfigMapKey) } return r.bootstrapServers, nil diff --git a/control-plane/pkg/reconciler/broker/broker_config.go b/control-plane/pkg/reconciler/broker/broker_config.go deleted file mode 100644 index b942f3d114..0000000000 --- a/control-plane/pkg/reconciler/broker/broker_config.go +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package broker - -import ( - "fmt" - - "github.com/Shopify/sarama" - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "knative.dev/pkg/configmap" - - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" -) - -func configFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*kafka.TopicConfig, error) { - - topicDetail := sarama.TopicDetail{} - - var replicationFactor int32 - var bootstrapServers string - - err := configmap.Parse(cm.Data, - configmap.AsInt32(DefaultTopicNumPartitionConfigMapKey, &topicDetail.NumPartitions), - configmap.AsInt32(DefaultTopicReplicationFactorConfigMapKey, &replicationFactor), - configmap.AsString(BootstrapServersConfigMapKey, &bootstrapServers), - ) - if err != nil { - return nil, fmt.Errorf("failed to parse config map %s/%s: %w", cm.Namespace, cm.Name, err) - } - - if topicDetail.NumPartitions <= 0 || replicationFactor <= 0 || bootstrapServers == "" { - return nil, fmt.Errorf( - "invalid configuration - numPartitions: %d - replicationFactor: %d - bootstrapServers: %s - ConfigMap data: %v", - topicDetail.NumPartitions, - replicationFactor, - bootstrapServers, - cm.Data, - ) - } - - topicDetail.ReplicationFactor = int16(replicationFactor) - - config := &kafka.TopicConfig{ - TopicDetail: topicDetail, - BootstrapServers: kafka.BootstrapServersArray(bootstrapServers), - } - - logger.Debug("got broker config from config map", zap.Any("config", config)) - - return config, nil -} diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index c1f87df3bd..a9ea149474 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -174,7 +174,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -233,8 +233,8 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, - testProber: probertesting.MockProber(prober.StatusNotReady), + kafka.BootstrapServersConfigMapKey: bootstrapServers, + testProber: probertesting.MockProber(prober.StatusNotReady), }, }, { @@ -293,8 +293,8 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, - testProber: probertesting.MockProber(prober.StatusUnknown), + kafka.BootstrapServersConfigMapKey: bootstrapServers, + testProber: probertesting.MockProber(prober.StatusUnknown), }, }, { @@ -353,7 +353,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -412,7 +412,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -447,8 +447,8 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - wantErrorOnCreateTopic: createTopicError, - BootstrapServersConfigMapKey: bootstrapServers, + wantErrorOnCreateTopic: createTopicError, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -514,7 +514,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -568,7 +568,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -649,7 +649,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -747,7 +747,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -831,7 +831,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -908,7 +908,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -975,7 +975,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1077,7 +1077,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, ExpectedTopicDetail: sarama.TopicDetail{ NumPartitions: 20, ReplicationFactor: 5, @@ -1156,7 +1156,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, ExpectedTopicDetail: sarama.TopicDetail{ NumPartitions: 20, ReplicationFactor: 5, @@ -1200,7 +1200,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1256,7 +1256,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1369,7 +1369,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1462,7 +1462,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1528,7 +1528,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1591,7 +1591,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1657,7 +1657,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1706,7 +1706,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1758,7 +1758,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, } @@ -1808,7 +1808,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }), }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1836,7 +1836,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }), }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1870,8 +1870,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }), }, OtherTestData: map[string]interface{}{ - wantErrorOnDeleteTopic: deleteTopicError, - BootstrapServersConfigMapKey: bootstrapServers, + wantErrorOnDeleteTopic: deleteTopicError, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1888,7 +1888,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }, SkipNamespaceValidation: true, // WantCreates compare the broker namespace with configmap namespace, so skip it OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1925,7 +1925,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }), }, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1962,8 +1962,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }), }, OtherTestData: map[string]interface{}{ - wantErrorOnDeleteTopic: sarama.ErrUnknownTopicOrPartition, - BootstrapServersConfigMapKey: bootstrapServers, + wantErrorOnDeleteTopic: sarama.ErrUnknownTopicOrPartition, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, { @@ -1983,7 +1983,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { }, Key: testKey, OtherTestData: map[string]interface{}{ - BootstrapServersConfigMapKey: bootstrapServers, + kafka.BootstrapServersConfigMapKey: bootstrapServers, }, }, } @@ -2017,7 +2017,7 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { } bootstrapServers := "" - if bs, ok := row.OtherTestData[BootstrapServersConfigMapKey]; ok { + if bs, ok := row.OtherTestData[kafka.BootstrapServersConfigMapKey]; ok { bootstrapServers = bs.(string) } @@ -2105,9 +2105,9 @@ func TestConfigMapUpdate(t *testing.T) { Namespace: "cmnamespace", }, Data: map[string]string{ - DefaultTopicNumPartitionConfigMapKey: "42", - DefaultTopicReplicationFactorConfigMapKey: "3", - BootstrapServersConfigMapKey: "server1,server2", + kafka.DefaultTopicNumPartitionConfigMapKey: "42", + kafka.DefaultTopicReplicationFactorConfigMapKey: "3", + kafka.BootstrapServersConfigMapKey: "server1,server2", }, } diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index c3f173d39f..6973cf631e 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -48,10 +48,6 @@ import ( ) const ( - DefaultTopicNumPartitionConfigMapKey = "default.topic.partitions" - DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor" - BootstrapServersConfigMapKey = "bootstrap.servers" - DefaultNumPartitions = 10 DefaultReplicationFactor = 1 ) diff --git a/control-plane/pkg/reconciler/kafka/topic.go b/control-plane/pkg/reconciler/kafka/topic.go index 6d8e6117e1..6852a40638 100644 --- a/control-plane/pkg/reconciler/kafka/topic.go +++ b/control-plane/pkg/reconciler/kafka/topic.go @@ -22,7 +22,15 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/configmap" +) + +const ( + DefaultTopicNumPartitionConfigMapKey = "default.topic.partitions" + DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor" + BootstrapServersConfigMapKey = "bootstrap.servers" ) // TopicConfig contains configurations for creating a topic. @@ -31,6 +39,61 @@ type TopicConfig struct { BootstrapServers []string } +func TopicConfigFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*TopicConfig, error) { + config, err := buildTopicConfigFromConfigMap(cm) + if err != nil { + return nil, err + } + + if err := validateTopicConfig(config); err != nil { + return nil, fmt.Errorf("error validating topic config from configmap %s - ConfigMap data: %v", err, cm.Data) + } + + logger.Debug("topic config from configmap", + zap.Int32("numPartitions", config.TopicDetail.NumPartitions), + zap.Int16("replicationFactor", config.TopicDetail.ReplicationFactor), + zap.Any("replicationFactor", config.BootstrapServers), + ) + + return config, nil +} + +func buildTopicConfigFromConfigMap(cm *corev1.ConfigMap) (*TopicConfig, error) { + topicDetail := sarama.TopicDetail{} + + var replicationFactor int32 + var bootstrapServers string + + err := configmap.Parse(cm.Data, + configmap.AsInt32(DefaultTopicNumPartitionConfigMapKey, &topicDetail.NumPartitions), + configmap.AsInt32(DefaultTopicReplicationFactorConfigMapKey, &replicationFactor), + configmap.AsString(BootstrapServersConfigMapKey, &bootstrapServers), + ) + if err != nil { + return nil, fmt.Errorf("failed to parse config map %s/%s: %w", cm.Namespace, cm.Name, err) + } + + topicDetail.ReplicationFactor = int16(replicationFactor) + + config := &TopicConfig{ + TopicDetail: topicDetail, + BootstrapServers: BootstrapServersArray(bootstrapServers), + } + return config, nil +} + +func validateTopicConfig(config *TopicConfig) error { + if config.TopicDetail.NumPartitions <= 0 || config.TopicDetail.ReplicationFactor <= 0 || len(config.BootstrapServers) == 0 { + return fmt.Errorf( + "invalid configuration - numPartitions: %d - replicationFactor: %d - bootstrapServers: %s", + config.TopicDetail.NumPartitions, + config.TopicDetail.ReplicationFactor, + config.BootstrapServers, + ) + } + return nil +} + // GetBootstrapServers returns TopicConfig.BootstrapServers as a comma separated list of bootstrap servers. func (c TopicConfig) GetBootstrapServers() string { return BootstrapServersCommaSeparated(c.BootstrapServers) diff --git a/control-plane/pkg/reconciler/kafka/topic_test.go b/control-plane/pkg/reconciler/kafka/topic_test.go index 0964f40234..67a8c04e91 100644 --- a/control-plane/pkg/reconciler/kafka/topic_test.go +++ b/control-plane/pkg/reconciler/kafka/topic_test.go @@ -18,12 +18,14 @@ package kafka import ( "fmt" + "reflect" "testing" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventing "knative.dev/eventing/pkg/apis/eventing/v1" @@ -416,3 +418,73 @@ func TestBootstrapServersArray(t *testing.T) { require.Contains(t, bss, "bs:9002") require.Len(t, bss, 3) } + +func TestTopicConfigFromConfigMap(t *testing.T) { + tests := []struct { + name string + data map[string]string + want TopicConfig + wantErr bool + }{ + { + name: "All valid", + data: map[string]string{ + "default.topic.partitions": "5", + "default.topic.replication.factor": "8", + "bootstrap.servers": "server1:9092, server2:9092", + }, + want: TopicConfig{ + TopicDetail: sarama.TopicDetail{ + NumPartitions: 5, + ReplicationFactor: 8, + }, + BootstrapServers: []string{"server1:9092", "server2:9092"}, + }, + }, + { + name: "Missing keys 'default.topic.partitions' - not allowed", + data: map[string]string{ + "default.topic.replication.factor": "8", + "bootstrap.servers": "server1:9092, server2:9092", + }, + wantErr: true, + }, + { + name: "Missing keys 'default.topic.replication.factor' - not allowed", + data: map[string]string{ + "default.topic.partitions": "5", + "bootstrap.servers": "server1:9092, server2:9092", + }, + wantErr: true, + }, + { + name: "Missing keys 'bootstrap.servers' - not allowed", + data: map[string]string{ + "default.topic.partitions": "5", + "default.topic.replication.factor": "8", + }, + wantErr: true, + }, + } + for _, tt := range tests { + + logger := zap.NewNop() + + cm := &corev1.ConfigMap{ + Data: tt.data, + } + + t.Run(tt.name, func(t *testing.T) { + got, err := TopicConfigFromConfigMap(logger, cm) + + if (err != nil) != tt.wantErr { + t.Errorf("TopicConfigFromConfigMap() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !tt.wantErr && !reflect.DeepEqual(*got, tt.want) { + t.Errorf("TopicConfigFromConfigMap() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index 4b044be764..cca4cf32fe 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -132,9 +132,9 @@ func BrokerConfig(bootstrapServers string, numPartitions, replicationFactor int, Name: ConfigMapName, }, Data: map[string]string{ - BootstrapServersConfigMapKey: bootstrapServers, - DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", replicationFactor), - DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", numPartitions), + kafka.BootstrapServersConfigMapKey: bootstrapServers, + kafka.DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", replicationFactor), + kafka.DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", numPartitions), }, } for _, opt := range options { diff --git a/test/e2e/broker_trigger_test.go b/test/e2e/broker_trigger_test.go index af0ecabed4..00ab0bd51d 100644 --- a/test/e2e/broker_trigger_test.go +++ b/test/e2e/broker_trigger_test.go @@ -186,9 +186,9 @@ func TestBrokerWithConfig(t *testing.T) { eventId := uuid.New().String() cm := client.CreateConfigMapOrFail(configMapName, client.Namespace, map[string]string{ - broker.DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", numPartitions), - broker.DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", replicationFactor), - broker.BootstrapServersConfigMapKey: testingpkg.BootstrapServersPlaintext, + kafka.DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", numPartitions), + kafka.DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", replicationFactor), + kafka.BootstrapServersConfigMapKey: testingpkg.BootstrapServersPlaintext, }) br := client.CreateBrokerOrFail( diff --git a/test/upgrade/continual/broker.go b/test/upgrade/continual/broker.go index 87404ca63b..42801c4c4d 100644 --- a/test/upgrade/continual/broker.go +++ b/test/upgrade/continual/broker.go @@ -35,7 +35,6 @@ import ( eventingkafkaupgrade "knative.dev/eventing-kafka/test/upgrade/continual" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) @@ -124,9 +123,9 @@ func (k kafkaBrokerSut) deployBroker(ctx sut.Context) { Namespace: namespace, }, Data: map[string]string{ - broker.BootstrapServersConfigMapKey: testingpkg.BootstrapServersPlaintext, - broker.DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", k.NumPartitions), - broker.DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", k.ReplicationFactor), + kafka.BootstrapServersConfigMapKey: testingpkg.BootstrapServersPlaintext, + kafka.DefaultTopicNumPartitionConfigMapKey: fmt.Sprintf("%d", k.NumPartitions), + kafka.DefaultTopicReplicationFactorConfigMapKey: fmt.Sprintf("%d", k.ReplicationFactor), }, } cm, err := ctx.Kube.CoreV1().ConfigMaps(namespace).Create(ctx.Ctx, cm, metav1.CreateOptions{})