diff --git a/admin.go b/admin.go index 7dabd3737..0ddd031cf 100644 --- a/admin.go +++ b/admin.go @@ -1018,35 +1018,7 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m return nil, err } - request := &OffsetFetchRequest{ - ConsumerGroup: group, - partitions: topicPartitions, - } - - if ca.conf.Version.IsAtLeast(V2_5_0_0) { - // Version 7 is adding the require stable flag. - request.Version = 7 - } else if ca.conf.Version.IsAtLeast(V2_4_0_0) { - // Version 6 is the first flexible version. - request.Version = 6 - } else if ca.conf.Version.IsAtLeast(V2_1_0_0) { - // Version 3, 4, and 5 are the same as version 2. - request.Version = 5 - } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { - request.Version = 4 - } else if ca.conf.Version.IsAtLeast(V0_11_0_0) { - request.Version = 3 - } else if ca.conf.Version.IsAtLeast(V0_10_2_0) { - // Starting in version 2, the request can contain a null topics array to indicate that offsets - // for all topics should be fetched. It also returns a top level error code - // for group or coordinator level errors. - request.Version = 2 - } else if ca.conf.Version.IsAtLeast(V0_8_2_0) { - // In version 0, the request read offsets from ZK. - // - // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic. - request.Version = 1 - } + request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) return coordinator.FetchOffset(request) } diff --git a/offset_fetch_request.go b/offset_fetch_request.go index c37ae73e3..0c9b8405b 100644 --- a/offset_fetch_request.go +++ b/offset_fetch_request.go @@ -7,6 +7,43 @@ type OffsetFetchRequest struct { partitions map[string][]int32 } +func NewOffsetFetchRequest( + version KafkaVersion, + group string, + partitions map[string][]int32, +) *OffsetFetchRequest { + request := &OffsetFetchRequest{ + ConsumerGroup: group, + partitions: partitions, + } + if version.IsAtLeast(V2_5_0_0) { + // Version 7 is adding the require stable flag. + request.Version = 7 + } else if version.IsAtLeast(V2_4_0_0) { + // Version 6 is the first flexible version. + request.Version = 6 + } else if version.IsAtLeast(V2_1_0_0) { + // Version 3, 4, and 5 are the same as version 2. + request.Version = 5 + } else if version.IsAtLeast(V2_0_0_0) { + request.Version = 4 + } else if version.IsAtLeast(V0_11_0_0) { + request.Version = 3 + } else if version.IsAtLeast(V0_10_2_0) { + // Starting in version 2, the request can contain a null topics array to indicate that offsets + // for all topics should be fetched. It also returns a top level error code + // for group or coordinator level errors. + request.Version = 2 + } else if version.IsAtLeast(V0_8_2_0) { + // In version 0, the request read offsets from ZK. + // + // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic. + request.Version = 1 + } + + return request +} + func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) { if r.Version < 0 || r.Version > 7 { return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"} diff --git a/offset_fetch_response.go b/offset_fetch_response.go index b412b25f1..7ce7927d8 100644 --- a/offset_fetch_response.go +++ b/offset_fetch_response.go @@ -22,6 +22,8 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err if err != nil { return err } + } else { + b.LeaderEpoch = -1 } if isFlexible { diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index d70894ab2..0676b0fc4 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -44,19 +44,19 @@ func TestNormalOffsetFetchResponse(t *testing.T) { for version := 0; version <= 1; version++ { response := OffsetFetchResponse{Version: int16(version)} - response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut}) + response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut}) response.Blocks["m"] = nil testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil) } responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest} - responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut}) + responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut}) responseV2.Blocks["m"] = nil testResponse(t, "normal V2", &responseV2, nil) for version := 3; version <= 4; version++ { responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} - responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut}) + responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut}) responseV3.Blocks["m"] = nil testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil) } diff --git a/offset_manager.go b/offset_manager.go index bc5beaf50..1bf545908 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -153,11 +153,8 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri return om.fetchInitialOffset(topic, partition, retries-1) } - req := new(OffsetFetchRequest) - req.Version = 1 - req.ConsumerGroup = om.group - req.AddPartition(topic, partition) - + partitions := map[string][]int32{topic: {partition}} + req := NewOffsetFetchRequest(om.conf.Version, om.group, partitions) resp, err := broker.FetchOffset(req) if err != nil { if retries <= 0 {