Skip to content

Commit

Permalink
Merge pull request #2558 from IBM/dnwe/proto
Browse files Browse the repository at this point in the history
fix(proto): use fuller ranges of supported proto
  • Loading branch information
dnwe committed Aug 4, 2023
2 parents cf96776 + e4bf4df commit d8d9e73
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 33 deletions.
23 changes: 21 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,9 +987,28 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
partitions: topicPartitions,
}

if ca.conf.Version.IsAtLeast(V0_10_2_0) {
if ca.conf.Version.IsAtLeast(V2_5_0_0) {
// Version 7 is adding the require stable flag.
request.Version = 7
} else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
// Version 6 is the first flexible version.
request.Version = 6
} else if ca.conf.Version.IsAtLeast(V2_1_0_0) {
// Version 3, 4, and 5 are the same as version 2.
request.Version = 5
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 4
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 3
} else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
} else if ca.conf.Version.IsAtLeast(V0_8_2_0) {
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
request.Version = 1
}

Expand Down
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,9 +1197,14 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
request.CoordinatorKey = coordinatorKey
request.CoordinatorType = coordinatorType

// Version 1 adds KeyType.
if client.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}
// Version 2 is the same as version 1.
if client.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 2
}

response, err := broker.FindCoordinator(request)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,16 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g
MemberId: memberID,
GenerationId: generationID,
}
if c.groupInstanceId != nil {

// Version 1 and version 2 are the same as version 0.
if c.config.Version.IsAtLeast(V0_11_0_0) {
req.Version = 1
}
if c.config.Version.IsAtLeast(V2_0_0_0) {
req.Version = 2
}
// Starting from version 3, we add a new field called groupInstanceId to indicate member identity across restarts.
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
req.GroupInstanceId = c.groupInstanceId
}
Expand Down
4 changes: 3 additions & 1 deletion heartbeat_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func (r *HeartbeatRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
case 0:
return V0_8_2_0
default:
return V0_9_0_0
return V2_3_0_0
}
}
4 changes: 3 additions & 1 deletion heartbeat_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
case 0:
return V0_8_2_0
default:
return V0_9_0_0
return V2_3_0_0
}
}

Expand Down
4 changes: 3 additions & 1 deletion leave_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (r *LeaveGroupRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
default:
case 0:
return V0_9_0_0
default:
return V2_4_0_0
}
}
4 changes: 3 additions & 1 deletion leave_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ func (r *LeaveGroupResponse) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
default:
case 0:
return V0_9_0_0
default:
return V2_4_0_0
}
}

Expand Down
28 changes: 15 additions & 13 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,24 @@ func (r *OffsetFetchRequest) isValidVersion() bool {

func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_8_2_0
case 2:
return V0_10_2_0
case 3:
return V0_11_0_0
case 4:
return V2_0_0_0
case 5:
return V2_1_0_0
case 6:
return V2_4_0_0
case 7:
return V2_5_0_0
case 6:
return V2_4_0_0
case 5:
return V2_1_0_0
case 4:
return V2_0_0_0
case 3:
return V0_11_0_0
case 2:
return V0_10_2_0
case 1:
return V0_8_2_0
case 0:
return V0_8_2_0
default:
return MinVersion
return V2_5_0_0
}
}

Expand Down
28 changes: 15 additions & 13 deletions offset_fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,22 +242,24 @@ func (r *OffsetFetchResponse) isValidVersion() bool {

func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_8_2_0
case 2:
return V0_10_2_0
case 3:
return V0_11_0_0
case 4:
return V2_0_0_0
case 5:
return V2_1_0_0
case 6:
return V2_4_0_0
case 7:
return V2_5_0_0
case 6:
return V2_4_0_0
case 5:
return V2_1_0_0
case 4:
return V2_0_0_0
case 3:
return V0_11_0_0
case 2:
return V0_10_2_0
case 1:
return V0_8_2_0
case 0:
return V0_8_2_0
default:
return MinVersion
return V2_5_0_0
}
}

Expand Down

0 comments on commit d8d9e73

Please sign in to comment.