Skip to content

Commit 2f5f210

Browse files
committed
Add kafka kerberos authentication support for collector/ingester
Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
1 parent 7f2a49d commit 2f5f210

File tree

11 files changed

+320
-15
lines changed

11 files changed

+320
-15
lines changed

Gopkg.lock

+98-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ required = [
150150

151151
[[constraint]]
152152
name = "github.com/Shopify/sarama"
153-
version = "1.20.1"
153+
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
154154

155155
[[constraint]]
156156
name = "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -171,3 +171,7 @@ required = [
171171
[[constraint]]
172172
name = "github.com/hashicorp/go-hclog"
173173
version = "0.8.0"
174+
175+
[[override]]
176+
name = "github.com/Shopify/sarama"
177+
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"

cmd/ingester/app/builder/builder.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
5151
spanProcessor := processor.NewSpanProcessor(spParams)
5252

5353
consumerConfig := kafkaConsumer.Configuration{
54-
Brokers: options.Brokers,
55-
Topic: options.Topic,
56-
GroupID: options.GroupID,
57-
ClientID: options.ClientID,
54+
Brokers: options.Brokers,
55+
Topic: options.Topic,
56+
GroupID: options.GroupID,
57+
ClientID: options.ClientID,
58+
AuthenticationConfig: options.AuthenticationConfig,
5859
}
5960
saramaConsumer, err := consumerConfig.NewConsumer()
6061
if err != nil {

cmd/ingester/app/flags.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/spf13/viper"
2525

26+
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
2627
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
2728
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
2829
)
@@ -48,7 +49,6 @@ const (
4849
SuffixParallelism = ".parallelism"
4950
// SuffixHTTPPort is a suffix for the HTTP port
5051
SuffixHTTPPort = ".http-port"
51-
5252
// DefaultBroker is the default kafka broker
5353
DefaultBroker = "127.0.0.1:9092"
5454
// DefaultTopic is the default kafka topic
@@ -103,6 +103,8 @@ func AddFlags(flagSet *flag.FlagSet) {
103103
ConfigPrefix+SuffixDeadlockInterval,
104104
DefaultDeadlockInterval,
105105
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
106+
// Authentication flags
107+
auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
106108
}
107109

108110
// InitFromViper initializes Builder with properties from viper
@@ -115,6 +117,9 @@ func (o *Options) InitFromViper(v *viper.Viper) {
115117

116118
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
117119
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
120+
authenticationOptions := auth.AuthenticationConfig{}
121+
authenticationOptions.InitFromViper(KafkaConsumerConfigPrefix, v)
122+
o.AuthenticationConfig = authenticationOptions
118123
}
119124

120125
// stripWhiteSpace removes all whitespace characters from a string

pkg/kafka/auth/.nocover

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
requires connection to Kafka

pkg/kafka/auth/config.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2019 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package auth
16+
17+
import (
18+
"log"
19+
"strings"
20+
21+
"github.com/Shopify/sarama"
22+
"github.com/spf13/viper"
23+
)
24+
25+
const none = "none"
26+
const kerberos = "kerberos"
27+
28+
// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
29+
type AuthenticationConfig struct {
30+
Authentication string
31+
Kerberos KerberosConfig
32+
}
33+
34+
//SetConfiguration set configure authentication into sarama config structure
35+
func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) {
36+
authentication := strings.ToLower(config.Authentication)
37+
if strings.Trim(authentication, " ") == "" {
38+
authentication = none
39+
}
40+
switch authentication {
41+
case kerberos:
42+
setKerberosConfiguration(&config.Kerberos, saramaConfig)
43+
case none:
44+
return
45+
default:
46+
log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication)
47+
}
48+
}
49+
50+
// InitFromViper loads authentication configuration from viper flags.
51+
func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.Viper) {
52+
config.Authentication = v.GetString(configPrefix + suffixAuthentication)
53+
config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName)
54+
config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm)
55+
config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab)
56+
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName)
57+
config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword)
58+
config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig)
59+
config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab)
60+
}

pkg/kafka/auth/kerberos.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (c) 2019 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package auth
16+
17+
import (
18+
"github.com/Shopify/sarama"
19+
)
20+
21+
// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer
22+
type KerberosConfig struct {
23+
ServiceName string
24+
Realm string
25+
UseKeyTab bool
26+
Username string
27+
Password string
28+
ConfigPath string
29+
KeyTabPath string
30+
}
31+
32+
func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) {
33+
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
34+
saramaConfig.Net.SASL.Enable = true
35+
if config.UseKeyTab {
36+
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
37+
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
38+
} else {
39+
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
40+
saramaConfig.Net.SASL.GSSAPI.Password = config.Password
41+
}
42+
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
43+
saramaConfig.Net.SASL.GSSAPI.Username = config.Username
44+
saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
45+
saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
46+
}

0 commit comments

Comments
 (0)