diff --git a/pkg/channel/consolidated/utils/util.go b/pkg/channel/consolidated/utils/util.go index a24acc7f39..8e8c43ce9a 100644 --- a/pkg/channel/consolidated/utils/util.go +++ b/pkg/channel/consolidated/utils/util.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,6 +42,7 @@ const ( MaxIdleConnectionsKey = "maxIdleConns" MaxIdleConnectionsPerHostKey = "maxIdleConnsPerHost" + TlsEnabled = "tls.enabled" TlsCacert = "ca.crt" TlsUsercert = "user.crt" TlsUserkey = "user.key" @@ -65,27 +67,32 @@ type KafkaConfig struct { SaramaSettingsYamlString string } -func GetKafkaAuthData(ctx context.Context, secretname string, secretNS string) *client.KafkaAuthConfig { - - k8sClient := kubeclient.Get(ctx) - secret, err := k8sClient.CoreV1().Secrets(secretNS).Get(ctx, secretname, metav1.GetOptions{}) - - if err != nil || secret == nil { - logging.FromContext(ctx).Errorf("Referenced Auth Secret not found") - return nil - } +func parseTls(secret *corev1.Secret, kafkaAuthConfig *client.KafkaAuthConfig) { - kafkaAuthConfig := &client.KafkaAuthConfig{} - // check for TLS + // self-signed CERTs we need CA CERT, USER CERT and KEy if string(secret.Data[TlsCacert]) != "" { + // We have a self-signed TLS cert tls := &client.KafkaTlsConfig{ Cacert: string(secret.Data[TlsCacert]), Usercert: string(secret.Data[TlsUsercert]), Userkey: string(secret.Data[TlsUserkey]), } kafkaAuthConfig.TLS = tls + } else { + // Public CERTS from a proper CA do not need this, + // we can just say `tls.enabled: true` + tlsEnabled, err := strconv.ParseBool(string(secret.Data[TlsEnabled])) + if err != nil { + tlsEnabled = false + } + if tlsEnabled { + // Looks like TLS is desired/enabled: + kafkaAuthConfig.TLS = &client.KafkaTlsConfig{} + } } +} +func parseSasl(secret *corev1.Secret, kafkaAuthConfig *client.KafkaAuthConfig) { if string(secret.Data[SaslUser]) != "" { sasl := &client.KafkaSaslConfig{ User: string(secret.Data[SaslUser]), @@ -94,6 +101,24 @@ func GetKafkaAuthData(ctx context.Context, secretname string, secretNS string) * } kafkaAuthConfig.SASL = sasl } +} + +func GetKafkaAuthData(ctx context.Context, secretname string, secretNS string) *client.KafkaAuthConfig { + + k8sClient := kubeclient.Get(ctx) + secret, err := k8sClient.CoreV1().Secrets(secretNS).Get(ctx, secretname, metav1.GetOptions{}) + + if err != nil || secret == nil { + logging.FromContext(ctx).Errorf("Referenced Auth Secret not found") + return nil + } + + kafkaAuthConfig := &client.KafkaAuthConfig{} + + // check for TLS and SASL options + parseTls(secret, kafkaAuthConfig) + parseSasl(secret, kafkaAuthConfig) + return kafkaAuthConfig } diff --git a/pkg/common/client/config.go b/pkg/common/client/config.go index 9e63d178ed..c73f44833e 100644 --- a/pkg/common/client/config.go +++ b/pkg/common/client/config.go @@ -186,14 +186,18 @@ func (b *configBuilder) Build(ctx context.Context) (*sarama.Config, error) { // then apply auth settings if b.auth != nil { - // tls + // TLS if b.auth.TLS != nil { config.Net.TLS.Enable = true - tlsConfig, err := newTLSConfig(b.auth.TLS.Usercert, b.auth.TLS.Userkey, b.auth.TLS.Cacert) - if err != nil { - return nil, fmt.Errorf("Error creating TLS config: %w", err) + + // if we have TLS, we might want to use the certs for self-signed CERTs + if b.auth.TLS.Cacert != "" { + tlsConfig, err := newTLSConfig(b.auth.TLS.Usercert, b.auth.TLS.Userkey, b.auth.TLS.Cacert) + if err != nil { + return nil, fmt.Errorf("Error creating TLS config: %w", err) + } + config.Net.TLS.Config = tlsConfig } - config.Net.TLS.Config = tlsConfig } // SASL if b.auth.SASL != nil {