Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SASL/SCRAM authentication mechanism on kafka receiver and exporter #2503

Merged
merged 1 commit into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/schemagen/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -957,8 +957,11 @@ github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xdg-go/scram v0.0.0-20180814205039-7eeb5667e42c h1:Wm21TPasVdeOUTg1m/uNkRdMuvI+jIeYfTIwq98Z2V0=
github.com/xdg-go/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:FV1RpvYFmF8wnKtr3ArzkC0b+tAySCbw8eP7QSIvLKM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
Expand Down
4 changes: 4 additions & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ The following settings can be optionally configured:
- `plain_text`
- `username`: The username to use.
- `password`: The password to use
- `sasl`
- `username`: The username to use.
- `password`: The password to use
- `mechanism`: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512 or PLAIN)
- `tls`
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
only be used if `insecure` is set to true.
Expand Down
49 changes: 49 additions & 0 deletions exporter/kafkaexporter/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package kafkaexporter

import (
"crypto/sha256"
"crypto/sha512"
"fmt"

"github.com/Shopify/sarama"
Expand All @@ -25,6 +27,7 @@ import (
// Authentication defines authentication.
type Authentication struct {
PlainText *PlainTextConfig `mapstructure:"plain_text"`
SASL *SASLConfig `mapstructure:"sasl"`
TLS *configtls.TLSClientSetting `mapstructure:"tls"`
Kerberos *KerberosConfig `mapstructure:"kerberos"`
}
Expand All @@ -35,6 +38,16 @@ type PlainTextConfig struct {
Password string `mapstructure:"password"`
}

// SASLConfig defines the configuration for the SASL authentication.
type SASLConfig struct {
// Username to be used on authentication
Username string `mapstructure:"username"`
// Password to be used on authentication
Password string `mapstructure:"password"`
// SASL Mechanism to be used, possible values are: (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512).
Mechanism string `mapstructure:"mechanism"`
}

// KerberosConfig defines kereros configuration.
type KerberosConfig struct {
ServiceName string `mapstructure:"service_name"`
Expand All @@ -56,6 +69,12 @@ func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config)
return err
}
}
if config.SASL != nil {
if err := configureSASL(*config.SASL, saramaConfig); err != nil {
return err
}
}

if config.Kerberos != nil {
configureKerberos(*config.Kerberos, saramaConfig)
}
Expand All @@ -68,6 +87,36 @@ func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Password = config.Password
}

func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {

if config.Username == "" {
return fmt.Errorf("username have to be provided")
}

if config.Password == "" {
return fmt.Errorf("password have to be provided")
}

saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.Username
altieresfreitas marked this conversation as resolved.
Show resolved Hide resolved
saramaConfig.Net.SASL.Password = config.Password

switch config.Mechanism {
case "SCRAM-SHA-512":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: sha512.New} }
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "SCRAM-SHA-256":
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: sha256.New} }
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
case "PLAIN":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return fmt.Errorf("invalid SASL Mechanism %q: can be either \"PLAIN\" , \"SCRAM-SHA-256\" or \"SCRAM-SHA-512\"", config.Mechanism)
}

return nil
}

func configureTLS(config configtls.TLSClientSetting, saramaConfig *sarama.Config) error {
tlsConfig, err := config.LoadTLSConfig()
if err != nil {
Expand Down
49 changes: 49 additions & 0 deletions exporter/kafkaexporter/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,25 @@ func TestAuthentication(t *testing.T) {
saramaPlaintext.Net.SASL.User = "jdoe"
saramaPlaintext.Net.SASL.Password = "pass"

saramaSASLSCRAM256Config := &sarama.Config{}
saramaSASLSCRAM256Config.Net.SASL.Enable = true
saramaSASLSCRAM256Config.Net.SASL.User = "jdoe"
saramaSASLSCRAM256Config.Net.SASL.Password = "pass"
saramaSASLSCRAM256Config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256

saramaSASLSCRAM512Config := &sarama.Config{}
saramaSASLSCRAM512Config.Net.SASL.Enable = true
saramaSASLSCRAM512Config.Net.SASL.User = "jdoe"
saramaSASLSCRAM512Config.Net.SASL.Password = "pass"
saramaSASLSCRAM512Config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512

saramaSASLPLAINConfig := &sarama.Config{}
saramaSASLPLAINConfig.Net.SASL.Enable = true
saramaSASLPLAINConfig.Net.SASL.User = "jdoe"
saramaSASLPLAINConfig.Net.SASL.Password = "pass"

saramaSASLPLAINConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext

saramaTLSCfg := &sarama.Config{}
saramaTLSCfg.Net.TLS.Enable = true
tlsClient := configtls.TLSClientSetting{}
Expand Down Expand Up @@ -77,6 +96,34 @@ func TestAuthentication(t *testing.T) {
auth: Authentication{Kerberos: &KerberosConfig{UseKeyTab: true, KeyTabPath: "/path"}},
saramaConfig: saramaKerberosKeyTabCfg,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-256"}},
saramaConfig: saramaSASLSCRAM256Config,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-512"}},
saramaConfig: saramaSASLSCRAM512Config,
},

{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}},
saramaConfig: saramaSASLPLAINConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-222"}},
saramaConfig: saramaSASLSCRAM512Config,
err: "invalid SASL Mechanism",
},
{
auth: Authentication{SASL: &SASLConfig{Username: "", Password: "pass", Mechanism: "SCRAM-SHA-512"}},
saramaConfig: saramaSASLSCRAM512Config,
err: "username have to be provided",
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "", Mechanism: "SCRAM-SHA-512"}},
saramaConfig: saramaSASLSCRAM512Config,
err: "password have to be provided",
},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
Expand All @@ -86,6 +133,8 @@ func TestAuthentication(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), test.err)
} else {
// equalizes SCRAMClientGeneratorFunc to do assertion with the same reference.
config.Net.SASL.SCRAMClientGeneratorFunc = test.saramaConfig.Net.SASL.SCRAMClientGeneratorFunc
assert.Equal(t, test.saramaConfig, config)
}
})
Expand Down
47 changes: 47 additions & 0 deletions exporter/kafkaexporter/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"github.com/Shopify/sarama"
"github.com/xdg-go/scram"
)

var _ sarama.SCRAMClient = (*XDGSCRAMClient)(nil)

// XDGSCRAMClient uses xdg-go scram to authentication conversation
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
return x.ClientConversation.Step(challenge)

}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/tinylib/msgp v1.1.5
github.com/uber/jaeger-lib v2.4.0+incompatible
go.opencensus.io v0.22.6
github.com/xdg-go/scram v0.0.0-20180814205039-7eeb5667e42c
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.16.0
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -959,8 +959,11 @@ github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xdg-go/scram v0.0.0-20180814205039-7eeb5667e42c h1:Wm21TPasVdeOUTg1m/uNkRdMuvI+jIeYfTIwq98Z2V0=
github.com/xdg-go/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:FV1RpvYFmF8wnKtr3ArzkC0b+tAySCbw8eP7QSIvLKM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
Expand Down