Skip to content

Commit

Permalink
Support TLS for Kafka (#1414)
Browse files Browse the repository at this point in the history
* add tls config to kafka storage

Signed-off-by: mhoffmann <michoffmann.potsdam@gmail.com>

* fmt & lint

Signed-off-by: mhoffmann <michoffmann.potsdam@gmail.com>

* readd pkg/es/config.nocover

Signed-off-by: mhoffmann <michoffmann.potsdam@gmail.com>

* checkout correct commits in submodules

Signed-off-by: mhoffmann <michoffmann.potsdam@gmail.com>
  • Loading branch information
Michael Hoffmann authored and yurishkuro committed Oct 6, 2019
1 parent 2957d89 commit fcc0adb
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 15 deletions.
3 changes: 1 addition & 2 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strconv"

"github.com/apache/thrift/lib/go/thrift"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

Expand Down Expand Up @@ -235,6 +234,6 @@ func CreateCollectorProxy(
case reporter.TCHANNEL:
return tchannel.NewCollectorProxy(tchanBuilder, mFactory, logger)
default:
return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(opts.ReporterType)))
return nil, fmt.Errorf("unknown reporter type %s", string(opts.ReporterType))
}
}
26 changes: 19 additions & 7 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,48 @@
package auth

import (
"log"
"strings"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
"github.com/spf13/viper"
)

const none = "none"
const kerberos = "kerberos"
const (
none = "none"
kerberos = "kerberos"
tls = "tls"
)

var authTypes = []string{
none,
kerberos,
tls,
}

// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
type AuthenticationConfig struct {
Authentication string
Kerberos KerberosConfig
TLS TLSConfig
}

//SetConfiguration set configure authentication into sarama config structure
func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) {
func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) error {
authentication := strings.ToLower(config.Authentication)
if strings.Trim(authentication, " ") == "" {
authentication = none
}
switch authentication {
case none:
return nil
case kerberos:
setKerberosConfiguration(&config.Kerberos, saramaConfig)
case none:
return
return nil
case tls:
return setTLSConfiguration(&config.TLS, saramaConfig)
default:
log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication)
return errors.Errorf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication)
}
}

Expand All @@ -62,4 +70,8 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword)
config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig)
config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab)

config.TLS.CaPath = v.GetString(configPrefix + tlsPrefix + suffixTLSCA)
config.TLS.CertPath = v.GetString(configPrefix + tlsPrefix + suffixTLSCert)
config.TLS.KeyPath = v.GetString(configPrefix + tlsPrefix + suffixTLSKey)
}
31 changes: 29 additions & 2 deletions pkg/kafka/auth/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
)

const (
suffixAuthentication = ".authentication"
suffixAuthentication = ".authentication"
defaultAuthentication = none

// Kerberos configuration options
kerberosPrefix = ".kerberos"
Expand All @@ -32,14 +33,23 @@ const (
suffixKerberosConfig = ".config-file"
suffixKerberosKeyTab = ".keytab-file"

defaultAuthentication = none
defaultKerberosConfig = "/etc/krb5.conf"
defaultKerberosUseKeyTab = false
defaultKerberosServiceName = "kafka"
defaultKerberosRealm = ""
defaultKerberosPassword = ""
defaultKerberosUsername = ""
defaultKerberosKeyTab = "/etc/security/kafka.keytab"

// TLS configuration options
tlsPrefix = ".tls"
suffixTLSCert = ".cert"
suffixTLSKey = ".key"
suffixTLSCA = ".ca"

defaultCAPath = ""
defaultCertPath = ""
defaultKeyPath = ""
)

func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) {
Expand Down Expand Up @@ -73,6 +83,22 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) {
"Path to keytab file. i.e /etc/security/kafka.keytab")
}

// AddFlags adds the flags for this package to the flagSet
func addTLSFlags(configPrefix string, flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+tlsPrefix+suffixTLSCA,
defaultCAPath,
"Path to the TLS CA for the Kafka connection")
flagSet.String(
configPrefix+tlsPrefix+suffixTLSCert,
defaultCertPath,
"Path to the TLS Certificate for the Kafka connection")
flagSet.String(
configPrefix+tlsPrefix+suffixTLSKey,
defaultKeyPath,
"Path to the TLS Key for the Kafka connection")
}

// AddFlags add configuration flags to a flagSet.
func AddFlags(configPrefix string, flagSet *flag.FlagSet) {
flagSet.String(
Expand All @@ -81,4 +107,5 @@ func AddFlags(configPrefix string, flagSet *flag.FlagSet) {
"Authentication type used to authenticate with kafka cluster. e.g. "+strings.Join(authTypes, ", "),
)
addKerberosFlags(configPrefix, flagSet)
addTLSFlags(configPrefix, flagSet)
}
71 changes: 71 additions & 0 deletions pkg/kafka/auth/tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
gotls "crypto/tls"
"crypto/x509"
"io/ioutil"
"path/filepath"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
)

// TLSConfig describes the configuration properties for TLS Connections to the Kafka Brokers
type TLSConfig struct {
CertPath string
KeyPath string
CaPath string
}

func setTLSConfiguration(config *TLSConfig, saramaConfig *sarama.Config) error {
tlsConfig, err := config.getTLS()
if err != nil {
return errors.Wrap(err, "error loading tls config")
}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
return nil
}

func (tlsConfig TLSConfig) getTLS() (*gotls.Config, error) {
ca, err := loadCA(tlsConfig.CaPath)
if err != nil {
return nil, errors.Wrapf(err, "error reading ca")
}

cert, err := gotls.LoadX509KeyPair(filepath.Clean(tlsConfig.CertPath), filepath.Clean(tlsConfig.KeyPath))
if err != nil {
return nil, errors.Wrap(err, "error loading certificate")
}

return &gotls.Config{
RootCAs: ca,
Certificates: []gotls.Certificate{cert},
}, nil
}

func loadCA(caPath string) (*x509.CertPool, error) {
caBytes, err := ioutil.ReadFile(filepath.Clean(caPath))
if err != nil {
return nil, errors.Wrapf(err, "error reading caFile %s", caPath)
}
certificates := x509.NewCertPool()
if ok := certificates.AppendCertsFromPEM(caBytes); !ok {
return nil, errors.Errorf("no ca certificates could be parsed")
}
return certificates, nil
}
9 changes: 6 additions & 3 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
auth.AuthenticationConfig
Consumer

Brokers []string
Topic string
GroupID string
ClientID string
ProtocolVersion string
Consumer
auth.AuthenticationConfig
}

// NewConsumer creates a new kafka consumer
Expand All @@ -58,6 +59,8 @@ func (c *Configuration) NewConsumer() (Consumer, error) {
}
saramaConfig.Config.Version = ver
}
c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config)
if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config); err != nil {
return nil, err
}
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
4 changes: 3 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig.Producer.Compression = c.Compression
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true
c.AuthenticationConfig.SetConfiguration(saramaConfig)
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
return nil, err
}
saramaConfig.Version = ver
}
if err := c.AuthenticationConfig.SetConfiguration(saramaConfig); err != nil {
return nil, err
}
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}

0 comments on commit fcc0adb

Please sign in to comment.