From 8057e35570aebf765d8f93dced3df3b0b77bf68c Mon Sep 17 00:00:00 2001 From: Leonid Date: Fri, 21 Feb 2020 23:44:34 +0100 Subject: [PATCH] Sets ConfigEntry.Default flag in addition to the ConfigEntry.Source for Kafka versions > V1_1_0_0 (#1594) * Set describeConfigsRequest.Version in ListTopics for consistency with DescribeConfig This breaks the output of ListTopics for newer request versions, it now includes default configuration settings. * Set ConfigEntry.Default for KafkaVersions > 0 Clients can now rely on the `Default` flag again and don't have to check the `Source` for higher Kafka versions. * Set ConfigEntry.Source to default for KafkaVersions <= 0 when applicable * Add tests for default flag/source --- admin.go | 9 +++ admin_test.go | 2 +- describe_configs_response.go | 4 ++ describe_configs_response_test.go | 104 ++++++++++++++++++++++++++++++ mockresponses.go | 6 +- 5 files changed, 123 insertions(+), 2 deletions(-) diff --git a/admin.go b/admin.go index dd634846d..7dd725064 100644 --- a/admin.go +++ b/admin.go @@ -338,6 +338,15 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) { describeConfigsReq := &DescribeConfigsRequest{ Resources: describeConfigsResources, } + + if ca.conf.Version.IsAtLeast(V1_1_0_0) { + describeConfigsReq.Version = 1 + } + + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + describeConfigsReq.Version = 2 + } + describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq) if err != nil { return nil, err diff --git a/admin_test.go b/admin_test.go index a6a05bf33..fcbe539b5 100644 --- a/admin_test.go +++ b/admin_test.go @@ -149,7 +149,7 @@ func TestClusterAdminListTopics(t *testing.T) { }) config := NewConfig() - config.Version = V1_0_0_0 + config.Version = V1_1_0_0 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) diff --git a/describe_configs_response.go b/describe_configs_response.go index a18eebab3..dd919f127 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -249,12 +249,16 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) { return err } r.Default = defaultB + if defaultB { + r.Source = SourceDefault + } } else { source, err := pd.getInt8() if err != nil { return err } r.Source = ConfigSource(source) + r.Default = r.Source == SourceDefault } sensitive, err := pd.getBool() diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index 90fcf258c..c91a04220 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -25,6 +25,21 @@ var ( 0, // Sensitive } + describeConfigsResponseWithDefaultv0 = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 1, // response + 0, 0, //errorcode + 0, 0, //string + 2, // topic + 0, 3, 'f', 'o', 'o', + 0, 0, 0, 1, //configs + 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, '1', '0', '0', '0', + 0, // ReadOnly + 1, // Default + 0, // Sensitive + } + describeConfigsResponsePopulatedv1 = []byte{ 0, 0, 0, 0, //throttle 0, 0, 0, 1, // response @@ -59,6 +74,22 @@ var ( 0, 4, '1', '0', '0', '0', 4, // Source } + + describeConfigsResponseWithDefaultv1 = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 1, // response + 0, 0, //errorcode + 0, 0, //string + 2, // topic + 0, 3, 'f', 'o', 'o', + 0, 0, 0, 1, //configs + 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, '1', '0', '0', '0', + 0, // ReadOnly + 5, // Source + 0, // Sensitive + 0, 0, 0, 0, // No Synonym + } ) func TestDescribeConfigsResponsev0(t *testing.T) { @@ -86,6 +117,7 @@ func TestDescribeConfigsResponsev0(t *testing.T) { ReadOnly: false, Default: false, Sensitive: false, + Source: SourceUnknown, }, }, }, @@ -94,6 +126,40 @@ func TestDescribeConfigsResponsev0(t *testing.T) { testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0) } +func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) { + var response *DescribeConfigsResponse + + response = &DescribeConfigsResponse{ + Resources: []*ResourceResponse{}, + } + testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &DescribeConfigsResponse{ + Version: 0, Resources: []*ResourceResponse{ + { + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + Configs: []*ConfigEntry{ + { + Name: "segment.ms", + Value: "1000", + ReadOnly: false, + Default: true, + Sensitive: false, + Source: SourceDefault, + }, + }, + }, + }, + } + testResponse(t, "response with default", response, describeConfigsResponseWithDefaultv0) +} + func TestDescribeConfigsResponsev1(t *testing.T) { var response *DescribeConfigsResponse @@ -119,6 +185,7 @@ func TestDescribeConfigsResponsev1(t *testing.T) { Value: "1000", ReadOnly: false, Source: SourceStaticBroker, + Default: false, Sensitive: false, Synonyms: []*ConfigSynonym{}, }, @@ -154,6 +221,7 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) { Value: "1000", ReadOnly: false, Source: SourceStaticBroker, + Default: false, Sensitive: false, Synonyms: []*ConfigSynonym{ { @@ -169,3 +237,39 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) { } testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1) } + +func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) { + var response *DescribeConfigsResponse + + response = &DescribeConfigsResponse{ + Resources: []*ResourceResponse{}, + } + testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &DescribeConfigsResponse{ + Version: 1, + Resources: []*ResourceResponse{ + { + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + Configs: []*ConfigEntry{ + { + Name: "segment.ms", + Value: "1000", + ReadOnly: false, + Source: SourceDefault, + Default: true, + Sensitive: false, + Synonyms: []*ConfigSynonym{}, + }, + }, + }, + }, + } + testResponse(t, "response with error", response, describeConfigsResponseWithDefaultv1) +} diff --git a/mockresponses.go b/mockresponses.go index a7ba44b10..6fa72ebb0 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -736,6 +736,7 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { } includeSynonyms := (req.Version > 0) + includeSource := (req.Version > 0) for _, r := range req.Resources { var configEntries []*ConfigEntry @@ -770,9 +771,12 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { maxMessageBytes := &ConfigEntry{Name: "max.message.bytes", Value: "1000000", ReadOnly: false, - Default: true, + Default: !includeSource, Sensitive: false, } + if includeSource { + maxMessageBytes.Source = SourceDefault + } if includeSynonyms { maxMessageBytes.Synonyms = []*ConfigSynonym{ {