From 1aed568869ba2e917a13e5ddad7edb2a9c3325fd Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Fri, 4 Aug 2023 12:28:10 +0100 Subject: [PATCH] fix(proto): use full range of SyncGroupRequest Signed-off-by: Dominic Evans --- consumer_group.go | 11 +++++++++-- sync_group_request.go | 4 +++- sync_group_response.go | 4 +++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index de119d520f..a0a1e1a41b 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -507,12 +507,19 @@ func (c *consumerGroup) syncGroupRequest( GenerationId: generationID, } + // Versions 1 and 2 are the same as version 0. + if c.config.Version.IsAtLeast(V0_11_0_0) { + req.Version = 1 + } + if c.config.Version.IsAtLeast(V2_0_0_0) { + req.Version = 2 + } + // Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts. if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 3 - } - if c.groupInstanceId != nil { req.GroupInstanceId = c.groupInstanceId } + for memberID, topics := range plan { assignment := &ConsumerGroupMemberAssignment{Topics: topics} userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID) diff --git a/sync_group_request.go b/sync_group_request.go index 581b1ea29c..95efc28580 100644 --- a/sync_group_request.go +++ b/sync_group_request.go @@ -135,8 +135,10 @@ func (r *SyncGroupRequest) requiredVersion() KafkaVersion { return V2_0_0_0 case 1: return V0_11_0_0 - default: + case 0: return V0_9_0_0 + default: + return V2_3_0_0 } } diff --git a/sync_group_response.go b/sync_group_response.go index ca47f790d2..f7da15b4f1 100644 --- a/sync_group_response.go +++ b/sync_group_response.go @@ -71,8 +71,10 @@ func (r *SyncGroupResponse) requiredVersion() KafkaVersion { return V2_0_0_0 case 1: return V0_11_0_0 - default: + case 0: return V0_9_0_0 + default: + return V2_3_0_0 } }