Skip to content

Commit

Permalink
Merge pull request #1230 from Mongey/cm-new-describe-configs
Browse files Browse the repository at this point in the history
Implement DescribeConfigs Request + Response v1 & v2
  • Loading branch information
bai authored Dec 10, 2018
2 parents 49e0aa4 + f9494b4 commit 861a752
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 24 deletions.
33 changes: 27 additions & 6 deletions describe_configs_request.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package sarama

type DescribeConfigsRequest struct {
Version int16
Resources []*ConfigResource
IncludeSynonyms bool
}

type ConfigResource struct {
Type ConfigResourceType
Name string
ConfigNames []string
}

type DescribeConfigsRequest struct {
Resources []*ConfigResource
}

func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(r.Resources)); err != nil {
return err
Expand All @@ -30,6 +32,10 @@ func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
}
}

if r.Version >= 1 {
pe.putBool(r.IncludeSynonyms)
}

return nil
}

Expand Down Expand Up @@ -74,6 +80,14 @@ func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err er
}
r.Resources[i].ConfigNames = cfnames
}
r.Version = version
if r.Version >= 1 {
b, err := pd.getBool()
if err != nil {
return err
}
r.IncludeSynonyms = b
}

return nil
}
Expand All @@ -83,9 +97,16 @@ func (r *DescribeConfigsRequest) key() int16 {
}

func (r *DescribeConfigsRequest) version() int16 {
return 0
return r.Version
}

func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
switch r.Version {
case 1:
return V1_0_0_0
case 2:
return V2_0_0_0
default:
return V0_11_0_0
}
}
31 changes: 30 additions & 1 deletion describe_configs_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,33 @@ var (
}

singleDescribeConfigsRequestAllConfigs = []byte{
0, 0, 0, 1, // 1 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
255, 255, 255, 255, // all configs
}

singleDescribeConfigsRequestAllConfigsv1 = []byte{
0, 0, 0, 1, // 1 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
255, 255, 255, 255, // no configs
1, //synoms
}
)

func TestDescribeConfigsRequest(t *testing.T) {
func TestDescribeConfigsRequestv0(t *testing.T) {
var request *DescribeConfigsRequest

request = &DescribeConfigsRequest{
Version: 0,
Resources: []*ConfigResource{},
}
testRequest(t, "no requests", request, emptyDescribeConfigsRequest)

configs := []string{"segment.ms"}
request = &DescribeConfigsRequest{
Version: 0,
Resources: []*ConfigResource{
&ConfigResource{
Type: TopicResource,
Expand All @@ -62,6 +72,7 @@ func TestDescribeConfigsRequest(t *testing.T) {
testRequest(t, "one config", request, singleDescribeConfigsRequest)

request = &DescribeConfigsRequest{
Version: 0,
Resources: []*ConfigResource{
&ConfigResource{
Type: TopicResource,
Expand All @@ -78,6 +89,7 @@ func TestDescribeConfigsRequest(t *testing.T) {
testRequest(t, "two configs", request, doubleDescribeConfigsRequest)

request = &DescribeConfigsRequest{
Version: 0,
Resources: []*ConfigResource{
&ConfigResource{
Type: TopicResource,
Expand All @@ -88,3 +100,20 @@ func TestDescribeConfigsRequest(t *testing.T) {

testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs)
}

func TestDescribeConfigsRequestv1(t *testing.T) {
var request *DescribeConfigsRequest

request = &DescribeConfigsRequest{
Version: 1,
Resources: []*ConfigResource{
{
Type: TopicResource,
Name: "foo",
},
},
IncludeSynonyms: true,
}

testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigsv1)
}
158 changes: 145 additions & 13 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,41 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type ConfigSource int8

func (s ConfigSource) String() string {
switch s {
case SourceUnknown:
return "Unknown"
case SourceTopic:
return "Topic"
case SourceDynamicBroker:
return "DynamicBroker"
case SourceDynamicDefaultBroker:
return "DynamicDefaultBroker"
case SourceStaticBroker:
return "StaticBroker"
case SourceDefault:
return "Default"
}
return fmt.Sprintf("Source Invalid: %d", int(s))
}

const (
SourceUnknown ConfigSource = 0
SourceTopic ConfigSource = 1
SourceDynamicBroker ConfigSource = 2
SourceDynamicDefaultBroker ConfigSource = 3
SourceStaticBroker ConfigSource = 4
SourceDefault ConfigSource = 5
)

type DescribeConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Resources []*ResourceResponse
}
Expand All @@ -20,7 +53,15 @@ type ConfigEntry struct {
Value string
ReadOnly bool
Default bool
Source ConfigSource
Sensitive bool
Synonyms []*ConfigSynonym
}

type ConfigSynonym struct {
ConfigName string
ConfigValue string
Source ConfigSource
}

func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
Expand All @@ -30,14 +71,16 @@ func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
}

for _, c := range r.Resources {
if err = c.encode(pe); err != nil {
if err = c.encode(pe, r.Version); err != nil {
return err
}
}

return nil
}

func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
throttleTime, err := pd.getInt32()
if err != nil {
return err
Expand Down Expand Up @@ -66,14 +109,21 @@ func (r *DescribeConfigsResponse) key() int16 {
}

func (r *DescribeConfigsResponse) version() int16 {
return 0
return r.Version
}

func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch r.Version {
case 1:
return V1_0_0_0
case 2:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(r.ErrorCode)

if err = pe.putString(r.ErrorMsg); err != nil {
Expand All @@ -91,7 +141,7 @@ func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
}

for _, c := range r.Configs {
if err = c.encode(pe); err != nil {
if err = c.encode(pe, version); err != nil {
return err
}
}
Expand Down Expand Up @@ -139,7 +189,7 @@ func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
return nil
}

func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
if err = pe.putString(r.Name); err != nil {
return err
}
Expand All @@ -149,12 +199,32 @@ func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
}

pe.putBool(r.ReadOnly)
pe.putBool(r.Default)
pe.putBool(r.Sensitive)

if version <= 0 {
pe.putBool(r.Default)
pe.putBool(r.Sensitive)
} else {
pe.putInt8(int8(r.Source))
pe.putBool(r.Sensitive)

if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
return err
}
for _, c := range r.Synonyms {
if err = c.encode(pe, version); err != nil {
return err
}
}
}

return nil
}

//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
if version == 0 {
r.Source = SourceUnknown
}
name, err := pd.getString()
if err != nil {
return err
Expand All @@ -173,16 +243,78 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
}
r.ReadOnly = read

de, err := pd.getBool()
if err != nil {
return err
if version == 0 {
defaultB, err := pd.getBool()
if err != nil {
return err
}
r.Default = defaultB
} else {
source, err := pd.getInt8()
if err != nil {
return err
}
r.Source = ConfigSource(source)
}
r.Default = de

sensitive, err := pd.getBool()
if err != nil {
return err
}
r.Sensitive = sensitive

if version > 0 {
n, err := pd.getArrayLength()
if err != nil {
return err
}
r.Synonyms = make([]*ConfigSynonym, n)

for i := 0; i < n; i++ {
s := &ConfigSynonym{}
if err := s.decode(pd, version); err != nil {
return err
}
r.Synonyms[i] = s
}

}
return nil
}

func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
err = pe.putString(c.ConfigName)
if err != nil {
return err
}

err = pe.putString(c.ConfigValue)
if err != nil {
return err
}

pe.putInt8(int8(c.Source))

return nil
}

func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
name, err := pd.getString()
if err != nil {
return nil
}
c.ConfigName = name

value, err := pd.getString()
if err != nil {
return nil
}
c.ConfigValue = value

source, err := pd.getInt8()
if err != nil {
return nil
}
c.Source = ConfigSource(source)
return nil
}
Loading

0 comments on commit 861a752

Please sign in to comment.