Skip to content

Commit

Permalink
Add kafka kerberos authentication support for collector/ingester
Browse files Browse the repository at this point in the history
Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
  • Loading branch information
rubenvp8510 committed Jun 7, 2019
1 parent 7f2a49d commit 4424d7e
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 12 deletions.
101 changes: 98 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ required = [

[[constraint]]
name = "github.com/Shopify/sarama"
version = "1.20.1"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"

[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -171,3 +171,7 @@ required = [
[[constraint]]
name = "github.com/hashicorp/go-hclog"
version = "0.8.0"

[[override]]
name = "github.com/Shopify/sarama"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
66 changes: 66 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ const (
SuffixParallelism = ".parallelism"
// SuffixHTTPPort is a suffix for the HTTP port
SuffixHTTPPort = ".http-port"
// SuffixAuthentication for enable or disable authentication when connect to kafka cluster.
SuffixAuthentication = ".authentication"
// KerberosPrefix for Kerberos configuration options
KerberosPrefix = ".kerberos"
// SuffixKerberosServiceName is the suffix for Kerberos service name
SuffixKerberosServiceName = ".service-name"
// SuffixKerberosRealm is the suffix for Kerberos realm name
SuffixKerberosRealm = ".realm"
// SuffixKerberosUseKeyTab is the suffix determine if kerberos should use keytab file or password
SuffixKerberosUseKeyTab = ".use-keytab"
// SuffixKerberosUserName is Kerberos username
SuffixKerberosUserName = ".username"
// SuffixKerberosPassword is Kerberos password
SuffixKerberosPassword = ".password"
// SuffixKerberosConfig is path to the kerberos configuration file.
SuffixKerberosConfig = ".config-path"
// SuffixKerberosKeyTab is path keytab file used instead of password when SuffixKerberosUseKeyTab = true
SuffixKerberosKeyTab = ".keytab-path"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
Expand All @@ -63,6 +81,12 @@ const (
DefaultEncoding = kafka.EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultAuthentication is the default value for enable/disable authentication
DefaultAuthentication = "none"
// DefaultKerberosConfig is the default kerberos configuration path
DefaultKerberosConfig = "/etc/krb5.conf"
// DefaultKerberosUseKeyTab is the default use of keytab file
DefaultKerberosUseKeyTab = false
)

// Options stores the configuration options for the Ingester
Expand Down Expand Up @@ -103,6 +127,39 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixAuthentication,
DefaultAuthentication,
"Authentication type used to authenticate with kafka cluster. default value is none")
// Kerberos
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosServiceName,
strconv.Itoa(DefaultParallelism),
"Kerberos service name")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosRealm,
strconv.Itoa(DefaultParallelism),
"Kerberos realm")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosPassword,
strconv.Itoa(DefaultParallelism),
"The Kerberos password used for authenticate, when "+KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUseKeyTab+"=false.")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUserName,
strconv.Itoa(DefaultParallelism),
"The Kerberos username used for authenticate with KDC")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosConfig,
DefaultKerberosConfig,
"Path to Kerberos configuration. i.e /etc/krb5.conf")
flagSet.Bool(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUseKeyTab,
DefaultKerberosUseKeyTab,
"Use of keytab instead of password, if this is true, keytab file will be used instead of password")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosKeyTab,
strconv.Itoa(DefaultParallelism),
"Path to keytab file. i.e /etc/security/kafka.keytab")
}

// InitFromViper initializes Builder with properties from viper
Expand All @@ -115,6 +172,15 @@ func (o *Options) InitFromViper(v *viper.Viper) {

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)

o.Authentication = v.GetString(KafkaConsumerConfigPrefix + SuffixAuthentication)
o.Kerberos.ServiceName = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosServiceName)
o.Kerberos.Realm = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosRealm)
o.Kerberos.KeyTabPath = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosKeyTab)
o.Kerberos.Password = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosPassword)
o.Kerberos.Username = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosUserName)
o.Kerberos.ConfigPath = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosConfig)
o.Kerberos.UseKeyTab = v.GetBool(ConfigPrefix + KerberosPrefix + SuffixKerberosUseKeyTab)
}

// stripWhiteSpace removes all whitespace characters from a string
Expand Down
1 change: 1 addition & 0 deletions pkg/kafka/auth/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requires connection to Kafka
46 changes: 46 additions & 0 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"log"
"strings"

"github.com/Shopify/sarama"
)

const none = "none"
const kerberos = "kerberos"

// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
type AuthenticationConfig struct {
Authentication string
Kerberos KerberosConfig
}

//SetConfiguration set configure authentication into sarama config structure
func SetConfiguration(config AuthenticationConfig, saramaConfig *sarama.Config) {
authentication := strings.ToLower(config.Authentication)
if strings.Trim(authentication, " ") == "" {
authentication = none
}
switch authentication {
case kerberos:
setKerberosConfiguration(&config.Kerberos, saramaConfig)
case none:
return
}
log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication)
}
45 changes: 45 additions & 0 deletions pkg/kafka/auth/kerberos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"github.com/Shopify/sarama"
)

// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer
type KerberosConfig struct {
ServiceName string
Realm string
UseKeyTab bool
Username string
Password string
ConfigPath string
KeyTabPath string
}

func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
if config.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = config.Username
saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
}
4 changes: 4 additions & 0 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"io"

"github.com/bsm/sarama-cluster"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)

// Consumer is an interface to features of Sarama that are necessary for the consumer
Expand All @@ -39,12 +41,14 @@ type Configuration struct {
GroupID string
ClientID string
Consumer
auth.AuthenticationConfig
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
auth.SetConfiguration(c.AuthenticationConfig, &saramaConfig.Config)
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
4 changes: 4 additions & 0 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package producer

import (
"github.com/Shopify/sarama"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)

// Builder builds a new kafka producer
Expand All @@ -26,11 +28,13 @@ type Builder interface {
// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
auth.AuthenticationConfig
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
auth.SetConfiguration(c.AuthenticationConfig, saramaConfig)
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
Loading

0 comments on commit 4424d7e

Please sign in to comment.