diff --git a/Gopkg.lock b/Gopkg.lock index 2926746e193f..59234240bbe0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -42,15 +42,14 @@ revision = "de5bf2ad457846296e2031421a34e2568e304e35" [[projects]] - digest = "1:7e31a67d6e81ae7bac48b27c9260ad164eff0abdb4300d0f2aa8d4856cc45479" + digest = "1:42831312efd0280d7ea16fc3e85819127852416547e6b35370fa0845455c56eb" name = "github.com/Shopify/sarama" packages = [ ".", "mocks", ] pruneopts = "UT" - revision = "ea9ab1c316850bee881a07bb2555ee8a685cd4b6" - version = "v1.22.1" + revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9" [[projects]] digest = "1:8515c0ca4381246cf332cee05fc84070bbbb07bd679b639161506ba532f47128" @@ -476,6 +475,14 @@ revision = "52e1c4730856c1438ced7597c9b5c585a7bd06a2" version = "v1.0.0" +[[projects]] + digest = "1:f14364057165381ea296e49f8870a9ffce2b8a95e34d6ae06c759106aaef428c" + name = "github.com/hashicorp/go-uuid" + packages = ["."] + pruneopts = "UT" + revision = "4f571afc59f3043a65f8fe6bf46d887b10a01d43" + version = "v1.0.1" + [[projects]] digest = "1:c0d19ab64b32ce9fe5cf4ddceba78d5bc9807f0016db6b1183599da3dcc24d10" name = "github.com/hashicorp/hcl" @@ -511,6 +518,17 @@ revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" +[[projects]] + branch = "master" + digest = "1:ae221758bdddd57f5c76f4ee5e4110af32ee62583c46299094697f8f127e63da" + name = "github.com/jcmturner/gofork" + packages = [ + "encoding/asn1", + "x/crypto/pbkdf2", + ] + pruneopts = "UT" + revision = "dc7c13fece037a4a36e2b3c69db4991498d30692" + [[projects]] digest = "1:15ec2166e33ef6c60b344a04d050eec79193517e7f5082b6233b2d09ef0d10b8" name = "github.com/kisielk/gotool" @@ -976,6 +994,17 @@ revision = "27376062155ad36be76b0f12cf1572a221d3a48c" version = "v1.10.0" +[[projects]] + branch = "master" + digest = "1:04b43fe96213ea69cfa6e6b8be218a43a375035ea09d9bdda9fed2691f5a7e76" + name = "golang.org/x/crypto" + packages = [ + "md4", + "pbkdf2", + ] + pruneopts = "UT" + revision = "f99c8df09eb5bff426315721bfa5f16a99cad32c" + [[projects]] branch = "master" digest = "1:6bf120070ed448fd0a139da7b9514006b3390bd815a0013c50cc672c24a74fa1" @@ -1120,6 +1149,72 @@ revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf" version = "v0.9.1" +[[projects]] + digest = "1:c902038ee2d6f964d3b9f2c718126571410c5d81251cbab9fe58abd37803513c" + name = "gopkg.in/jcmturner/aescts.v1" + packages = ["."] + pruneopts = "UT" + revision = "f6abebb3171c4c1b1fea279cb7c7325020a26290" + version = "v1.0.1" + +[[projects]] + digest = "1:a1a3e185c03d79a7452d5d5b4c91be4cc433f55e6ed3a35233d852c966e39013" + name = "gopkg.in/jcmturner/dnsutils.v1" + packages = ["."] + pruneopts = "UT" + revision = "13eeb8d49ffb74d7a75784c35e4d900607a3943c" + version = "v1.0.1" + +[[projects]] + digest = "1:462bc6dbe06e0f5d060b651bcefe0b4a2433799be6758285b888e9ef188c6411" + name = "gopkg.in/jcmturner/gokrb5.v7" + packages = [ + "asn1tools", + "client", + "config", + "credentials", + "crypto", + "crypto/common", + "crypto/etype", + "crypto/rfc3961", + "crypto/rfc3962", + "crypto/rfc4757", + "crypto/rfc8009", + "gssapi", + "iana", + "iana/addrtype", + "iana/adtype", + "iana/asnAppTag", + "iana/chksumtype", + "iana/errorcode", + "iana/etypeID", + "iana/flags", + "iana/keyusage", + "iana/msgtype", + "iana/nametype", + "iana/patype", + "kadmin", + "keytab", + "krberror", + "messages", + "pac", + "types", + ] + pruneopts = "UT" + revision = "bae8ea1f6fab91f6bcb830efe54eb697c8350050" + version = "v7.2.4" + +[[projects]] + digest = "1:0f16d9c577198e3b8d3209f5a89aabe679525b2aba2a7548714e973035c0e232" + name = "gopkg.in/jcmturner/rpc.v1" + packages = [ + "mstypes", + "ndr", + ] + pruneopts = "UT" + revision = "99a8ce2fbf8b8087b6ed12a37c61b10f04070043" + version = "v1.1.0" + [[projects]] digest = "1:9a1d716749c77399bfa71792d77eef3278586423947f3431dbac6d6049c24787" name = "gopkg.in/olivere/elastic.v5" diff --git a/Gopkg.toml b/Gopkg.toml index 8641413990b4..5f920fdd00ef 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -150,7 +150,7 @@ required = [ [[constraint]] name = "github.com/Shopify/sarama" - version = "1.20.1" + revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9" [[constraint]] name = "github.com/grpc-ecosystem/go-grpc-middleware" @@ -171,3 +171,7 @@ required = [ [[constraint]] name = "github.com/hashicorp/go-hclog" version = "0.8.0" + +[[override]] + name = "github.com/Shopify/sarama" + revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9" \ No newline at end of file diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 39e98e387dae..a5c495ac6c8a 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -48,6 +48,24 @@ const ( SuffixParallelism = ".parallelism" // SuffixHTTPPort is a suffix for the HTTP port SuffixHTTPPort = ".http-port" + // SuffixAuthentication for enable or disable authentication when connect to kafka cluster. + SuffixAuthentication = ".authentication" + // KerberosPrefix for Kerberos configuration options + KerberosPrefix = ".kerberos" + // SuffixKerberosServiceName is the suffix for Kerberos service name + SuffixKerberosServiceName = ".service-name" + // SuffixKerberosRealm is the suffix for Kerberos realm name + SuffixKerberosRealm = ".realm" + // SuffixKerberosUseKeyTab is the suffix determine if kerberos should use keytab file or password + SuffixKerberosUseKeyTab = ".use-keytab" + // SuffixKerberosUserName is Kerberos username + SuffixKerberosUserName = ".username" + // SuffixKerberosPassword is Kerberos password + SuffixKerberosPassword = ".password" + // SuffixKerberosConfig is path to the kerberos configuration file. + SuffixKerberosConfig = ".config-path" + // SuffixKerberosKeyTab is path keytab file used instead of password when SuffixKerberosUseKeyTab = true + SuffixKerberosKeyTab = ".keytab-path" // DefaultBroker is the default kafka broker DefaultBroker = "127.0.0.1:9092" @@ -63,6 +81,12 @@ const ( DefaultEncoding = kafka.EncodingProto // DefaultDeadlockInterval is the default deadlock interval DefaultDeadlockInterval = 1 * time.Minute + // DefaultAuthentication is the default value for enable/disable authentication + DefaultAuthentication = "none" + // DefaultKerberosConfig is the default kerberos configuration path + DefaultKerberosConfig = "/etc/krb5.conf" + // DefaultKerberosUseKeyTab is the default use of keytab file + DefaultKerberosUseKeyTab = false ) // Options stores the configuration options for the Ingester @@ -103,6 +127,39 @@ func AddFlags(flagSet *flag.FlagSet) { ConfigPrefix+SuffixDeadlockInterval, DefaultDeadlockInterval, "Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.") + flagSet.String( + KafkaConsumerConfigPrefix+SuffixAuthentication, + DefaultAuthentication, + "Authentication type used to authenticate with kafka cluster. default value is none") + // Kerberos + flagSet.String( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosServiceName, + strconv.Itoa(DefaultParallelism), + "Kerberos service name") + flagSet.String( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosRealm, + strconv.Itoa(DefaultParallelism), + "Kerberos realm") + flagSet.String( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosPassword, + strconv.Itoa(DefaultParallelism), + "The Kerberos password used for authenticate, when "+KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUseKeyTab+"=false.") + flagSet.String( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUserName, + strconv.Itoa(DefaultParallelism), + "The Kerberos username used for authenticate with KDC") + flagSet.String( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosConfig, + DefaultKerberosConfig, + "Path to Kerberos configuration. i.e /etc/krb5.conf") + flagSet.Bool( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUseKeyTab, + DefaultKerberosUseKeyTab, + "Use of keytab instead of password, if this is true, keytab file will be used instead of password") + flagSet.String( + KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosKeyTab, + strconv.Itoa(DefaultParallelism), + "Path to keytab file. i.e /etc/security/kafka.keytab") } // InitFromViper initializes Builder with properties from viper @@ -115,6 +172,15 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval) + + o.Authentication = v.GetString(KafkaConsumerConfigPrefix + SuffixAuthentication) + o.Kerberos.ServiceName = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosServiceName) + o.Kerberos.Realm = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosRealm) + o.Kerberos.KeyTabPath = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosKeyTab) + o.Kerberos.Password = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosPassword) + o.Kerberos.Username = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosUserName) + o.Kerberos.ConfigPath = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosConfig) + o.Kerberos.UseKeyTab = v.GetBool(ConfigPrefix + KerberosPrefix + SuffixKerberosUseKeyTab) } // stripWhiteSpace removes all whitespace characters from a string diff --git a/pkg/kafka/auth/.nocover b/pkg/kafka/auth/.nocover new file mode 100644 index 000000000000..98344a6f8ba2 --- /dev/null +++ b/pkg/kafka/auth/.nocover @@ -0,0 +1 @@ +requires connection to Kafka diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go new file mode 100644 index 000000000000..4a4f769db471 --- /dev/null +++ b/pkg/kafka/auth/config.go @@ -0,0 +1,46 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + "log" + "strings" + + "github.com/Shopify/sarama" +) + +const none = "none" +const kerberos = "kerberos" + +// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster +type AuthenticationConfig struct { + Authentication string + Kerberos KerberosConfig +} + +//SetConfiguration set configure authentication into sarama config structure +func SetConfiguration(config AuthenticationConfig, saramaConfig *sarama.Config) { + authentication := strings.ToLower(config.Authentication) + if strings.Trim(authentication, " ") == "" { + authentication = none + } + switch authentication { + case kerberos: + setKerberosConfiguration(&config.Kerberos, saramaConfig) + case none: + return + } + log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication) +} diff --git a/pkg/kafka/auth/kerberos.go b/pkg/kafka/auth/kerberos.go new file mode 100644 index 000000000000..ad847fae852d --- /dev/null +++ b/pkg/kafka/auth/kerberos.go @@ -0,0 +1,45 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + "github.com/Shopify/sarama" +) + +// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer +type KerberosConfig struct { + ServiceName string + Realm string + UseKeyTab bool + Username string + Password string + ConfigPath string + KeyTabPath string +} + +func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) { + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + if config.UseKeyTab { + saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath + saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH + } else { + saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH + saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.Password + } + saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath + saramaConfig.Net.SASL.GSSAPI.Username = config.Username + saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm + saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName +} diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 094914e1b6e2..e4d6a9241433 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -18,6 +18,8 @@ import ( "io" "github.com/bsm/sarama-cluster" + + "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) // Consumer is an interface to features of Sarama that are necessary for the consumer @@ -39,6 +41,7 @@ type Configuration struct { GroupID string ClientID string Consumer + auth.AuthenticationConfig } // NewConsumer creates a new kafka consumer @@ -46,5 +49,6 @@ func (c *Configuration) NewConsumer() (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID + auth.SetConfiguration(c.AuthenticationConfig, &saramaConfig.Config) return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) } diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 5079ca8fef22..d40e86fe482f 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -16,6 +16,8 @@ package producer import ( "github.com/Shopify/sarama" + + "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) // Builder builds a new kafka producer @@ -26,11 +28,13 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { Brokers []string + auth.AuthenticationConfig } // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true + auth.SetConfiguration(c.AuthenticationConfig, saramaConfig) return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 4ecb18ce1435..e3a38594e6fa 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/viper" + "github.com/jaegertracing/jaeger/pkg/kafka/auth" "github.com/jaegertracing/jaeger/pkg/kafka/producer" ) @@ -32,14 +33,28 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka.producer" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" - - defaultBroker = "127.0.0.1:9092" - defaultTopic = "jaeger-spans" - defaultEncoding = EncodingProto + configPrefix = "kafka.producer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixEncoding = ".encoding" + suffixAuthentication = ".authentication" + // Kerberos configuration options + kerberosPrefix = ".kerberos" + suffixKerberosServiceName = ".serviceName" + suffixKerberosRealm = ".realm" + suffixKerberosUseKeyTab = ".use-keytab" + suffixKerberosUserName = ".username" + suffixKerberosPassword = ".password" + suffixKerberosConfig = ".config-path" + suffixKerberosKeyTab = ".keytab-path" + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-spans" + defaultEncoding = EncodingProto + defaultKerberosConfig = "/etc/krb5.conf" + defaultKerberosUseKeyTab = false + defaultKerberosServiceName = "kafka" + defaultKerberosRealm = "jaeger" + defaultAuthentication = "none" ) var ( @@ -69,12 +84,57 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) + flagSet.String( + configPrefix+suffixAuthentication, + defaultAuthentication, + fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), + ) + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosServiceName, + defaultKerberosServiceName, + "Kerberos service name") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosRealm, + defaultKerberosRealm, + "Kerberos realm") + flagSet.Bool( + configPrefix+kerberosPrefix+suffixKerberosUseKeyTab, + defaultKerberosUseKeyTab, + "Use of keytab instead of password, if this is true, keytab file will be used instead of password") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosUserName, + "", + "The Kerberos username used for authenticate with KDC") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosPassword, + "", + "The Kerberos password used for authenticate, when "+configPrefix+kerberosPrefix+suffixKerberosUseKeyTab+"=false.") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosConfig, + defaultKerberosConfig, + "Path to Kerberos configuration. i.e /etc/krb5.conf") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosKeyTab, + defaultKerberosConfig, + "Path to keytab file. i.e /etc/security/kafka.keytab") } // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { opt.config = producer.Configuration{ Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + AuthenticationConfig: auth.AuthenticationConfig{ + Authentication: v.GetString(configPrefix + suffixAuthentication), + Kerberos: auth.KerberosConfig{ + ServiceName: v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName), + Realm: v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm), + UseKeyTab: v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab), + Username: v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName), + Password: v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword), + ConfigPath: v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig), + KeyTabPath: v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab), + }, + }, } opt.topic = v.GetString(configPrefix + suffixTopic) opt.encoding = v.GetString(configPrefix + suffixEncoding)