Skip to content

Commit

Permalink
Implement DescribeConfigsResponse 1 & 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Dec 3, 2018
1 parent fbd8338 commit b50a625
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 17 deletions.
159 changes: 146 additions & 13 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,41 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type ConfigSource int8

func (s ConfigSource) String() string {
switch s {
case SourceUnknown:
return "Unknown"
case SourceTopic:
return "Topic"
case SourceDynamicBroker:
return "DynamicBroker"
case SourceDynamicDefaultBroker:
return "DynamicDefaultBroker"
case SourceStaticBroker:
return "StaticBroker"
case SourceDefault:
return "Default"
}
return fmt.Sprintf("Source Invalid: %d", int(s))
}

const (
SourceUnknown ConfigSource = 0
SourceTopic ConfigSource = 1
SourceDynamicBroker ConfigSource = 2
SourceDynamicDefaultBroker ConfigSource = 3
SourceStaticBroker ConfigSource = 4
SourceDefault ConfigSource = 5
)

type DescribeConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Resources []*ResourceResponse
}
Expand All @@ -20,7 +53,15 @@ type ConfigEntry struct {
Value string
ReadOnly bool
Default bool
Source ConfigSource
Sensitive bool
Synonyms []*ConfigSyonym
}

type ConfigSyonym struct {
ConfigName string
ConfigValue string
Source ConfigSource
}

func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
Expand All @@ -30,14 +71,16 @@ func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
}

for _, c := range r.Resources {
if err = c.encode(pe); err != nil {
if err = c.encode(pe, r.Version); err != nil {
return err
}
}

return nil
}

func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
throttleTime, err := pd.getInt32()
if err != nil {
return err
Expand Down Expand Up @@ -66,14 +109,21 @@ func (r *DescribeConfigsResponse) key() int16 {
}

func (r *DescribeConfigsResponse) version() int16 {
return 0
return r.Version
}

func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch r.Version {
case 1:
return V1_0_0_0
case 2:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(r.ErrorCode)

if err = pe.putString(r.ErrorMsg); err != nil {
Expand All @@ -91,7 +141,7 @@ func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
}

for _, c := range r.Configs {
if err = c.encode(pe); err != nil {
if err = c.encode(pe, version); err != nil {
return err
}
}
Expand Down Expand Up @@ -139,7 +189,7 @@ func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
return nil
}

func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
if err = pe.putString(r.Name); err != nil {
return err
}
Expand All @@ -149,12 +199,33 @@ func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
}

pe.putBool(r.ReadOnly)
pe.putBool(r.Default)
pe.putBool(r.Sensitive)

if version <= 0 {
pe.putBool(r.Default)
pe.putBool(r.Sensitive)

} else {
pe.putInt8(int8(r.Source))
pe.putBool(r.Sensitive)

if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
return err
}
for _, c := range r.Synonyms {
if err = c.encode(pe, version); err != nil {
return err
}
}
}

return nil
}

//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
if version == 0 {
r.Source = SourceUnknown
}
name, err := pd.getString()
if err != nil {
return err
Expand All @@ -173,16 +244,78 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
}
r.ReadOnly = read

de, err := pd.getBool()
if err != nil {
return err
if version == 0 {
defaultB, err := pd.getBool()
if err != nil {
return err
}
r.Default = defaultB
} else {
source, err := pd.getInt8()
if err != nil {
return err
}
r.Source = ConfigSource(source)
}
r.Default = de

sensitive, err := pd.getBool()
if err != nil {
return err
}
r.Sensitive = sensitive

if version > 0 {
n, err := pd.getArrayLength()
if err != nil {
return err
}
r.Synonyms = make([]*ConfigSyonym, n)

for i := 0; i < n; i++ {
s := &ConfigSyonym{}
if err := s.decode(pd, version); err != nil {
return err
}
r.Synonyms[i] = s
}

}
return nil
}

func (c *ConfigSyonym) encode(pe packetEncoder, version int16) (err error) {
err = pe.putString(c.ConfigName)
if err != nil {
return err
}

err = pe.putString(c.ConfigValue)
if err != nil {
return err
}

pe.putInt8(int8(c.Source))

return nil
}

func (c *ConfigSyonym) decode(pd packetDecoder, version int16) error {
name, err := pd.getString()
if err != nil {
return nil
}
c.ConfigName = name

value, err := pd.getString()
if err != nil {
return nil
}
c.ConfigValue = value

source, err := pd.getInt8()
if err != nil {
return nil
}
c.Source = ConfigSource(source)
return nil
}
119 changes: 115 additions & 4 deletions describe_configs_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ var (
0, 0, 0, 0, // no configs
}

describeConfigsResponsePopulated = []byte{
describeConfigsResponsePopulatedv0 = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
Expand All @@ -24,9 +24,44 @@ var (
0, // Default
0, // Sensitive
}

describeConfigsResponsePopulatedv1 = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
0, 0, //string
2, // topic
0, 3, 'f', 'o', 'o',
0, 0, 0, 1, //configs
0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4, '1', '0', '0', '0',
0, // ReadOnly
4, // Source
0, // Sensitive
0, 0, 0, 0, // No Synonym
}

describeConfigsResponseWithSynonymv1 = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
0, 0, //string
2, // topic
0, 3, 'f', 'o', 'o',
0, 0, 0, 1, //configs
0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4, '1', '0', '0', '0',
0, // ReadOnly
4, // Source
0, // Sensitive
0, 0, 0, 1, // 1 Synonym
0, 14, 'l', 'o', 'g', '.', 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4, '1', '0', '0', '0',
4, // Source
}
)

func TestDescribeConfigsResponse(t *testing.T) {
func TestDescribeConfigsResponsev0(t *testing.T) {
var response *DescribeConfigsResponse

response = &DescribeConfigsResponse{
Expand All @@ -38,7 +73,7 @@ func TestDescribeConfigsResponse(t *testing.T) {
}

response = &DescribeConfigsResponse{
Resources: []*ResourceResponse{
Version: 0, Resources: []*ResourceResponse{
&ResourceResponse{
ErrorCode: 0,
ErrorMsg: "",
Expand All @@ -56,5 +91,81 @@ func TestDescribeConfigsResponse(t *testing.T) {
},
},
}
testResponse(t, "response with error", response, describeConfigsResponsePopulated)
testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0)
}

func TestDescribeConfigsResponsev1(t *testing.T) {
var response *DescribeConfigsResponse

response = &DescribeConfigsResponse{
Resources: []*ResourceResponse{},
}
testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
if len(response.Resources) != 0 {
t.Error("Expected no groups")
}

response = &DescribeConfigsResponse{
Version: 1,
Resources: []*ResourceResponse{
&ResourceResponse{
ErrorCode: 0,
ErrorMsg: "",
Type: TopicResource,
Name: "foo",
Configs: []*ConfigEntry{
&ConfigEntry{
Name: "segment.ms",
Value: "1000",
ReadOnly: false,
Source: SourceStaticBroker,
Sensitive: false,
Synonyms: []*ConfigSyonym{},
},
},
},
},
}
testResponse(t, "response with error", response, describeConfigsResponsePopulatedv1)
}

func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
var response *DescribeConfigsResponse

response = &DescribeConfigsResponse{
Resources: []*ResourceResponse{},
}
testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
if len(response.Resources) != 0 {
t.Error("Expected no groups")
}

response = &DescribeConfigsResponse{
Version: 1,
Resources: []*ResourceResponse{
&ResourceResponse{
ErrorCode: 0,
ErrorMsg: "",
Type: TopicResource,
Name: "foo",
Configs: []*ConfigEntry{
&ConfigEntry{
Name: "segment.ms",
Value: "1000",
ReadOnly: false,
Source: SourceStaticBroker,
Sensitive: false,
Synonyms: []*ConfigSyonym{
{
ConfigName: "log.segment.ms",
ConfigValue: "1000",
Source: SourceStaticBroker,
},
},
},
},
},
},
}
testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1)
}

0 comments on commit b50a625

Please sign in to comment.