Skip to content

Commit

Permalink
add tls config to kafka storage
Browse files Browse the repository at this point in the history
  • Loading branch information
mhoffmann committed Mar 17, 2019
1 parent 44eaf08 commit 01def27
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 5 deletions.
6 changes: 5 additions & 1 deletion cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"strings"

"github.com/uber/jaeger-lib/metrics"
Expand Down Expand Up @@ -51,7 +52,10 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Configuration: config.Configuration{
Brokers: options.Brokers,
TLS: options.TLS,
},
Topic: options.Topic,
GroupID: options.GroupID,
}
Expand Down
32 changes: 32 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ const (
SuffixGroupID = ".group-id"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixTLS is a suffix for the tls flag
SuffixTLS = ".tls"
// SuffixCert is a suffix for the tls certificate path flag
SuffixCert = ".tls.cert"
// SuffixKey is a suffix for the tls key path flag
SuffixKey = ".tls.key"
// SuffixCA is a suffix for the tls ca path flag
SuffixCA = ".tls.ca"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
SuffixDeadlockInterval = ".deadlockInterval"
// SuffixParallelism is a suffix for the parallelism flag
Expand All @@ -55,6 +63,14 @@ const (
DefaultTopic = "jaeger-spans"
// DefaultGroupID is the default consumer Group ID
DefaultGroupID = "jaeger-ingester"
// DefaultTLS is the default for TLS enabled
DefaultTLS = false
// DefaultCAPath is the default for the TLS CA path
DefaultCAPath = ""
// DefaultCertPath is the default for the TLS Cert path
DefaultCertPath = ""
// DefaultKeyPath is the default for the TLS key path
DefaultKeyPath = ""
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
Expand Down Expand Up @@ -95,6 +111,22 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.Bool(
KafkaConsumerConfigPrefix+SuffixTLS,
DefaultTLS,
"Enable TLS for the Kafka connection")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixCA,
DefaultCAPath,
"Path to the TLS CA for the Kafka connection")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixCert,
DefaultCertPath,
"Path to the TLS Certificate for the Kafka connection")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixKey,
DefaultKeyPath,
"Path to the TLS Key for the Kafka connection")
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
Expand Down
58 changes: 58 additions & 0 deletions pkg/kafka/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package config

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
)

// Configuration describes the shared configuration options for Producers or Consumers
type Configuration struct {
Brokers []string
TLS TLSConfig
}

// TLSConfig describes the configuration properties for TLS Connections to the Kafka Brokers
type TLSConfig struct {
Enabled bool
CertPath string
KeyPath string
CaPath string
}

// GetTLSConfig creates TLS Configuration
func (tlsConfig *TLSConfig) GetTLSConfig() (*tls.Config, error) {
rootCerts, err := tlsConfig.loadCertificate()
if err != nil {
return nil, err
}
clientPrivateKey, err := tlsConfig.loadPrivateKey()
if err != nil {
return nil, err
}
return &tls.Config{
RootCAs: rootCerts,
Certificates: []tls.Certificate{*clientPrivateKey},
}, nil

}

// loadCertificate is used to load root certification
func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) {
caCert, err := ioutil.ReadFile(tlsConfig.CaPath)
if err != nil {
return nil, err
}
certificates := x509.NewCertPool()
certificates.AppendCertsFromPEM(caCert)
return certificates, nil
}

// loadPrivateKey is used to load the private certificate and key for TLS
func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {
privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath)
if err != nil {
return nil, err
}
return &privateKey, nil
}
12 changes: 11 additions & 1 deletion pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package consumer

import (
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/pkg/errors"
"io"

"github.com/bsm/sarama-cluster"
Expand All @@ -34,7 +36,7 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
config.Configuration
Topic string
GroupID string
Consumer
Expand All @@ -44,5 +46,13 @@ type Configuration struct {
func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
if c.TLS.Enabled {
tlsConfig, err := c.TLS.GetTLSConfig()
if err != nil {
return nil, errors.Wrap(err, "error parsing tls configuration")
}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = tlsConfig
}
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
12 changes: 11 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package producer

import (
"github.com/Shopify/sarama"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"github.com/pkg/errors"
)

// Builder builds a new kafka producer
Expand All @@ -25,12 +27,20 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
config.Configuration
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
if c.TLS.Enabled {
saramaConfig.Net.TLS.Enable = true
tlsConf, err := c.TLS.GetTLSConfig()
if err != nil {
return nil, errors.Wrap(err, "error parsing tls configuration")
}
saramaConfig.Net.TLS.Config = tlsConf
}
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
39 changes: 37 additions & 2 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kafka
import (
"flag"
"fmt"
"github.com/jaegertracing/jaeger/pkg/kafka/config"
"strings"

"github.com/spf13/viper"
Expand All @@ -37,10 +38,18 @@ const (
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultTLS = false
defaultCAPath = ""
defaultCertPath = ""
defaultKeyPath = ""
)

var (
Expand Down Expand Up @@ -70,6 +79,22 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
flagSet.Bool(
configPrefix+suffixTLS,
defaultTLS,
"Enable TLS with client certificates.")
flagSet.String(
configPrefix+suffixCert,
defaultCertPath,
"Path to TLS certificate file")
flagSet.String(
configPrefix+suffixKey,
defaultKeyPath,
"Path to TLS key file")
flagSet.String(
configPrefix+suffixCA,
defaultCAPath,
"Path to TLS CA file")

// TODO: Remove deprecated flags after 1.11
flagSet.String(
Expand All @@ -89,7 +114,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
Configuration: config.Configuration{
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
TLS: config.TLSConfig{
Enabled: v.GetBool(configPrefix + suffixTLS),
CaPath: v.GetString(configPrefix + suffixCA),
CertPath: v.GetString(configPrefix + suffixCert),
KeyPath: v.GetString(configPrefix + suffixKey),
},
},
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)
Expand All @@ -100,7 +133,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
configPrefix+suffixBrokers,
)
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(brokers), ","),
Configuration: config.Configuration{
Brokers: strings.Split(stripWhiteSpace(brokers), ","),
},
}
}
if topic := v.GetString(deprecatedPrefix + suffixTopic); topic != "" {
Expand Down

0 comments on commit 01def27

Please sign in to comment.