offsetfetch request topics are now nullable #1162

Merged: 13 commits, Jul 18, 2023
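With Topics nullable, a caller can leave the Topics map unset and, on Kafka 0.10.2 and above, the broker returns the committed offsets for every topic in the consumer group. A minimal usage sketch of the post-merge API (the broker address and group ID below are placeholders, not values from this PR):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client := &kafka.Client{Addr: kafka.TCP("localhost:9092")}

	// Leaving Topics nil asks the broker for the committed offsets of every
	// topic in the group (requires Kafka >= 0.10.2).
	resp, err := client.OffsetFetch(ctx, &kafka.OffsetFetchRequest{
		GroupID: "my-consumer-group", // placeholder group ID
	})
	if err != nil {
		log.Fatal(err)
	}

	for topic, partitions := range resp.Topics {
		for _, p := range partitions {
			fmt.Printf("topic %s partition %d committed offset %d\n", topic, p.Partition, p.CommittedOffset)
		}
	}
}
```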
29 changes: 19 additions & 10 deletions offsetfetch.go
@@ -66,19 +66,28 @@ type OffsetFetchPartition struct {
 // OffsetFetch sends an offset fetch request to a kafka broker and returns the
 // response.
 func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
-	topics := make([]offsetfetch.RequestTopic, 0, len(req.Topics))
-
-	for topicName, partitions := range req.Topics {
-		indexes := make([]int32, len(partitions))
-
-		for i, p := range partitions {
-			indexes[i] = int32(p)
-		}
-
-		topics = append(topics, offsetfetch.RequestTopic{
-			Name:             topicName,
-			PartitionIndexes: indexes,
-		})
-	}
+	// Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API
+	// which will return the result for all topics with the desired consumer group:
+	// https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch
+	// For Kafka version below 0.10.2.x this call will result in an error
+	var topics []offsetfetch.RequestTopic
+
+	if len(req.Topics) > 0 {
+		topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics))
+
+		for topicName, partitions := range req.Topics {
+			indexes := make([]int32, len(partitions))
+
+			for i, p := range partitions {
+				indexes[i] = int32(p)
+			}
+
+			topics = append(topics, offsetfetch.RequestTopic{
+				Name:             topicName,
+				PartitionIndexes: indexes,
+			})
+		}
+	}
 
 	m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
122 changes: 122 additions & 0 deletions offsetfetch_test.go
@@ -3,8 +3,12 @@ package kafka
import (
	"bufio"
	"bytes"
	"context"
	"reflect"
	"testing"
	"time"

	ktesting "github.com/segmentio/kafka-go/testing"
)

func TestOffsetFetchResponseV1(t *testing.T) {
@@ -43,3 +47,121 @@ func TestOffsetFetchResponseV1(t *testing.T) {
		t.FailNow()
	}
}

func TestOffsetFetchRequestWithNoTopic(t *testing.T) {
	if !ktesting.KafkaIsAtLeast("0.10.2.0") {
		t.Logf("Test %s is not applicable for kafka versions below 0.10.2.0", t.Name())
		t.SkipNow()
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	topic1 := makeTopic()
	defer deleteTopic(t, topic1)
	topic2 := makeTopic()
	defer deleteTopic(t, topic2)
	consumeGroup := makeGroupID()
	numMsgs := 50
	defer cancel()
	r1 := NewReader(ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic1,
		GroupID:  consumeGroup,
		MinBytes: 1,
		MaxBytes: 100,
		MaxWait:  100 * time.Millisecond,
	})
	defer r1.Close()
	prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
	r2 := NewReader(ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic2,
		GroupID:  consumeGroup,
		MinBytes: 1,
		MaxBytes: 100,
		MaxWait:  100 * time.Millisecond,
	})
	defer r2.Close()
	prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)

	for i := 0; i < numMsgs; i++ {
		if _, err := r1.ReadMessage(ctx); err != nil {
			t.Fatal(err)
		}
	}
	for i := 0; i < numMsgs; i++ {
		if _, err := r2.ReadMessage(ctx); err != nil {
			t.Fatal(err)
		}
	}

	client := Client{Addr: TCP("localhost:9092")}

	// Omitting Topics exercises the nullable topics array: the broker should
	// return offsets for every topic the consumer group committed to.
	topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup})
	if err != nil {
		t.Fatal(err)
	}

	if len(topicOffsets.Topics) != 2 {
		t.Fatalf("expected offsets for 2 topics, got %d", len(topicOffsets.Topics))
	}
}

func TestOffsetFetchRequestWithOneTopic(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	topic1 := makeTopic()
	defer deleteTopic(t, topic1)
	topic2 := makeTopic()
	defer deleteTopic(t, topic2)
	consumeGroup := makeGroupID()
	numMsgs := 50
	defer cancel()
	r1 := NewReader(ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic1,
		GroupID:  consumeGroup,
		MinBytes: 1,
		MaxBytes: 100,
		MaxWait:  100 * time.Millisecond,
	})
	defer r1.Close()
	prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
	r2 := NewReader(ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    topic2,
		GroupID:  consumeGroup,
		MinBytes: 1,
		MaxBytes: 100,
		MaxWait:  100 * time.Millisecond,
	})
	defer r2.Close()
	prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)

	for i := 0; i < numMsgs; i++ {
		if _, err := r1.ReadMessage(ctx); err != nil {
			t.Fatal(err)
		}
	}
	for i := 0; i < numMsgs; i++ {
		if _, err := r2.ReadMessage(ctx); err != nil {
			t.Fatal(err)
		}
	}

	client := Client{Addr: TCP("localhost:9092")}

	// Requesting a single topic explicitly should return offsets for that
	// topic only, even though the group committed to two topics.
	topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup, Topics: map[string][]int{
		topic1: {0},
	}})
	if err != nil {
		t.Fatal(err)
	}

	if len(topicOffsets.Topics) != 1 {
		t.Fatalf("expected offsets for 1 topic, got %d", len(topicOffsets.Topics))
	}
}
2 changes: 1 addition & 1 deletion protocol/offsetfetch/offsetfetch.go
@@ -8,7 +8,7 @@ func init() {

 type Request struct {
 	GroupID string         `kafka:"min=v0,max=v5"`
-	Topics  []RequestTopic `kafka:"min=v0,max=v5"`
+	Topics  []RequestTopic `kafka:"min=v0,max=v5,nullable"`
 }
 
 func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetFetch }
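For background on the `nullable` tag: in the classic (non-compact) Kafka protocol encoding, a null ARRAY is framed as length -1 while an empty ARRAY is framed as length 0, and OffsetFetch interprets a null topics array as "return offsets for all topics of the group". The sketch below is not kafka-go's encoder, just a standalone illustration of that framing rule:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeArrayHeader illustrates how a classic (non-compact) Kafka ARRAY is
// framed: a nil slice is encoded as length -1, an empty slice as length 0.
func writeArrayHeader(buf *bytes.Buffer, n int, null bool) {
	if null {
		binary.Write(buf, binary.BigEndian, int32(-1)) // null array
		return
	}
	binary.Write(buf, binary.BigEndian, int32(n)) // element count follows
}

func main() {
	var nullTopics, emptyTopics bytes.Buffer
	writeArrayHeader(&nullTopics, 0, true)
	writeArrayHeader(&emptyTopics, 0, false)
	fmt.Printf("null topics:  % x\n", nullTopics.Bytes())  // ff ff ff ff
	fmt.Printf("empty topics: % x\n", emptyTopics.Bytes()) // 00 00 00 00
}
```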