Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make fetchInitialOffset use correct protocol #2705

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,35 +1018,7 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
return nil, err
}

request := &OffsetFetchRequest{
ConsumerGroup: group,
partitions: topicPartitions,
}

if ca.conf.Version.IsAtLeast(V2_5_0_0) {
// Version 7 is adding the require stable flag.
request.Version = 7
} else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
// Version 6 is the first flexible version.
request.Version = 6
} else if ca.conf.Version.IsAtLeast(V2_1_0_0) {
// Version 3, 4, and 5 are the same as version 2.
request.Version = 5
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 4
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 3
} else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_8_2_0) {
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
request.Version = 1
}
request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)

return coordinator.FetchOffset(request)
}
Expand Down
37 changes: 37 additions & 0 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,43 @@ type OffsetFetchRequest struct {
partitions map[string][]int32
}

func NewOffsetFetchRequest(
version KafkaVersion,
group string,
partitions map[string][]int32,
) *OffsetFetchRequest {
request := &OffsetFetchRequest{
ConsumerGroup: group,
partitions: partitions,
}
if version.IsAtLeast(V2_5_0_0) {
// Version 7 is adding the require stable flag.
request.Version = 7
} else if version.IsAtLeast(V2_4_0_0) {
// Version 6 is the first flexible version.
request.Version = 6
} else if version.IsAtLeast(V2_1_0_0) {
// Version 3, 4, and 5 are the same as version 2.
request.Version = 5
} else if version.IsAtLeast(V2_0_0_0) {
request.Version = 4
} else if version.IsAtLeast(V0_11_0_0) {
request.Version = 3
} else if version.IsAtLeast(V0_10_2_0) {
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
request.Version = 2
} else if version.IsAtLeast(V0_8_2_0) {
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
request.Version = 1
}

return request
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 7 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
Expand Down
2 changes: 2 additions & 0 deletions offset_fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err
if err != nil {
return err
}
} else {
b.LeaderEpoch = -1
}

if isFlexible {
Expand Down
6 changes: 3 additions & 3 deletions offset_fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ func TestNormalOffsetFetchResponse(t *testing.T) {

for version := 0; version <= 1; version++ {
response := OffsetFetchResponse{Version: int16(version)}
response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut})
response.Blocks["m"] = nil
testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
}

responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut})
responseV2.Blocks["m"] = nil
testResponse(t, "normal V2", &responseV2, nil)

for version := 3; version <= 4; version++ {
responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut})
responseV3.Blocks["m"] = nil
testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
}
Expand Down
7 changes: 2 additions & 5 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,8 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
return om.fetchInitialOffset(topic, partition, retries-1)
}

req := new(OffsetFetchRequest)
req.Version = 1
req.ConsumerGroup = om.group
req.AddPartition(topic, partition)

partitions := map[string][]int32{topic: {partition}}
req := NewOffsetFetchRequest(om.conf.Version, om.group, partitions)
resp, err := broker.FetchOffset(req)
if err != nil {
if retries <= 0 {
Expand Down
Loading