Skip to content

Commit

Permalink
Merge pull request #2292 from Shopify/dnwe/fix-sync-group-as-leader
Browse files Browse the repository at this point in the history
fix: include assignment-less members in SyncGroup
  • Loading branch information
dnwe authored Jul 22, 2022
2 parents 8a001b4 + 034046e commit 06d1a62
Show file tree
Hide file tree
Showing 7 changed files with 578 additions and 92 deletions.
24 changes: 20 additions & 4 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler

// Prepare distribution plan if we joined as the leader
var plan BalanceStrategyPlan
var members map[string]ConsumerGroupMemberMetadata
if join.LeaderId == join.MemberId {
members, err := join.GetMembers()
members, err = join.GetMembers()
if err != nil {
return nil, err
}
Expand All @@ -334,7 +335,7 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}

// Sync consumer group
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
groupRequest, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId)
if consumerGroupSyncTotal != nil {
consumerGroupSyncTotal.Inc(1)
}
Expand Down Expand Up @@ -426,15 +427,22 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
func (c *consumerGroup) syncGroupRequest(
coordinator *Broker,
members map[string]ConsumerGroupMemberMetadata,
plan BalanceStrategyPlan,
generationID int32,
) (*SyncGroupResponse, error) {
req := &SyncGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if c.groupInstanceId != nil {
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
}
if c.groupInstanceId != nil {
req.GroupInstanceId = c.groupInstanceId
}
for memberID, topics := range plan {
Expand All @@ -447,7 +455,15 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
return nil, err
}
delete(members, memberID)
}
// add empty assignments for any remaining members
for memberID := range members {
if err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{}); err != nil {
return nil, err
}
}

return coordinator.SyncGroup(req)
}

Expand Down
4 changes: 2 additions & 2 deletions examples/consumergroup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ module github.com/Shopify/sarama/examples/consumer

go 1.16

replace github.com/Shopify/sarama => ../../
require github.com/Shopify/sarama v1.34.1

require github.com/Shopify/sarama v1.27.0
replace github.com/Shopify/sarama => ../../
486 changes: 458 additions & 28 deletions examples/consumergroup/go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []
}

func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
t.Helper()
if !rb.requiredVersion().IsAtLeast(MinVersion) {
t.Errorf("Request %s has invalid required version", name)
}
Expand Down Expand Up @@ -74,6 +75,7 @@ func testRequestEncode(t *testing.T, name string, rb protocolBody, expected []by
}

func testRequestDecode(t *testing.T, name string, rb protocolBody, packet []byte) {
t.Helper()
decoded, n, err := decodeRequest(bytes.NewReader(packet))
if err != nil {
t.Error("Failed to decode request", err)
Expand Down
135 changes: 82 additions & 53 deletions sync_group_request.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,111 @@
package sarama

type SyncGroupRequestAssignment struct {
// MemberId contains the ID of the member to assign.
MemberId string
// Assignment contains the member assignment.
Assignment []byte
}

func (a *SyncGroupRequestAssignment) encode(pe packetEncoder, version int16) (err error) {
if err := pe.putString(a.MemberId); err != nil {
return err
}

if err := pe.putBytes(a.Assignment); err != nil {
return err
}

return nil
}

func (a *SyncGroupRequestAssignment) decode(pd packetDecoder, version int16) (err error) {
if a.MemberId, err = pd.getString(); err != nil {
return err
}

if a.Assignment, err = pd.getBytes(); err != nil {
return err
}

return nil
}

type SyncGroupRequest struct {
Version int16
GroupId string
GenerationId int32
MemberId string
GroupInstanceId *string
GroupAssignments map[string][]byte
// Version defines the protocol version to use for encode and decode
Version int16
// GroupId contains the unique group identifier.
GroupId string
// GenerationId contains the generation of the group.
GenerationId int32
// MemberId contains the member ID assigned by the group.
MemberId string
// GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
GroupInstanceId *string
// GroupAssignments contains each assignment.
GroupAssignments []SyncGroupRequestAssignment
}

func (r *SyncGroupRequest) encode(pe packetEncoder) error {
if err := pe.putString(r.GroupId); err != nil {
func (s *SyncGroupRequest) encode(pe packetEncoder) (err error) {
if err := pe.putString(s.GroupId); err != nil {
return err
}

pe.putInt32(r.GenerationId)
pe.putInt32(s.GenerationId)

if err := pe.putString(r.MemberId); err != nil {
if err := pe.putString(s.MemberId); err != nil {
return err
}

if r.Version >= 3 {
if err := pe.putNullableString(r.GroupInstanceId); err != nil {
if s.Version >= 3 {
if err := pe.putNullableString(s.GroupInstanceId); err != nil {
return err
}
}

if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil {
if err := pe.putArrayLength(len(s.GroupAssignments)); err != nil {
return err
}
for memberId, memberAssignment := range r.GroupAssignments {
if err := pe.putString(memberId); err != nil {
return err
}
if err := pe.putBytes(memberAssignment); err != nil {
for _, block := range s.GroupAssignments {
if err := block.encode(pe, s.Version); err != nil {
return err
}
}

return nil
}

func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if r.GroupId, err = pd.getString(); err != nil {
return
}
if r.GenerationId, err = pd.getInt32(); err != nil {
return
}
if r.MemberId, err = pd.getString(); err != nil {
return
}
if r.Version >= 3 {
if r.GroupInstanceId, err = pd.getNullableString(); err != nil {
return
}
func (s *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
s.Version = version
if s.GroupId, err = pd.getString(); err != nil {
return err
}

n, err := pd.getArrayLength()
if err != nil {
if s.GenerationId, err = pd.getInt32(); err != nil {
return err
}
if n == 0 {
return nil

if s.MemberId, err = pd.getString(); err != nil {
return err
}

r.GroupAssignments = make(map[string][]byte)
for i := 0; i < n; i++ {
memberId, err := pd.getString()
if err != nil {
return err
}
memberAssignment, err := pd.getBytes()
if err != nil {
if s.Version >= 3 {
if s.GroupInstanceId, err = pd.getNullableString(); err != nil {
return err
}
}

r.GroupAssignments[memberId] = memberAssignment
if numAssignments, err := pd.getArrayLength(); err != nil {
return err
} else if numAssignments > 0 {
s.GroupAssignments = make([]SyncGroupRequestAssignment, numAssignments)
for i := 0; i < numAssignments; i++ {
var block SyncGroupRequestAssignment
if err := block.decode(pd, s.Version); err != nil {
return err
}
s.GroupAssignments[i] = block
}
}

return nil
Expand All @@ -105,14 +132,16 @@ func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
}

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
if r.GroupAssignments == nil {
r.GroupAssignments = make(map[string][]byte)
}

r.GroupAssignments[memberId] = memberAssignment
r.GroupAssignments = append(r.GroupAssignments, SyncGroupRequestAssignment{
MemberId: memberId,
Assignment: memberAssignment,
})
}

func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
func (r *SyncGroupRequest) AddGroupAssignmentMember(
memberId string,
memberAssignment *ConsumerGroupMemberAssignment,
) error {
bin, err := encode(memberAssignment, nil)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions sync_group_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ func TestSyncGroupRequestV3AndPlus(t *testing.T) {
GenerationId: 0x00010203,
MemberId: "baz",
GroupInstanceId: &groupInstanceId,
GroupAssignments: map[string][]byte{
"baz": []byte("foo"),
GroupAssignments: []SyncGroupRequestAssignment{
{
MemberId: "baz",
Assignment: []byte("foo"),
},
},
},
},
Expand Down
12 changes: 9 additions & 3 deletions sync_group_response.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package sarama

type SyncGroupResponse struct {
Version int16
ThrottleTime int32
Err KError
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTimeMs contains the duration in milliseconds for which the
// request was throttled due to a quota violation, or zero if the request
// did not violate any quota.
ThrottleTime int32
// Err contains the error code, or 0 if there was no error.
Err KError
// MemberAssignment contains the member assignment.
MemberAssignment []byte
}

Expand Down

0 comments on commit 06d1a62

Please sign in to comment.