diff --git a/config.go b/config.go index f9299a8c9..ad970a3f0 100644 --- a/config.go +++ b/config.go @@ -15,7 +15,9 @@ import ( const defaultClientID = "sarama" -var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) +// validClientID specifies the permitted characters for a client.id when +// connecting to Kafka versions before 1.0.0 (KIP-190) +var validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) // Config is used to pass multiple configuration options to Sarama's constructors. type Config struct { @@ -846,8 +848,11 @@ func (c *Config) Validate() error { switch { case c.ChannelBufferSize < 0: return ConfigurationError("ChannelBufferSize must be >= 0") - case !validID.MatchString(c.ClientID): - return ConfigurationError("ClientID is invalid") + } + + // only validate clientID locally for Kafka versions before KIP-190 was implemented + if !c.Version.IsAtLeast(V1_0_0_0) && !validClientID.MatchString(c.ClientID) { + return ConfigurationError(fmt.Sprintf("ClientID value %q is not valid for Kafka versions before 1.0.0", c.ClientID)) } return nil diff --git a/config_test.go b/config_test.go index 9477e6bd9..4db9af95c 100644 --- a/config_test.go +++ b/config_test.go @@ -2,11 +2,13 @@ package sarama import ( "errors" + "fmt" "os" "strings" "testing" "github.com/rcrowley/go-metrics" + assert "github.com/stretchr/testify/require" ) // NewTestConfig returns a config meant to be used by tests. @@ -30,23 +32,23 @@ func TestDefaultConfigValidates(t *testing.T) { } } -func TestInvalidClientIDConfigValidates(t *testing.T) { - config := NewTestConfig() - config.ClientID = "foo:bar" - err := config.Validate() - var target ConfigurationError - if !errors.As(err, &target) || string(target) != "ClientID is invalid" { - t.Error("Expected invalid ClientID, got ", err) - } -} - -func TestEmptyClientIDConfigValidates(t *testing.T) { - config := NewTestConfig() - config.ClientID = "" - err := config.Validate() - var target ConfigurationError - if !errors.As(err, &target) || string(target) != "ClientID is invalid" { - t.Error("Expected invalid ClientID, got ", err) +// TestInvalidClientIDValidated ensures that the ClientID field is checked +// when Version is set to anything less than 1_0_0_0, but otherwise accepted +func TestInvalidClientIDValidated(t *testing.T) { + for _, version := range SupportedVersions { + for _, clientID := range []string{"", "foo:bar", "foo|bar"} { + config := NewTestConfig() + config.ClientID = clientID + config.Version = version + err := config.Validate() + if config.Version.IsAtLeast(V1_0_0_0) { + assert.NoError(t, err) + continue + } + var target ConfigurationError + assert.ErrorAs(t, err, &target) + assert.ErrorContains(t, err, fmt.Sprintf("ClientID value %q is not valid for Kafka versions before 1.0.0", clientID)) + } } } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 69510d144..e282109e2 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -251,14 +251,15 @@ func (brokePartitioner) RequiresConsistency() bool { return false } func TestProducerWithInvalidConfiguration(t *testing.T) { trm := newTestReporterMock() config := NewTestConfig() - config.ClientID = "not a valid client ID" + config.Version = sarama.V0_11_0_2 + config.ClientID = "not a valid producer ID" mp := NewAsyncProducer(trm, config) if err := mp.Close(); err != nil { t.Error(err) } if len(trm.errors) != 1 { t.Error("Expected to report a single error") - } else if !strings.Contains(trm.errors[0], "ClientID is invalid") { + } else if !strings.Contains(trm.errors[0], `ClientID value "not a valid producer ID" is not valid for Kafka versions before 1.0.0`) { t.Errorf("Unexpected error: %s", trm.errors[0]) } } diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 9a23839cb..1d8659a3d 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -405,7 +405,8 @@ func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { func TestConsumerInvalidConfiguration(t *testing.T) { trm := newTestReporterMock() config := NewTestConfig() - config.ClientID = "not a valid client ID" + config.Version = sarama.V0_11_0_2 + config.ClientID = "not a valid consumer ID" consumer := NewConsumer(trm, config) if err := consumer.Close(); err != nil { t.Error(err) @@ -413,7 +414,7 @@ func TestConsumerInvalidConfiguration(t *testing.T) { if len(trm.errors) != 1 { t.Error("Expected to report a single error") - } else if !strings.Contains(trm.errors[0], "ClientID is invalid") { + } else if !strings.Contains(trm.errors[0], `ClientID value "not a valid consumer ID" is not valid for Kafka versions before 1.0.0`) { t.Errorf("Unexpected error: %s", trm.errors[0]) } } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index fb6f058bc..f2e51cf5e 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -356,7 +356,8 @@ func (f faultyEncoder) Length() int { func TestSyncProducerInvalidConfiguration(t *testing.T) { trm := newTestReporterMock() config := NewTestConfig() - config.ClientID = "not a valid client ID" + config.Version = sarama.V0_11_0_2 + config.ClientID = "not a valid producer ID" mp := NewSyncProducer(trm, config) if err := mp.Close(); err != nil { t.Error(err) @@ -364,7 +365,7 @@ func TestSyncProducerInvalidConfiguration(t *testing.T) { if len(trm.errors) != 1 { t.Error("Expected to report a single error") - } else if !strings.Contains(trm.errors[0], "ClientID is invalid") { + } else if !strings.Contains(trm.errors[0], `ClientID value "not a valid producer ID" is not valid for Kafka versions before 1.0.0`) { t.Errorf("Unexpected error: %s", trm.errors[0]) } }