Skip to content

Commit

Permalink
feat: kafka broker now supports all kafka topic config options (#3671)
Browse files Browse the repository at this point in the history
* feat: kafka broker now supports all kafka topic config options

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test: added unit tests, fixed existing tests

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Feb 9, 2024
1 parent d6a383f commit 75194cf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
11 changes: 11 additions & 0 deletions control-plane/pkg/kafka/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"knative.dev/pkg/configmap"
)

const (
DefaultTopicNumPartitionConfigMapKey = "default.topic.partitions"
DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor"
BootstrapServersConfigMapKey = "bootstrap.servers"
DefaultTopicConfigPrefix = "default.topic.config."

GroupIDConfigMapKey = "group.id"

Expand Down Expand Up @@ -97,6 +99,15 @@ func buildTopicConfigFromConfigMap(cm *corev1.ConfigMap) (*TopicConfig, error) {
return nil, fmt.Errorf("failed to parse config map %s/%s: %w", cm.Namespace, cm.Name, err)
}

for k, v := range cm.Data {
if s := strings.TrimPrefix(k, DefaultTopicConfigPrefix); s != k {
if topicDetail.ConfigEntries == nil {
topicDetail.ConfigEntries = make(map[string]*string)
}
topicDetail.ConfigEntries[s] = pointer.String(v)
}
}

topicDetail.ReplicationFactor = int16(replicationFactor)

config := &TopicConfig{
Expand Down
22 changes: 22 additions & 0 deletions control-plane/pkg/kafka/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"

kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing"
Expand Down Expand Up @@ -441,6 +442,27 @@ func TestTopicConfigFromConfigMap(t *testing.T) {
BootstrapServers: []string{"server1:9092", "server2:9092"},
},
},
{
name: "All valid, config options provided",
data: map[string]string{
"default.topic.partitions": "5",
"default.topic.replication.factor": "8",
"bootstrap.servers": "server1:9092, server2:9092",
"default.topic.config.retention.ms": "3600",
"default.topic.config.max.message.bytes": "68000",
},
want: TopicConfig{
TopicDetail: sarama.TopicDetail{
NumPartitions: 5,
ReplicationFactor: 8,
ConfigEntries: map[string]*string{
"retention.ms": pointer.String("3600"),
"max.message.bytes": pointer.String("68000"),
},
},
BootstrapServers: []string{"server1:9092", "server2:9092"},
},
},
{
name: "Missing keys 'default.topic.partitions' - not allowed",
data: map[string]string{
Expand Down

0 comments on commit 75194cf

Please sign in to comment.