Skip to content

Commit

Permalink
feat(proto): support for Metadata V6-V10
Browse files Browse the repository at this point in the history
In particular, this adds support for topic UUIDs

Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
  • Loading branch information
prestona authored and dnwe committed Aug 7, 2023
1 parent 10dd922 commit 096182c
Show file tree
Hide file tree
Showing 5 changed files with 745 additions and 65 deletions.
41 changes: 34 additions & 7 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (b *Broker) Rack() string {
// GetMetadata send a metadata request and returns a metadata response or error
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
response := new(MetadataResponse)
response.Version = request.Version // Required to ensure use of the correct response header version

err := b.sendAndReceive(request, response)
if err != nil {
Expand Down Expand Up @@ -1072,7 +1073,12 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
return err
}

host, err := pd.getString()
var host string
if version < 9 {
host, err = pd.getString()
} else {
host, err = pd.getCompactString()
}
if err != nil {
return err
}
Expand All @@ -1082,18 +1088,27 @@ func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
return err
}

if version >= 1 {
if version >= 1 && version < 9 {
b.rack, err = pd.getNullableString()
if err != nil {
return err
}
} else if version >= 9 {
b.rack, err = pd.getCompactNullableString()
}
if err != nil {
return err
}

b.addr = net.JoinHostPort(host, fmt.Sprint(port))
if _, _, err := net.SplitHostPort(b.addr); err != nil {
return err
}

if version >= 9 {
_, err := pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

return nil
}

Expand All @@ -1110,20 +1125,32 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) {

pe.putInt32(b.id)

err = pe.putString(host)
if version < 9 {
err = pe.putString(host)
} else {
err = pe.putCompactString(host)
}
if err != nil {
return err
}

pe.putInt32(int32(port))

if version >= 1 {
err = pe.putNullableString(b.rack)
if version < 9 {
err = pe.putNullableString(b.rack)
} else {
err = pe.putNullableCompactString(b.rack)
}
if err != nil {
return err
}
}

if version >= 9 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

Expand Down
166 changes: 144 additions & 22 deletions metadata_request.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
package sarama

import "encoding/base64"

type Uuid [16]byte

func (u Uuid) String() string {
return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(u[:])
}

var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}

type MetadataRequest struct {
// Version defines the protocol version to use for encode and decode
Version int16
// Topics contains the topics to fetch metadata for.
Topics []string
// AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
AllowAutoTopicCreation bool
AllowAutoTopicCreation bool
IncludeClusterAuthorizedOperations bool // version 8 and up
IncludeTopicAuthorizedOperations bool // version 8 and up
}

func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
m := &MetadataRequest{Topics: topics}
if version.IsAtLeast(V2_1_0_0) {
if version.IsAtLeast(V2_8_0_0) {
m.Version = 10
} else if version.IsAtLeast(V2_4_0_0) {
m.Version = 9
} else if version.IsAtLeast(V2_4_0_0) {
m.Version = 8
} else if version.IsAtLeast(V2_1_0_0) {
m.Version = 7
} else if version.IsAtLeast(V2_0_0_0) {
m.Version = 6
Expand All @@ -28,46 +46,124 @@ func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest
}

func (r *MetadataRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 7 {
if r.Version < 0 || r.Version > 10 {
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
}
if r.Version == 0 || len(r.Topics) > 0 {
err := pe.putArrayLength(len(r.Topics))
if err != nil {
return err
}

for i := range r.Topics {
err = pe.putString(r.Topics[i])
if r.Version < 9 {
err := pe.putArrayLength(len(r.Topics))
if err != nil {
return err
}

for i := range r.Topics {
err = pe.putString(r.Topics[i])
if err != nil {
return err
}
}
} else if r.Version == 9 {
pe.putCompactArrayLength(len(r.Topics))
for _, topicName := range r.Topics {
if err := pe.putCompactString(topicName); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}
} else { // r.Version = 10
pe.putCompactArrayLength(len(r.Topics))
for _, topicName := range r.Topics {
if err := pe.putRawBytes(NullUUID); err != nil {
return err
}
// Avoid implicit memory aliasing in for loop
tn := topicName
if err := pe.putNullableCompactString(&tn); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}
}
} else {
pe.putInt32(-1)
if r.Version < 9 {
pe.putInt32(-1)
} else {
pe.putCompactArrayLength(-1)
}
}

if r.Version >= 4 {
if r.Version > 3 {
pe.putBool(r.AllowAutoTopicCreation)
}

if r.Version > 7 {
pe.putBool(r.IncludeClusterAuthorizedOperations)
pe.putBool(r.IncludeTopicAuthorizedOperations)
}
if r.Version > 8 {
pe.putEmptyTaggedFieldArray()
}
return nil
}

func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
size, err := pd.getInt32()
if err != nil {
return err
}
if size > 0 {
r.Topics = make([]string, size)
if r.Version < 9 {
size, err := pd.getInt32()
if err != nil {
return err
}
if size > 0 {
r.Topics = make([]string, size)
for i := range r.Topics {
topic, err := pd.getString()
if err != nil {
return err
}
r.Topics[i] = topic
}
}
} else if r.Version == 9 {
size, err := pd.getCompactArrayLength()
if err != nil {
return err
}
if size > 0 {
r.Topics = make([]string, size)
}
for i := range r.Topics {
topic, err := pd.getString()
topic, err := pd.getCompactString()
if err != nil {
return err
}
r.Topics[i] = topic
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
} else { // version 10+
size, err := pd.getCompactArrayLength()
if err != nil {
return err
}

if size > 0 {
r.Topics = make([]string, size)
}
for i := range r.Topics {
if _, err = pd.getRawBytes(16); err != nil { // skip UUID
return err
}
topic, err := pd.getCompactNullableString()
if err != nil {
return err
}
if topic != nil {
r.Topics[i] = *topic
}

if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

Expand All @@ -77,6 +173,23 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
}
}

if r.Version > 7 {
includeClusterAuthz, err := pd.getBool()
if err != nil {
return err
}
r.IncludeClusterAuthorizedOperations = includeClusterAuthz
includeTopicAuthz, err := pd.getBool()
if err != nil {
return err
}
r.IncludeTopicAuthorizedOperations = includeTopicAuthz
}
if r.Version > 8 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
return nil
}

Expand All @@ -89,15 +202,24 @@ func (r *MetadataRequest) version() int16 {
}

func (r *MetadataRequest) headerVersion() int16 {
if r.Version >= 9 {
return 2
}
return 1
}

func (r *MetadataRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 7
return r.Version >= 0 && r.Version <= 10
}

func (r *MetadataRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 10:
return V2_8_0_0
case 9:
return V2_4_0_0
case 8:
return V2_3_0_0
case 7:
return V2_1_0_0
case 6:
Expand All @@ -113,6 +235,6 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
case 0:
return V0_8_2_0
default:
return V2_1_0_0
return V2_8_0_0
}
}
Loading

0 comments on commit 096182c

Please sign in to comment.