diff --git a/consumer_group.go b/consumer_group.go index 8e738e912..92e8f580a 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -165,7 +165,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co // Refresh metadata for requested topics if err := c.client.RefreshMetadata(topics...); err != nil { - return err + return fmt.Errorf("failed to resfresh topic metadata: %w", err) } // Init session @@ -173,7 +173,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co if err == ErrClosedClient { return ErrClosedConsumerGroup } else if err != nil { - return err + return fmt.Errorf("failed to init new session: %w", err) } // loop check topic partition numbers changed @@ -209,7 +209,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler coordinator, err := c.client.Coordinator(c.groupID) if err != nil { if retries <= 0 { - return nil, err + return nil, fmt.Errorf("failed to get coordinator: %w", err) } return c.retryNewSession(ctx, topics, handler, retries, true) @@ -240,7 +240,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler if consumerGroupJoinFailed != nil { consumerGroupJoinFailed.Inc(1) } - return nil, err + return nil, fmt.Errorf("failed to join consumer group: %w", err) } if join.Err != ErrNoError { if consumerGroupJoinFailed != nil { @@ -285,7 +285,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler if join.LeaderId == join.MemberId { members, err := join.GetMembers() if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get group members: %w", err) } plan, err = c.balance(strategy, members) diff --git a/consumer_group_members.go b/consumer_group_members.go index f4100f560..30d047430 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -7,6 +7,8 @@ type ConsumerGroupMemberMetadata struct { Topics []string UserData []byte OwnedPartitions []*OwnedPartition + GenerationID int32 + RackID *string } func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { @@ -20,6 +22,27 @@ func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { return err } + if m.Version >= 1 { + if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil { + return err + } + for _, op := range m.OwnedPartitions { + if err := op.encode(pe); err != nil { + return err + } + } + } + + if m.Version >= 2 { + pe.putInt32(m.GenerationID) + } + + if m.Version >= 3 { + if err := pe.putNullableString(m.RackID); err != nil { + return err + } + } + return nil } @@ -46,18 +69,29 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { } return err } - if n == 0 { - return nil - } - m.OwnedPartitions = make([]*OwnedPartition, n) - for i := 0; i < n; i++ { - m.OwnedPartitions[i] = &OwnedPartition{} - if err := m.OwnedPartitions[i].decode(pd); err != nil { - return err + if n > 0 { + m.OwnedPartitions = make([]*OwnedPartition, n) + for i := 0; i < n; i++ { + m.OwnedPartitions[i] = &OwnedPartition{} + if err := m.OwnedPartitions[i].decode(pd); err != nil { + return err + } } } } + if m.Version >= 2 { + if m.GenerationID, err = pd.getInt32(); err != nil { + return err + } + } + + if m.Version >= 3 { + if m.RackID, err = pd.getNullableString(); err != nil { + return err + } + } + return nil } @@ -66,6 +100,16 @@ type OwnedPartition struct { Partitions []int32 } +func (m *OwnedPartition) encode(pe packetEncoder) error { + if err := pe.putString(m.Topic); err != nil { + return err + } + if err := pe.putInt32Array(m.Partitions); err != nil { + return err + } + return nil +} + func (m *OwnedPartition) decode(pd packetDecoder) (err error) { if m.Topic, err = pd.getString(); err != nil { return err diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index a99de61c6..54f07eb2a 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -42,6 +42,16 @@ var ( 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata 0, 0, 0, 0, // OwnedPartitions KIP-429 } + + groupMemberMetadataV3NilOwned = []byte{ + 0, 3, // Version + 0, 0, 0, 1, // Topic array length + 0, 3, 'o', 'n', 'e', // Topic one + 0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata + 0, 0, 0, 0, // OwnedPartitions KIP-429 + 0, 0, 0, 64, // GenerationID + 0, 4, 'r', 'a', 'c', 'k', // RackID + } ) func TestConsumerGroupMemberMetadata(t *testing.T) { @@ -77,6 +87,13 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { } } +func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) { + meta := new(ConsumerGroupMemberMetadata) + if err := decode(groupMemberMetadataV3NilOwned, meta); err != nil { + t.Error("Failed to decode V3 data", err) + } +} + func TestConsumerGroupMemberAssignment(t *testing.T) { amt := &ConsumerGroupMemberAssignment{ Version: 0,