Merge pull request IBM#2214 from Shopify/dnwe/handle-offsets-load-in-progress

fix: cope with OffsetsLoadInProgress on Join+Sync
dnwe authored Apr 14, 2022
2 parents 9cf9b0f + 3c1940a commit 30b9203
Showing 3 changed files with 133 additions and 48 deletions.
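Note: ErrOffsetsLoadInProgress corresponds to Kafka's COORDINATOR_LOAD_IN_PROGRESS error, which the group coordinator returns while it is still loading group state from the __consumer_offsets topic (typically just after a coordinator failover). Before this change, JoinGroup and SyncGroup responses carrying this error fell through to the default branch of newSession and surfaced to the caller, so applications had to retry around Consume themselves. A minimal sketch of such a pre-fix workaround, assuming the Shopify/sarama import path current at the time of this commit (hypothetical application code, not part of this change):

package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// consumeWithRetry is a hypothetical pre-fix workaround: retry Consume
// manually while the coordinator is still loading offsets. With this
// commit, sarama performs the retry internally using the configured
// rebalance backoff, making code like this unnecessary.
func consumeWithRetry(ctx context.Context, group sarama.ConsumerGroup, topics []string, handler sarama.ConsumerGroupHandler) {
	for {
		err := group.Consume(ctx, topics, handler)
		if err == sarama.ErrOffsetsLoadInProgress {
			time.Sleep(time.Second) // crude fixed backoff before retrying
			continue
		}
		if err != nil {
			log.Fatal(err)
		}
		if ctx.Err() != nil {
			return // context cancelled: stop consuming
		}
	}
}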
30 changes: 10 additions & 20 deletions consumer_group.go
@@ -290,21 +290,16 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
 	switch join.Err {
 	case ErrNoError:
 		c.memberID = join.MemberId
-	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
+	case ErrUnknownMemberId, ErrIllegalGeneration:
+		// reset member ID and retry immediately
 		c.memberID = ""
 		return c.newSession(ctx, topics, handler, retries)
-	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
+		// retry after backoff
 		if retries <= 0 {
 			return nil, join.Err
 		}
-
 		return c.retryNewSession(ctx, topics, handler, retries, true)
-	case ErrRebalanceInProgress: // retry after backoff
-		if retries <= 0 {
-			return nil, join.Err
-		}
-
-		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
 		return nil, join.Err
 	}
@@ -343,21 +338,16 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
 
 	switch groupRequest.Err {
 	case ErrNoError:
-	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
+	case ErrUnknownMemberId, ErrIllegalGeneration:
+		// reset member ID and retry immediately
 		c.memberID = ""
 		return c.newSession(ctx, topics, handler, retries)
-	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
+		// retry after backoff
 		if retries <= 0 {
 			return nil, groupRequest.Err
 		}
-
 		return c.retryNewSession(ctx, topics, handler, retries, true)
-	case ErrRebalanceInProgress: // retry after backoff
-		if retries <= 0 {
-			return nil, groupRequest.Err
-		}
-
-		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
 		return nil, groupRequest.Err
 	}
@@ -564,8 +554,8 @@ func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int
 	for _, topic := range topics {
 		if partitionNum, err := c.client.Partitions(topic); err != nil {
 			Logger.Printf(
-				"consumergroup/%s topic %s get partition number failed %v\n",
-				c.groupID, err)
+				"consumergroup/%s topic %s get partition number failed due to '%v'\n",
+				c.groupID, topic, err)
 			return nil, err
 		} else {
 			topicToPartitionNum[topic] = len(partitionNum)
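With the two cases merged, the retry path now always passes true as retryNewSession's final argument, so the coordinator is refreshed before rejoining for all three retriable errors, where previously only ErrNotCoordinatorForConsumer triggered a refresh. (The last hunk also fixes a Logger.Printf call that was missing its topic argument.) For readers without the file open, a simplified sketch of what the retryNewSession helper does; it is not part of this diff, and the real code in consumer_group.go is authoritative:

// Simplified sketch of the retryNewSession helper referenced above
// (assumption: details may differ from the actual implementation).
// It waits out the configured rebalance backoff, optionally refreshes
// the coordinator, and re-enters newSession with one fewer retry.
func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		if err := c.client.RefreshCoordinator(c.groupID); err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}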
52 changes: 52 additions & 0 deletions consumer_group_example_test.go
@@ -0,0 +1,52 @@
+package sarama
+
+import (
+	"context"
+	"fmt"
+)
+
+type exampleConsumerGroupHandler struct{}
+
+func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
+func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
+func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
+		sess.MarkMessage(msg, "")
+	}
+	return nil
+}
+
+func ExampleConsumerGroup() {
+	config := NewTestConfig()
+	config.Version = V2_0_0_0 // specify appropriate version
+	config.Consumer.Return.Errors = true
+
+	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
+	if err != nil {
+		panic(err)
+	}
+	defer func() { _ = group.Close() }()
+
+	// Track errors
+	go func() {
+		for err := range group.Errors() {
+			fmt.Println("ERROR", err)
+		}
+	}()
+
+	// Iterate over consumer sessions.
+	ctx := context.Background()
+	for {
+		topics := []string{"my-topic"}
+		handler := exampleConsumerGroupHandler{}
+
+		// `Consume` should be called inside an infinite loop, when a
+		// server-side rebalance happens, the consumer session will need to be
+		// recreated to get the new claims
+		err := group.Consume(ctx, topics, handler)
+		if err != nil {
+			panic(err)
+		}
+	}
+}
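The example's Consume loop runs forever. In a real application the context is usually tied to OS signals so the loop can exit cleanly; a sketch under that assumption (hypothetical, not part of this commit; it additionally assumes imports of log, os, os/signal and syscall):

// Hypothetical variant of the example's loop: cancel the context on
// SIGINT/SIGTERM so consumption stops cleanly instead of looping forever.
func consumeUntilSignal(group ConsumerGroup, topics []string, handler ConsumerGroupHandler) {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	for {
		if err := group.Consume(ctx, topics, handler); err != nil {
			log.Fatal(err)
		}
		// Consume returns on every server-side rebalance; only exit
		// once the context has actually been cancelled by a signal.
		if ctx.Err() != nil {
			return
		}
	}
}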
99 changes: 71 additions & 28 deletions consumer_group_test.go
@@ -2,51 +2,94 @@ package sarama
 
 import (
 	"context"
-	"fmt"
+	"sync"
+	"testing"
 )
 
-type exampleConsumerGroupHandler struct{}
+type handler struct {
+	*testing.T
+	cancel context.CancelFunc
+}
 
-func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
-func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
-func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
+func (h *handler) Setup(s ConsumerGroupSession) error { return nil }
+func (h *handler) Cleanup(s ConsumerGroupSession) error { return nil }
+func (h *handler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
 	for msg := range claim.Messages() {
-		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
 		sess.MarkMessage(msg, "")
+		h.Logf("consumed msg %v", msg)
+		h.cancel()
+		break
 	}
 	return nil
 }
 
-func ExampleConsumerGroup() {
+// TestConsumerGroupNewSessionDuringOffsetLoad ensures that the consumer group
+// will retry Join and Sync group operations, if it receives a temporary
+// OffsetsLoadInProgress error response, in the same way as it would for a
+// RebalanceInProgress.
+func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
 	config := NewTestConfig()
-	config.Version = V2_0_0_0 // specify appropriate version
+	config.ClientID = t.Name()
+	config.Version = V2_0_0_0
 	config.Consumer.Return.Errors = true
+	config.Consumer.Group.Rebalance.Retry.Max = 2
+	config.Consumer.Offsets.AutoCommit.Enable = false
 
+	broker0 := NewMockBroker(t, 0)
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my-topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset("my-topic", 0, OffsetOldest, 0).
+			SetOffset("my-topic", 0, OffsetNewest, 1),
+		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
+			SetCoordinator(CoordinatorGroup, "my-group", broker0),
+		"HeartbeatRequest": NewMockHeartbeatResponse(t),
+		"JoinGroupRequest": NewMockSequence(
+			NewMockJoinGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
+			NewMockJoinGroupResponse(t),
+		),
+		"SyncGroupRequest": NewMockSequence(
+			NewMockSyncGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
+			NewMockSyncGroupResponse(t).SetMemberAssignment(
+				&ConsumerGroupMemberAssignment{
+					Version: 0,
+					Topics: map[string][]int32{
+						"my-topic": {0},
+					},
+				}),
+		),
+		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(
+			"my-group", "my-topic", 0, 0, "", ErrNoError,
+		).SetError(ErrNoError),
+		"FetchRequest": NewMockSequence(
+			NewMockFetchResponse(t, 1).
+				SetMessage("my-topic", 0, 0, StringEncoder("foo")).
+				SetMessage("my-topic", 0, 1, StringEncoder("bar")),
+			NewMockFetchResponse(t, 1),
+		),
+	})
+
-	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
+	group, err := NewConsumerGroup([]string{broker0.Addr()}, "my-group", config)
 	if err != nil {
-		panic(err)
+		t.Fatal(err)
 	}
 	defer func() { _ = group.Close() }()
 
-	// Track errors
-	go func() {
-		for err := range group.Errors() {
-			fmt.Println("ERROR", err)
-		}
-	}()
+	ctx, cancel := context.WithCancel(context.Background())
+	h := &handler{t, cancel}
 
+	var wg sync.WaitGroup
+	wg.Add(1)
+
-	// Iterate over consumer sessions.
-	ctx := context.Background()
-	for {
+	go func() {
 		topics := []string{"my-topic"}
-		handler := exampleConsumerGroupHandler{}
-
-		// `Consume` should be called inside an infinite loop, when a
-		// server-side rebalance happens, the consumer session will need to be
-		// recreated to get the new claims
-		err := group.Consume(ctx, topics, handler)
-		if err != nil {
-			panic(err)
+		if err := group.Consume(ctx, topics, h); err != nil {
+			t.Error(err)
 		}
-	}
+		wg.Done()
+	}()
+	wg.Wait()
 }
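The test's choreography hinges on NewMockSequence: the first JoinGroup and the first SyncGroup each answer with ErrOffsetsLoadInProgress, the retried requests succeed, and Rebalance.Retry.Max = 2 keeps the retry budget large enough for both. Once the handler consumes a message it cancels the context, Consume returns, and wg.Wait unblocks. As a rough stand-in for the sequencing idea (illustration only, an assumption; the real MockSequence in sarama's mock support code may handle exhaustion differently):

// responseSequence is an illustrative stand-in, not sarama's MockSequence:
// each call yields the next canned response, repeating the last one once
// the sequence is exhausted.
type responseSequence struct {
	responses []KError
	i         int
}

func (s *responseSequence) next() KError {
	r := s.responses[s.i]
	if s.i < len(s.responses)-1 {
		s.i++ // advance until the final response, then keep returning it
	}
	return r
}

// A JoinGroup sequence mirroring the test: the first attempt sees the
// retriable error, the retry sees success.
var joinResponses = &responseSequence{responses: []KError{ErrOffsetsLoadInProgress, ErrNoError}}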
