diff --git a/common/config/kafkaConfig.go b/common/config/kafkaConfig.go index 87069e13b62..86b1b4ebe49 100644 --- a/common/config/kafkaConfig.go +++ b/common/config/kafkaConfig.go @@ -44,9 +44,8 @@ type ( // TopicConfig describes the mapping from topic to Kafka cluster TopicConfig struct { Cluster string `yaml:"cluster"` - // IsSecure describes whether the topic is secure, by default it is false - // If it is set to true, it allows to pass in the token to initialize secure producer for this topic - IsSecure bool `yaml:"isSecure,omitempty"` + // Properties map describes whether the topic properties, such as whether it is secure + Properties map[string]any `yaml:"properties,omitempty"` } // TopicList describes the topic names for each cluster @@ -103,7 +102,20 @@ func (k *KafkaConfig) GetTopicsForApplication(app string) TopicList { return k.Applications[app] } -// GetKafkaClusterSecureConfigForTopic gets isSecure status from topic -func (k *KafkaConfig) GetKafkaSecureConfigForTopic(topic string) bool { - return k.Topics[topic].IsSecure +// GetKafkaPropertiesForTopic gets properties from topic +func (k *KafkaConfig) GetKafkaPropertyForTopic(topic string, property string) any { + topicConfig, ok := k.Topics[topic] + if !ok || topicConfig.Properties == nil { + // No properties for the specified topic in the config + return nil + } + + // retrieve the property from the topic properties + propertyValue, ok := topicConfig.Properties[property] + if !ok { + // Property not found + return nil + } + + return propertyValue }