From ea1f46fe0f6f527ce120bcce3be62db04730397f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Calixte?= Date: Thu, 1 Feb 2024 15:11:31 +0100 Subject: [PATCH 1/4] more detailed errors --- consumer_group.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index 8e738e912..364940cc3 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -165,7 +165,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co // Refresh metadata for requested topics if err := c.client.RefreshMetadata(topics...); err != nil { - return err + return fmt.Errorf("failed to resfresh topic metadata: %w", err) } // Init session @@ -173,7 +173,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co if err == ErrClosedClient { return ErrClosedConsumerGroup } else if err != nil { - return err + return fmt.Errorf("failed to init new session: %w", err) } // loop check topic partition numbers changed From ae758558746bef3373bec75e664cfceb1c3dced3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Calixte?= Date: Thu, 1 Feb 2024 16:10:43 +0100 Subject: [PATCH 2/4] more errors --- consumer_group.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index 364940cc3..92e8f580a 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -209,7 +209,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler coordinator, err := c.client.Coordinator(c.groupID) if err != nil { if retries <= 0 { - return nil, err + return nil, fmt.Errorf("failed to get coordinator: %w", err) } return c.retryNewSession(ctx, topics, handler, retries, true) @@ -240,7 +240,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler if consumerGroupJoinFailed != nil { consumerGroupJoinFailed.Inc(1) } - return nil, err + return nil, fmt.Errorf("failed to join consumer group: %w", err) } if join.Err != ErrNoError { if consumerGroupJoinFailed != nil { @@ -285,7 +285,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler if join.LeaderId == join.MemberId { members, err := join.GetMembers() if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get group members: %w", err) } plan, err = c.balance(strategy, members) From 873c3b0f1feffa975697f2412ba7a12917944817 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Mon, 21 Aug 2023 22:18:33 +0100 Subject: [PATCH 3/4] fix(proto): handle V3 member metadata and empty owned partitions Signed-off-by: Dominic Evans --- consumer_group_members.go | 60 +++++++++++++++++++++++++++++----- consumer_group_members_test.go | 17 ++++++++++ 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/consumer_group_members.go b/consumer_group_members.go index f4100f560..30d047430 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -7,6 +7,8 @@ type ConsumerGroupMemberMetadata struct { Topics []string UserData []byte OwnedPartitions []*OwnedPartition + GenerationID int32 + RackID *string } func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { @@ -20,6 +22,27 @@ func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { return err } + if m.Version >= 1 { + if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil { + return err + } + for _, op := range m.OwnedPartitions { + if err := op.encode(pe); err != nil { + return err + } + } + } + + if m.Version >= 2 { + pe.putInt32(m.GenerationID) + } + + if m.Version >= 3 { + if err := pe.putNullableString(m.RackID); err != nil { + return err + } + } + return nil } @@ -46,18 +69,29 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { } return err } - if n == 0 { - return nil - } - m.OwnedPartitions = make([]*OwnedPartition, n) - for i := 0; i < n; i++ { - m.OwnedPartitions[i] = &OwnedPartition{} - if err := m.OwnedPartitions[i].decode(pd); err != nil { - return err + if n > 0 { + m.OwnedPartitions = make([]*OwnedPartition, n) + for i := 0; i < n; i++ { + m.OwnedPartitions[i] = &OwnedPartition{} + if err := m.OwnedPartitions[i].decode(pd); err != nil { + return err + } } } } + if m.Version >= 2 { + if m.GenerationID, err = pd.getInt32(); err != nil { + return err + } + } + + if m.Version >= 3 { + if m.RackID, err = pd.getNullableString(); err != nil { + return err + } + } + return nil } @@ -66,6 +100,16 @@ type OwnedPartition struct { Partitions []int32 } +func (m *OwnedPartition) encode(pe packetEncoder) error { + if err := pe.putString(m.Topic); err != nil { + return err + } + if err := pe.putInt32Array(m.Partitions); err != nil { + return err + } + return nil +} + func (m *OwnedPartition) decode(pd packetDecoder) (err error) { if m.Topic, err = pd.getString(); err != nil { return err diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index a99de61c6..5d7e14354 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -42,6 +42,16 @@ var ( 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata 0, 0, 0, 0, // OwnedPartitions KIP-429 } + + groupMemberMetadataV3NilOwned = []byte{ + 0, 3, // Version + 0, 0, 0, 1, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + 0, 0, 0, 0, // OwnedPartitions KIP-429 + 0, 0, 0, 64, // GenerationID + 0, 4, 'r', 'a', 'c', 'k', // RackID + } ) func TestConsumerGroupMemberMetadata(t *testing.T) { @@ -77,6 +87,13 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { } } +func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) { + meta := new(ConsumerGroupMemberMetadata) + if err := decode(groupMemberMetadataV3NilOwned, meta, nil); err != nil { + t.Error("Failed to decode V3 data", err) + } +} + func TestConsumerGroupMemberAssignment(t *testing.T) { amt := &ConsumerGroupMemberAssignment{ Version: 0, From b3700736a22548c033ffd94eacf69c5fb1b4ee8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Calixte?= Date: Thu, 1 Feb 2024 18:11:40 +0100 Subject: [PATCH 4/4] fix test --- consumer_group_members_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index 5d7e14354..54f07eb2a 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -89,7 +89,7 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) { meta := new(ConsumerGroupMemberMetadata) - if err := decode(groupMemberMetadataV3NilOwned, meta, nil); err != nil { + if err := decode(groupMemberMetadataV3NilOwned, meta); err != nil { t.Error("Failed to decode V3 data", err) } }