diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index a79400d7d94..66ab15137cb 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -21,7 +21,6 @@ import ( "strconv" "github.com/apache/thrift/lib/go/thrift" - "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" @@ -235,6 +234,6 @@ func CreateCollectorProxy( case reporter.TCHANNEL: return tchannel.NewCollectorProxy(tchanBuilder, mFactory, logger) default: - return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(opts.ReporterType))) + return nil, fmt.Errorf("unknown reporter type %s", string(opts.ReporterType)) } } diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go index 09f5ae82c02..130155b1384 100644 --- a/pkg/kafka/auth/config.go +++ b/pkg/kafka/auth/config.go @@ -15,40 +15,48 @@ package auth import ( - "log" "strings" "github.com/Shopify/sarama" + "github.com/pkg/errors" "github.com/spf13/viper" ) -const none = "none" -const kerberos = "kerberos" +const ( + none = "none" + kerberos = "kerberos" + tls = "tls" +) var authTypes = []string{ none, kerberos, + tls, } // AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster type AuthenticationConfig struct { Authentication string Kerberos KerberosConfig + TLS TLSConfig } //SetConfiguration set configure authentication into sarama config structure -func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) { +func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) error { authentication := strings.ToLower(config.Authentication) if strings.Trim(authentication, " ") == "" { authentication = none } switch authentication { + case none: + return nil case kerberos: setKerberosConfiguration(&config.Kerberos, saramaConfig) - case none: - return + return nil + case tls: + return setTLSConfiguration(&config.TLS, saramaConfig) default: - log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication) + return errors.Errorf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication) } } @@ -62,4 +70,8 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper. config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword) config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig) config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab) + + config.TLS.CaPath = v.GetString(configPrefix + tlsPrefix + suffixTLSCA) + config.TLS.CertPath = v.GetString(configPrefix + tlsPrefix + suffixTLSCert) + config.TLS.KeyPath = v.GetString(configPrefix + tlsPrefix + suffixTLSKey) } diff --git a/pkg/kafka/auth/options.go b/pkg/kafka/auth/options.go index 7cb3df11dd3..43a2e98b9b5 100644 --- a/pkg/kafka/auth/options.go +++ b/pkg/kafka/auth/options.go @@ -20,7 +20,8 @@ import ( ) const ( - suffixAuthentication = ".authentication" + suffixAuthentication = ".authentication" + defaultAuthentication = none // Kerberos configuration options kerberosPrefix = ".kerberos" @@ -32,7 +33,6 @@ const ( suffixKerberosConfig = ".config-file" suffixKerberosKeyTab = ".keytab-file" - defaultAuthentication = none defaultKerberosConfig = "/etc/krb5.conf" defaultKerberosUseKeyTab = false defaultKerberosServiceName = "kafka" @@ -40,6 +40,16 @@ const ( defaultKerberosPassword = "" defaultKerberosUsername = "" defaultKerberosKeyTab = "/etc/security/kafka.keytab" + + // TLS configuration options + tlsPrefix = ".tls" + suffixTLSCert = ".cert" + suffixTLSKey = ".key" + suffixTLSCA = ".ca" + + defaultCAPath = "" + defaultCertPath = "" + defaultKeyPath = "" ) func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { @@ -73,6 +83,22 @@ func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { "Path to keytab file. i.e /etc/security/kafka.keytab") } +// AddFlags adds the flags for this package to the flagSet +func addTLSFlags(configPrefix string, flagSet *flag.FlagSet) { + flagSet.String( + configPrefix+tlsPrefix+suffixTLSCA, + defaultCAPath, + "Path to the TLS CA for the Kafka connection") + flagSet.String( + configPrefix+tlsPrefix+suffixTLSCert, + defaultCertPath, + "Path to the TLS Certificate for the Kafka connection") + flagSet.String( + configPrefix+tlsPrefix+suffixTLSKey, + defaultKeyPath, + "Path to the TLS Key for the Kafka connection") +} + // AddFlags add configuration flags to a flagSet. func AddFlags(configPrefix string, flagSet *flag.FlagSet) { flagSet.String( @@ -81,4 +107,5 @@ func AddFlags(configPrefix string, flagSet *flag.FlagSet) { "Authentication type used to authenticate with kafka cluster. e.g. "+strings.Join(authTypes, ", "), ) addKerberosFlags(configPrefix, flagSet) + addTLSFlags(configPrefix, flagSet) } diff --git a/pkg/kafka/auth/tls.go b/pkg/kafka/auth/tls.go new file mode 100644 index 00000000000..4d4b2be8912 --- /dev/null +++ b/pkg/kafka/auth/tls.go @@ -0,0 +1,71 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + gotls "crypto/tls" + "crypto/x509" + "io/ioutil" + "path/filepath" + + "github.com/Shopify/sarama" + "github.com/pkg/errors" +) + +// TLSConfig describes the configuration properties for TLS Connections to the Kafka Brokers +type TLSConfig struct { + CertPath string + KeyPath string + CaPath string +} + +func setTLSConfiguration(config *TLSConfig, saramaConfig *sarama.Config) error { + tlsConfig, err := config.getTLS() + if err != nil { + return errors.Wrap(err, "error loading tls config") + } + saramaConfig.Net.TLS.Enable = true + saramaConfig.Net.TLS.Config = tlsConfig + return nil +} + +func (tlsConfig TLSConfig) getTLS() (*gotls.Config, error) { + ca, err := loadCA(tlsConfig.CaPath) + if err != nil { + return nil, errors.Wrapf(err, "error reading ca") + } + + cert, err := gotls.LoadX509KeyPair(filepath.Clean(tlsConfig.CertPath), filepath.Clean(tlsConfig.KeyPath)) + if err != nil { + return nil, errors.Wrap(err, "error loading certificate") + } + + return &gotls.Config{ + RootCAs: ca, + Certificates: []gotls.Certificate{cert}, + }, nil +} + +func loadCA(caPath string) (*x509.CertPool, error) { + caBytes, err := ioutil.ReadFile(filepath.Clean(caPath)) + if err != nil { + return nil, errors.Wrapf(err, "error reading caFile %s", caPath) + } + certificates := x509.NewCertPool() + if ok := certificates.AppendCertsFromPEM(caBytes); !ok { + return nil, errors.Errorf("no ca certificates could be parsed") + } + return certificates, nil +} diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index caa21818de0..eba0e847858 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -37,13 +37,14 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka consumer type Configuration struct { + auth.AuthenticationConfig + Consumer + Brokers []string Topic string GroupID string ClientID string ProtocolVersion string - Consumer - auth.AuthenticationConfig } // NewConsumer creates a new kafka consumer @@ -58,6 +59,8 @@ func (c *Configuration) NewConsumer() (Consumer, error) { } saramaConfig.Config.Version = ver } - c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config) + if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config); err != nil { + return nil, err + } return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) } diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 729332ba716..1560bc3c122 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -42,7 +42,6 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig.Producer.Compression = c.Compression saramaConfig.Producer.CompressionLevel = c.CompressionLevel saramaConfig.Producer.Return.Successes = true - c.AuthenticationConfig.SetConfiguration(saramaConfig) if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { @@ -50,5 +49,8 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { } saramaConfig.Version = ver } + if err := c.AuthenticationConfig.SetConfiguration(saramaConfig); err != nil { + return nil, err + } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) }