Skip to content

Commit

Permalink
Merge pull request #9 from remicalixte/dm/v1.30.1-patched
Browse files Browse the repository at this point in the history
fix(proto): handle V3 member metadata and empty owned partitions
  • Loading branch information
remicalixte authored Feb 2, 2024
2 parents c65f63a + b370073 commit 7e11e54
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
10 changes: 5 additions & 5 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,15 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co

// Refresh metadata for requested topics
if err := c.client.RefreshMetadata(topics...); err != nil {
return err
return fmt.Errorf("failed to resfresh topic metadata: %w", err)
}

// Init session
sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
if err == ErrClosedClient {
return ErrClosedConsumerGroup
} else if err != nil {
return err
return fmt.Errorf("failed to init new session: %w", err)
}

// loop check topic partition numbers changed
Expand Down Expand Up @@ -209,7 +209,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
coordinator, err := c.client.Coordinator(c.groupID)
if err != nil {
if retries <= 0 {
return nil, err
return nil, fmt.Errorf("failed to get coordinator: %w", err)
}

return c.retryNewSession(ctx, topics, handler, retries, true)
Expand Down Expand Up @@ -240,7 +240,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
if consumerGroupJoinFailed != nil {
consumerGroupJoinFailed.Inc(1)
}
return nil, err
return nil, fmt.Errorf("failed to join consumer group: %w", err)
}
if join.Err != ErrNoError {
if consumerGroupJoinFailed != nil {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
if join.LeaderId == join.MemberId {
members, err := join.GetMembers()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get group members: %w", err)
}

plan, err = c.balance(strategy, members)
Expand Down
60 changes: 52 additions & 8 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type ConsumerGroupMemberMetadata struct {
Topics []string
UserData []byte
OwnedPartitions []*OwnedPartition
GenerationID int32
RackID *string
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
Expand All @@ -20,6 +22,27 @@ func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
return err
}

if m.Version >= 1 {
if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil {
return err
}
for _, op := range m.OwnedPartitions {
if err := op.encode(pe); err != nil {
return err
}
}
}

if m.Version >= 2 {
pe.putInt32(m.GenerationID)
}

if m.Version >= 3 {
if err := pe.putNullableString(m.RackID); err != nil {
return err
}
}

return nil
}

Expand All @@ -46,18 +69,29 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
}
return err
}
if n == 0 {
return nil
}
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
if n > 0 {
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
}
}
}
}

if m.Version >= 2 {
if m.GenerationID, err = pd.getInt32(); err != nil {
return err
}
}

if m.Version >= 3 {
if m.RackID, err = pd.getNullableString(); err != nil {
return err
}
}

return nil
}

Expand All @@ -66,6 +100,16 @@ type OwnedPartition struct {
Partitions []int32
}

func (m *OwnedPartition) encode(pe packetEncoder) error {
if err := pe.putString(m.Topic); err != nil {
return err
}
if err := pe.putInt32Array(m.Partitions); err != nil {
return err
}
return nil
}

func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
if m.Topic, err = pd.getString(); err != nil {
return err
Expand Down
17 changes: 17 additions & 0 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ var (
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
}

groupMemberMetadataV3NilOwned = []byte{
0, 3, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
0, 0, 0, 64, // GenerationID
0, 4, 'r', 'a', 'c', 'k', // RackID
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
Expand Down Expand Up @@ -77,6 +87,13 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
}
}

func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV3NilOwned, meta); err != nil {
t.Error("Failed to decode V3 data", err)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 0,
Expand Down

0 comments on commit 7e11e54

Please sign in to comment.