Skip to content

Commit

Permalink
fix(proto): use full range of SyncGroupRequest
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 7, 2023
1 parent 29487f1 commit 1aed568
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
11 changes: 9 additions & 2 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,12 +507,19 @@ func (c *consumerGroup) syncGroupRequest(
GenerationId: generationID,
}

// Versions 1 and 2 are the same as version 0.
if c.config.Version.IsAtLeast(V0_11_0_0) {
req.Version = 1
}
if c.config.Version.IsAtLeast(V2_0_0_0) {
req.Version = 2
}
// Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
}
if c.groupInstanceId != nil {
req.GroupInstanceId = c.groupInstanceId
}

for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
Expand Down
4 changes: 3 additions & 1 deletion sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
default:
case 0:
return V0_9_0_0
default:
return V2_3_0_0
}
}

Expand Down
4 changes: 3 additions & 1 deletion sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
default:
case 0:
return V0_9_0_0
default:
return V2_3_0_0
}
}

Expand Down

0 comments on commit 1aed568

Please sign in to comment.