Skip to content

Commit

Permalink
Add support to use PEM string for SSL (#3868)
Browse files Browse the repository at this point in the history
Make it possible to use CA certificate string (PEM format) for verifying the broker's key,
Client's private key string (PEM format) used for authentication and Client's public key string (PEM format) used for authentication.
  • Loading branch information
stephen37 authored Jan 27, 2022
1 parent 19f2c88 commit 8eaf8fe
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 7 deletions.
13 changes: 10 additions & 3 deletions executor/api/kafka/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,16 @@ func NewKafkaServer(fullGraph bool, workers int, deploymentName, namespace, prot
if util.GetKafkaSecurityProtocol() == "SSL" {
sslKakfaServer := util.GetSslElements()
producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile
producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile
if sslKakfaServer.CACertFile != "" && sslKakfaServer.ClientCertFile != "" {
producerConfigMap["ssl.ca.location"] = sslKakfaServer.CACertFile
producerConfigMap["ssl.key.location"] = sslKakfaServer.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKakfaServer.ClientCertFile
}
if sslKakfaServer.CACert != "" && sslKakfaServer.ClientCert != "" {
producerConfigMap["ssl.ca.pem"] = sslKakfaServer.CACert
producerConfigMap["ssl.key.pem"] = sslKakfaServer.ClientKey
producerConfigMap["ssl.certificate.pem"] = sslKakfaServer.ClientCert
}
producerConfigMap["ssl.key.password"] = sslKakfaServer.ClientKeyPass // Key password, if any

}
Expand Down
11 changes: 10 additions & 1 deletion executor/api/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func GetEnvAsBool(key string, fallback bool) bool {
}

type SslKakfa struct {
ClientCert string
ClientKey string
CACert string
ClientCertFile string
ClientKeyFile string
CACertFile string
Expand All @@ -160,10 +163,16 @@ func GetKafkaSecurityProtocol() string {

func GetSslElements() *SslKakfa {
sslElements := SslKakfa{
ClientCert: GetEnv("KAFKA_SSL_CLIENT_CERT", ""),
ClientKey: GetEnv("KAFKA_SSL_CLIENT_KEY", ""),
CACert: GetEnv("KAFKA_SSL_CA_CERT", ""),
// If we use path to files instead of string
ClientCertFile: GetEnv("KAFKA_SSL_CLIENT_CERT_FILE", ""),
ClientKeyFile: GetEnv("KAFKA_SSL_CLIENT_KEY_FILE", ""),
CACertFile: GetEnv("KAFKA_SSL_CA_CERT_FILE", ""),
ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
// Optional password
ClientKeyPass: GetEnv("KAFKA_SSL_CLIENT_KEY_PASS", ""),
}
return &sslElements

}
14 changes: 11 additions & 3 deletions executor/logger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ func NewWorker(id int, workQueue chan LogRequest, log logr.Logger, sdepName stri
if util.GetKafkaSecurityProtocol() == "SSL" {
sslKafka := util.GetSslElements()
producerConfigMap["security.protocol"] = util.GetKafkaSecurityProtocol()
producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile
producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile
if sslKafka.CACertFile != "" && sslKafka.ClientCertFile != "" {
producerConfigMap["ssl.ca.location"] = sslKafka.CACertFile
producerConfigMap["ssl.key.location"] = sslKafka.ClientKeyFile
producerConfigMap["ssl.certificate.location"] = sslKafka.ClientCertFile
}
if sslKafka.CACert != "" && sslKafka.ClientCert != "" {
producerConfigMap["ssl.ca.pem"] = sslKafka.CACert
producerConfigMap["ssl.key.pem"] = sslKafka.ClientKey
producerConfigMap["ssl.certificate.pem"] = sslKafka.ClientCert
}
producerConfigMap["ssl.key.password"] = sslKafka.ClientKeyPass // Key password, if any

}

producer, err = kafka.NewProducer(&producerConfigMap)
Expand Down
67 changes: 67 additions & 0 deletions executor/logger/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,70 @@ func TestWorkerKafkaConfigurations(t *testing.T) {
})
}
}

func TestWorkerKafkaConfigurationsString(t *testing.T) {
type test struct {
name string
kafkaConfig kafka.ConfigMap
expectError bool
}

g := NewWithT(t)

tests := []test{
{
name: "All options are valid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": caPEM,
"ssl.key.pem": keyPEM,
"ssl.certificate.pem": certPEM,
"ssl.key.password": keyPassword,
},
expectError: false,
},
{
name: "CA cert location is invalid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": "foobar",
"ssl.key.pem": keyPEM,
"ssl.certificate.pem": certPEM,
"ssl.key.password": keyPassword,
},
expectError: true,
},
{
name: "Private key file location is invalid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": caPEM,
"ssl.key.pem": "foobar",
"ssl.certificate.pem": certPEM,
"ssl.key.password": keyPassword,
},
expectError: true,
},
{
name: "Public key certificate location is invalid",
kafkaConfig: kafka.ConfigMap{
"security.protocol": "SSL",
"ssl.ca.pem": caPEM,
"ssl.key.pem": keyPEM,
"ssl.certificate.pem": "foobar",
"ssl.key.password": keyPassword,
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := kafka.NewProducer(&tt.kafkaConfig)
if tt.expectError {
g.Expect(err).ToNot(BeNil())
} else {
g.Expect(err).To(BeNil())
}
})
}
}

0 comments on commit 8eaf8fe

Please sign in to comment.