Skip to content

Commit

Permalink
Implement DescribeConfigsResponse 1 & 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Nov 29, 2018
1 parent ff763f1 commit 6f2ac05
Showing 1 changed file with 141 additions and 12 deletions.
153 changes: 141 additions & 12 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,41 @@
package sarama

import "time"
import (
"fmt"
"time"
)

type ConfigSource int8

func (s ConfigSource) String() string {
switch s {
case SourceUnknown:
return "Unknown"
case SourceTopic:
return "Topic"
case SourceDynamicBroker:
return "DynamicBroker"
case SourceDynamicDefaultBroker:
return "DynamicDefaultBroker"
case SourceStaticBroker:
return "StaticBroker"
case SourceDefault:
return "Default"
}
return fmt.Sprintf("Source Invalid: %d", int(s))
}

const (
SourceUnknown ConfigSource = 0
SourceTopic ConfigSource = 1
SourceDynamicBroker ConfigSource = 2
SourceDynamicDefaultBroker ConfigSource = 3
SourceStaticBroker ConfigSource = 4
SourceDefault ConfigSource = 5
)

type DescribeConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Resources []*ResourceResponse
}
Expand All @@ -20,7 +53,15 @@ type ConfigEntry struct {
Value string
ReadOnly bool
Default bool
Source ConfigSource
Sensitive bool
Syonyms []*ConfigSyonym
}

type ConfigSyonym struct {
ConfigName string
ConfigValue string
Source ConfigSource
}

func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
Expand All @@ -30,14 +71,16 @@ func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
}

for _, c := range r.Resources {
if err = c.encode(pe); err != nil {
if err = c.encode(pe, r.Version); err != nil {
return err
}
}

return nil
}

func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
throttleTime, err := pd.getInt32()
if err != nil {
return err
Expand Down Expand Up @@ -66,14 +109,21 @@ func (r *DescribeConfigsResponse) key() int16 {
}

func (r *DescribeConfigsResponse) version() int16 {
return 0
return r.Version
}

func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
switch r.Version {
case 1:
return V1_0_0_0
case 2:
return V2_0_0_0
default:
return V0_11_0_0
}
}

func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(r.ErrorCode)

if err = pe.putString(r.ErrorMsg); err != nil {
Expand All @@ -91,7 +141,7 @@ func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
}

for _, c := range r.Configs {
if err = c.encode(pe); err != nil {
if err = c.encode(pe, version); err != nil {
return err
}
}
Expand Down Expand Up @@ -139,7 +189,7 @@ func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
return nil
}

func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
if err = pe.putString(r.Name); err != nil {
return err
}
Expand All @@ -149,12 +199,29 @@ func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
}

pe.putBool(r.ReadOnly)
pe.putBool(r.Default)

if version <= 0 {
pe.putBool(r.Default)
} else {
pe.putInt8(int8(r.Source))

pe.putArrayLength(len(r.Syonyms))
for _, c := range r.Syonyms {
if err = c.encode(pe, version); err != nil {
return err
}
}
}

pe.putBool(r.Sensitive)
return nil
}

//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
if version == 0 {
r.Source = SourceUnknown
}
name, err := pd.getString()
if err != nil {
return err
Expand All @@ -173,16 +240,78 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
}
r.ReadOnly = read

de, err := pd.getBool()
if err != nil {
return err
if version == 0 {
defaultB, err := pd.getBool()
if err != nil {
return err
}
r.Default = defaultB
} else {
source, err := pd.getInt8()
if err != nil {
return err
}
r.Source = ConfigSource(source)
}
r.Default = de

sensitive, err := pd.getBool()
if err != nil {
return err
}
r.Sensitive = sensitive

if version > 0 {
n, err := pd.getArrayLength()
if err != nil {
return err
}
r.Syonyms = make([]*ConfigSyonym, n)

for i := 0; i < n; i++ {
s := &ConfigSyonym{}
if err := s.decode(pd, version); err != nil {
return err
}
r.Syonyms[i] = s
}

}
return nil
}

func (c *ConfigSyonym) encode(pe packetEncoder, version int16) (err error) {
err = pe.putString(c.ConfigName)
if err != nil {
return err
}

err = pe.putString(c.ConfigValue)
if err != nil {
return err
}

pe.putInt8(int8(c.Source))

return nil
}

func (c *ConfigSyonym) decode(pd packetDecoder, version int16) error {
name, err := pd.getString()
if err != nil {
return nil
}
c.ConfigName = name

value, err := pd.getString()
if err != nil {
return nil
}
c.ConfigValue = value

source, err := pd.getInt8()
if err != nil {
return nil
}
c.Source = ConfigSource(source)
return nil
}

0 comments on commit 6f2ac05

Please sign in to comment.