Skip to content

Commit

Permalink
Fix Kafka channel event loss during subscription becoming ready (knat…
Browse files Browse the repository at this point in the history
  • Loading branch information
aliok authored and pierDipi committed Nov 15, 2021
1 parent 5266d8b commit a31ec4e
Showing 1 changed file with 125 additions and 87 deletions.
212 changes: 125 additions & 87 deletions pkg/common/kafka/offset/offsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,110 +25,105 @@ import (
logtesting "knative.dev/pkg/logging/testing"
)

func TestInitOffsets(t *testing.T) {
testCases := map[string]struct {
topics []string
topicOffsets map[string]map[int32]int64
cgOffsets map[string]map[int32]int64
wantCommit bool
}{
"one topic, one partition, initialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: 2,
},
var testCases = map[string]struct {
topics []string
topicOffsets map[string]map[int32]int64
cgOffsets map[string]map[int32]int64
initialized bool
}{
"one topic, one partition, initialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
wantCommit: false,
},
"one topic, one partition, uninitialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: 2,
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: -1,
},
},
wantCommit: true,
},
"several topics, several partitions, not all initialized": {
topics: []string{"my-topic", "my-topic-2", "my-topic-3"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {0: 5, 1: 7},
"my-topic-2": {0: 5, 1: 7, 2: 9},
"my-topic-3": {0: 5, 1: 7, 2: 2, 3: 10},
initialized: true,
},
"one topic, one partition, uninitialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {0: -1, 1: 7},
"my-topic-2": {0: 5, 1: -1, 2: -1},
"my-topic-3": {0: 5, 1: 7, 2: -1, 3: 10},
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: -1,
},
wantCommit: true,
},
}
initialized: false,
},
"several topics, several partitions, not all initialized": {
topics: []string{"my-topic", "my-topic-2", "my-topic-3"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {0: 5, 1: 7},
"my-topic-2": {0: 5, 1: 7, 2: 9},
"my-topic-3": {0: 5, 1: 7, 2: 2, 3: 10},
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {0: -1, 1: 7},
"my-topic-2": {0: 5, 1: -1, 2: -1},
"my-topic-3": {0: 5, 1: 7, 2: -1, 3: 10},
},
initialized: false,
},
}

func TestInitOffsets(t *testing.T) {
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
broker := sarama.NewMockBroker(t, 1)
defer broker.Close()

group := "my-group"

offsetResponse := sarama.NewMockOffsetResponse(t).SetVersion(1)
for topic, partitions := range tc.topicOffsets {
for partition, offset := range partitions {
offsetResponse = offsetResponse.SetOffset(topic, partition, -1, offset)
}
}

offsetFetchResponse := sarama.NewMockOffsetFetchResponse(t).SetError(sarama.ErrNoError)
for topic, partitions := range tc.cgOffsets {
for partition, offset := range partitions {
offsetFetchResponse = offsetFetchResponse.SetOffset(group, topic, partition, offset, "", sarama.ErrNoError)
}
}
configureMockBroker(t, group, tc.topicOffsets, tc.cgOffsets, tc.initialized, broker)

offsetCommitResponse := sarama.NewMockOffsetCommitResponse(t)
serr := sarama.ErrNoError
if !tc.wantCommit {
serr = sarama.ErrUnknown // could be anything
config := sarama.NewConfig()
config.Version = sarama.MaxVersion

sc, err := sarama.NewClient([]string{broker.Addr()}, config)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer sc.Close()

for topic, partitions := range tc.cgOffsets {
for partition := range partitions {
offsetCommitResponse = offsetCommitResponse.SetError(group, topic, partition, serr)
}
kac, err := sarama.NewClusterAdminFromClient(sc)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer kac.Close()

metadataResponse := sarama.NewMockMetadataResponse(t).
SetController(broker.BrokerID()).
SetBroker(broker.Addr(), broker.BrokerID())
for topic, partitions := range tc.topicOffsets {
for partition := range partitions {
metadataResponse = metadataResponse.SetLeader(topic, partition, broker.BrokerID())
}
// test InitOffsets
ctx := logtesting.TestContextWithLogger(t)
partitionCt, err := InitOffsets(ctx, sc, kac, tc.topics, group)
total := 0
for _, partitions := range tc.topicOffsets {
total += len(partitions)
}
assert.Equal(t, int(partitionCt), total)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"OffsetRequest": offsetResponse,
"OffsetFetchRequest": offsetFetchResponse,
"OffsetCommitRequest": offsetCommitResponse,
func TestCheckIfAllOffsetsInitialized(t *testing.T) {
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
broker := sarama.NewMockBroker(t, 1)
defer broker.Close()

"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, group, broker),
group := "my-group"

"MetadataRequest": metadataResponse,
})
configureMockBroker(t, group, tc.topicOffsets, tc.cgOffsets, tc.initialized, broker)

config := sarama.NewConfig()
config.Version = sarama.MaxVersion
Expand All @@ -145,18 +140,61 @@ func TestInitOffsets(t *testing.T) {
}
defer kac.Close()

ctx := logtesting.TestContextWithLogger(t)
partitionCt, err := InitOffsets(ctx, sc, kac, tc.topics, group)
total := 0
for _, partitions := range tc.topicOffsets {
total += len(partitions)
}
assert.Equal(t, int(partitionCt), total)
// test CheckIfAllOffsetsInitialized
retrieved, err := CheckIfAllOffsetsInitialized(sc, kac, tc.topics, group)
assert.Equal(t, retrieved, tc.initialized)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

})
}
}

func configureMockBroker(t *testing.T, group string, topicOffsets map[string]map[int32]int64, cgOffsets map[string]map[int32]int64, initialized bool, broker *sarama.MockBroker) {
offsetResponse := sarama.NewMockOffsetResponse(t).SetVersion(1)
for topic, partitions := range topicOffsets {
for partition, offset := range partitions {
offsetResponse = offsetResponse.SetOffset(topic, partition, -1, offset)
}
}

offsetFetchResponse := sarama.NewMockOffsetFetchResponse(t).SetError(sarama.ErrNoError)
for topic, partitions := range cgOffsets {
for partition, offset := range partitions {
offsetFetchResponse = offsetFetchResponse.SetOffset(group, topic, partition, offset, "", sarama.ErrNoError)
}
}

offsetCommitResponse := sarama.NewMockOffsetCommitResponse(t)
serr := sarama.ErrNoError
if initialized { // means, we want a commit
serr = sarama.ErrUnknown // could be anything

}

for topic, partitions := range cgOffsets {
for partition := range partitions {
offsetCommitResponse = offsetCommitResponse.SetError(group, topic, partition, serr)
}
}

metadataResponse := sarama.NewMockMetadataResponse(t).
SetController(broker.BrokerID()).
SetBroker(broker.Addr(), broker.BrokerID())
for topic, partitions := range topicOffsets {
for partition := range partitions {
metadataResponse = metadataResponse.SetLeader(topic, partition, broker.BrokerID())
}
}

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"OffsetRequest": offsetResponse,
"OffsetFetchRequest": offsetFetchResponse,
"OffsetCommitRequest": offsetCommitResponse,

"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, group, broker),

"MetadataRequest": metadataResponse,
})
}

0 comments on commit a31ec4e

Please sign in to comment.