From e9bd1b89e144d3cd07125938129b1092503c0e94 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Mon, 21 Aug 2023 22:18:33 +0100 Subject: [PATCH] fix(proto): handle V3 member metadata and empty owned partitions Signed-off-by: Dominic Evans --- consumer_group_members.go | 60 +++++++++++++++++++++++++++++----- consumer_group_members_test.go | 17 ++++++++++ 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/consumer_group_members.go b/consumer_group_members.go index 3b8ca36f6..2d3896091 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -9,6 +9,8 @@ type ConsumerGroupMemberMetadata struct { Topics []string UserData []byte OwnedPartitions []*OwnedPartition + GenerationID int32 + RackID *string } func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { @@ -22,6 +24,27 @@ func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { return err } + if m.Version >= 1 { + if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil { + return err + } + for _, op := range m.OwnedPartitions { + if err := op.encode(pe); err != nil { + return err + } + } + } + + if m.Version >= 2 { + pe.putInt32(m.GenerationID) + } + + if m.Version >= 3 { + if err := pe.putNullableString(m.RackID); err != nil { + return err + } + } + return nil } @@ -48,18 +71,29 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { } return err } - if n == 0 { - return nil - } - m.OwnedPartitions = make([]*OwnedPartition, n) - for i := 0; i < n; i++ { - m.OwnedPartitions[i] = &OwnedPartition{} - if err := m.OwnedPartitions[i].decode(pd); err != nil { - return err + if n > 0 { + m.OwnedPartitions = make([]*OwnedPartition, n) + for i := 0; i < n; i++ { + m.OwnedPartitions[i] = &OwnedPartition{} + if err := m.OwnedPartitions[i].decode(pd); err != nil { + return err + } } } } + if m.Version >= 2 { + if m.GenerationID, err = pd.getInt32(); err != nil { + return err + } + } + + if m.Version >= 3 { + if m.RackID, err = pd.getNullableString(); err != nil { + return err + } + } + return nil } @@ -68,6 +102,16 @@ type OwnedPartition struct { Partitions []int32 } +func (m *OwnedPartition) encode(pe packetEncoder) error { + if err := pe.putString(m.Topic); err != nil { + return err + } + if err := pe.putInt32Array(m.Partitions); err != nil { + return err + } + return nil +} + func (m *OwnedPartition) decode(pd packetDecoder) (err error) { if m.Topic, err = pd.getString(); err != nil { return err diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index 8c1c2d56d..a28755d2e 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -42,6 +42,16 @@ var ( 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata 0, 0, 0, 0, // OwnedPartitions KIP-429 } + + groupMemberMetadataV3NilOwned = []byte{ + 0, 3, // Version + 0, 0, 0, 1, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + 0, 0, 0, 0, // OwnedPartitions KIP-429 + 0, 0, 0, 64, // GenerationID + 0, 4, 'r', 'a', 'c', 'k', // RackID + } ) func TestConsumerGroupMemberMetadata(t *testing.T) { @@ -77,6 +87,13 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { } } +func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) { + meta := new(ConsumerGroupMemberMetadata) + if err := decode(groupMemberMetadataV3NilOwned, meta, nil); err != nil { + t.Error("Failed to decode V3 data", err) + } +} + func TestConsumerGroupMemberAssignment(t *testing.T) { amt := &ConsumerGroupMemberAssignment{ Version: 0,