diff --git a/pkg/common/kafka/offset/offsets_test.go b/pkg/common/kafka/offset/offsets_test.go index e5effa5042..41285318f0 100644 --- a/pkg/common/kafka/offset/offsets_test.go +++ b/pkg/common/kafka/offset/offsets_test.go @@ -25,57 +25,57 @@ import ( logtesting "knative.dev/pkg/logging/testing" ) -func TestInitOffsets(t *testing.T) { - testCases := map[string]struct { - topics []string - topicOffsets map[string]map[int32]int64 - cgOffsets map[string]map[int32]int64 - wantCommit bool - }{ - "one topic, one partition, initialized": { - topics: []string{"my-topic"}, - topicOffsets: map[string]map[int32]int64{ - "my-topic": { - 0: 5, - }, - }, - cgOffsets: map[string]map[int32]int64{ - "my-topic": { - 0: 2, - }, +var testCases = map[string]struct { + topics []string + topicOffsets map[string]map[int32]int64 + cgOffsets map[string]map[int32]int64 + initialized bool +}{ + "one topic, one partition, initialized": { + topics: []string{"my-topic"}, + topicOffsets: map[string]map[int32]int64{ + "my-topic": { + 0: 5, }, - wantCommit: false, }, - "one topic, one partition, uninitialized": { - topics: []string{"my-topic"}, - topicOffsets: map[string]map[int32]int64{ - "my-topic": { - 0: 5, - }, + cgOffsets: map[string]map[int32]int64{ + "my-topic": { + 0: 2, }, - cgOffsets: map[string]map[int32]int64{ - "my-topic": { - 0: -1, - }, - }, - wantCommit: true, }, - "several topics, several partitions, not all initialized": { - topics: []string{"my-topic", "my-topic-2", "my-topic-3"}, - topicOffsets: map[string]map[int32]int64{ - "my-topic": {0: 5, 1: 7}, - "my-topic-2": {0: 5, 1: 7, 2: 9}, - "my-topic-3": {0: 5, 1: 7, 2: 2, 3: 10}, + initialized: true, + }, + "one topic, one partition, uninitialized": { + topics: []string{"my-topic"}, + topicOffsets: map[string]map[int32]int64{ + "my-topic": { + 0: 5, }, - cgOffsets: map[string]map[int32]int64{ - "my-topic": {0: -1, 1: 7}, - "my-topic-2": {0: 5, 1: -1, 2: -1}, - "my-topic-3": {0: 5, 1: 7, 2: -1, 3: 10}, + }, + cgOffsets: map[string]map[int32]int64{ + "my-topic": { + 0: -1, }, - wantCommit: true, }, - } + initialized: false, + }, + "several topics, several partitions, not all initialized": { + topics: []string{"my-topic", "my-topic-2", "my-topic-3"}, + topicOffsets: map[string]map[int32]int64{ + "my-topic": {0: 5, 1: 7}, + "my-topic-2": {0: 5, 1: 7, 2: 9}, + "my-topic-3": {0: 5, 1: 7, 2: 2, 3: 10}, + }, + cgOffsets: map[string]map[int32]int64{ + "my-topic": {0: -1, 1: 7}, + "my-topic-2": {0: 5, 1: -1, 2: -1}, + "my-topic-3": {0: 5, 1: 7, 2: -1, 3: 10}, + }, + initialized: false, + }, +} +func TestInitOffsets(t *testing.T) { for n, tc := range testCases { t.Run(n, func(t *testing.T) { broker := sarama.NewMockBroker(t, 1) @@ -83,52 +83,47 @@ func TestInitOffsets(t *testing.T) { group := "my-group" - offsetResponse := sarama.NewMockOffsetResponse(t).SetVersion(1) - for topic, partitions := range tc.topicOffsets { - for partition, offset := range partitions { - offsetResponse = offsetResponse.SetOffset(topic, partition, -1, offset) - } - } - - offsetFetchResponse := sarama.NewMockOffsetFetchResponse(t).SetError(sarama.ErrNoError) - for topic, partitions := range tc.cgOffsets { - for partition, offset := range partitions { - offsetFetchResponse = offsetFetchResponse.SetOffset(group, topic, partition, offset, "", sarama.ErrNoError) - } - } + configureMockBroker(t, group, tc.topicOffsets, tc.cgOffsets, tc.initialized, broker) - offsetCommitResponse := sarama.NewMockOffsetCommitResponse(t) - serr := sarama.ErrNoError - if !tc.wantCommit { - serr = sarama.ErrUnknown // could be anything + config := sarama.NewConfig() + config.Version = sarama.MaxVersion + sc, err := sarama.NewClient([]string{broker.Addr()}, config) + if err != nil { + t.Errorf("unexpected error: %v", err) } + defer sc.Close() - for topic, partitions := range tc.cgOffsets { - for partition := range partitions { - offsetCommitResponse = offsetCommitResponse.SetError(group, topic, partition, serr) - } + kac, err := sarama.NewClusterAdminFromClient(sc) + if err != nil { + t.Errorf("unexpected error: %v", err) } + defer kac.Close() - metadataResponse := sarama.NewMockMetadataResponse(t). - SetController(broker.BrokerID()). - SetBroker(broker.Addr(), broker.BrokerID()) - for topic, partitions := range tc.topicOffsets { - for partition := range partitions { - metadataResponse = metadataResponse.SetLeader(topic, partition, broker.BrokerID()) - } + // test InitOffsets + ctx := logtesting.TestContextWithLogger(t) + partitionCt, err := InitOffsets(ctx, sc, kac, tc.topics, group) + total := 0 + for _, partitions := range tc.topicOffsets { + total += len(partitions) + } + assert.Equal(t, int(partitionCt), total) + if err != nil { + t.Errorf("unexpected error: %v", err) } + }) + } +} - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "OffsetRequest": offsetResponse, - "OffsetFetchRequest": offsetFetchResponse, - "OffsetCommitRequest": offsetCommitResponse, +func TestCheckIfAllOffsetsInitialized(t *testing.T) { + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + broker := sarama.NewMockBroker(t, 1) + defer broker.Close() - "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). - SetCoordinator(sarama.CoordinatorGroup, group, broker), + group := "my-group" - "MetadataRequest": metadataResponse, - }) + configureMockBroker(t, group, tc.topicOffsets, tc.cgOffsets, tc.initialized, broker) config := sarama.NewConfig() config.Version = sarama.MaxVersion @@ -145,18 +140,61 @@ func TestInitOffsets(t *testing.T) { } defer kac.Close() - ctx := logtesting.TestContextWithLogger(t) - partitionCt, err := InitOffsets(ctx, sc, kac, tc.topics, group) - total := 0 - for _, partitions := range tc.topicOffsets { - total += len(partitions) - } - assert.Equal(t, int(partitionCt), total) + // test CheckIfAllOffsetsInitialized + retrieved, err := CheckIfAllOffsetsInitialized(sc, kac, tc.topics, group) + assert.Equal(t, retrieved, tc.initialized) if err != nil { t.Errorf("unexpected error: %v", err) } - }) } +} + +func configureMockBroker(t *testing.T, group string, topicOffsets map[string]map[int32]int64, cgOffsets map[string]map[int32]int64, initialized bool, broker *sarama.MockBroker) { + offsetResponse := sarama.NewMockOffsetResponse(t).SetVersion(1) + for topic, partitions := range topicOffsets { + for partition, offset := range partitions { + offsetResponse = offsetResponse.SetOffset(topic, partition, -1, offset) + } + } + + offsetFetchResponse := sarama.NewMockOffsetFetchResponse(t).SetError(sarama.ErrNoError) + for topic, partitions := range cgOffsets { + for partition, offset := range partitions { + offsetFetchResponse = offsetFetchResponse.SetOffset(group, topic, partition, offset, "", sarama.ErrNoError) + } + } + + offsetCommitResponse := sarama.NewMockOffsetCommitResponse(t) + serr := sarama.ErrNoError + if initialized { // means, we want a commit + serr = sarama.ErrUnknown // could be anything + + } + + for topic, partitions := range cgOffsets { + for partition := range partitions { + offsetCommitResponse = offsetCommitResponse.SetError(group, topic, partition, serr) + } + } + + metadataResponse := sarama.NewMockMetadataResponse(t). + SetController(broker.BrokerID()). + SetBroker(broker.Addr(), broker.BrokerID()) + for topic, partitions := range topicOffsets { + for partition := range partitions { + metadataResponse = metadataResponse.SetLeader(topic, partition, broker.BrokerID()) + } + } + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "OffsetRequest": offsetResponse, + "OffsetFetchRequest": offsetFetchResponse, + "OffsetCommitRequest": offsetCommitResponse, + + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, group, broker), + "MetadataRequest": metadataResponse, + }) }