Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added SASLVersion in config #1410

Merged
merged 1 commit into from
Jun 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/tls"
"encoding/binary"
"fmt"
metrics "github.com/rcrowley/go-metrics"
"io"
"net"
"sort"
Expand All @@ -13,6 +12,8 @@ import (
"sync"
"sync/atomic"
"time"

metrics "github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Expand Down Expand Up @@ -944,19 +945,16 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
// default to V0 to allow for backward compatability when SASL is enabled
// but not the handshake
saslHandshake := SASLHandshakeV0
if b.conf.Net.SASL.Handshake {
if b.conf.Version.IsAtLeast(V1_0_0_0) {
saslHandshake = SASLHandshakeV1
}
handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, saslHandshake)

handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
}

if saslHandshake == SASLHandshakeV1 {
if b.conf.Net.SASL.Version == SASLHandshakeV1 {
return b.sendAndReceiveV1SASLPlainAuth()
}
return b.sendAndReceiveV0SASLPlainAuth()
Expand Down
1 change: 1 addition & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func TestSASLPlainAuth(t *testing.T) {
conf.Net.SASL.Mechanism = SASLTypePlaintext
conf.Net.SASL.User = "token"
conf.Net.SASL.Password = "password"
conf.Net.SASL.Version = SASLHandshakeV1

broker.conf = conf
broker.conf.Version = V1_0_0_0
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Config struct {
// SASLMechanism is the name of the enabled SASL mechanism.
// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
Mechanism SASLMechanism
// Version is the SASL Protocol Version to use
// Kafka > 1.x should use V1, except on Azure EventHub which use V0
Version int16
// Whether or not to send the Kafka SASL handshake first if enabled
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
Expand Down Expand Up @@ -398,6 +401,7 @@ func NewConfig() *Config {
c.Net.ReadTimeout = 30 * time.Second
c.Net.WriteTimeout = 30 * time.Second
c.Net.SASL.Handshake = true
c.Net.SASL.Version = SASLHandshakeV0

c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = 250 * time.Millisecond
Expand Down