From 201ab73cb4d873a6b978ca34dfe2665cef4e3eb8 Mon Sep 17 00:00:00 2001 From: Maxim Vladimirsky Date: Wed, 18 Nov 2015 12:40:00 -0800 Subject: [PATCH] Make mock brokers and protocol packets available for outsider --- async_producer_test.go | 66 ++++++------ broker_test.go | 2 +- client_test.go | 54 +++++----- consumer_test.go | 108 +++++++++---------- mockbroker_test.go => mockbroker.go | 45 ++++---- mockresponses_test.go => mockresponses.go | 126 +++++++++++----------- offset_manager_test.go | 16 +-- sync_producer_test.go | 10 +- 8 files changed, 215 insertions(+), 212 deletions(-) rename mockbroker_test.go => mockbroker.go (84%) rename mockresponses_test.go => mockresponses.go (69%) diff --git a/async_producer_test.go b/async_producer_test.go index 9aa13da53..517ef2a34 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -94,8 +94,8 @@ func (f flakyEncoder) Encode() ([]byte, error) { } func TestAsyncProducer(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -140,8 +140,8 @@ func TestAsyncProducer(t *testing.T) { } func TestAsyncProducerMultipleFlushes(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -175,9 +175,9 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { } func TestAsyncProducerMultipleBrokers(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader0 := newMockBroker(t, 2) - leader1 := newMockBroker(t, 3) + seedBroker := NewMockBroker(t, 1) + leader0 := NewMockBroker(t, 2) + leader1 := NewMockBroker(t, 3) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) @@ -215,8 +215,8 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { } func TestAsyncProducerCustomPartitioner(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -257,9 +257,9 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { } func TestAsyncProducerFailureRetry(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader1 := newMockBroker(t, 2) - leader2 := newMockBroker(t, 3) + seedBroker := NewMockBroker(t, 1) + leader1 := NewMockBroker(t, 2) + leader2 := NewMockBroker(t, 3) metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) @@ -305,8 +305,8 @@ func TestAsyncProducerFailureRetry(t *testing.T) { } func TestAsyncProducerEncoderFailures(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -344,8 +344,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { // producer reconnects to it and continues sending messages. func TestAsyncProducerBrokerBounce(t *testing.T) { // Given - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() metadataResponse := new(MetadataResponse) @@ -370,7 +370,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { // When: a broker connection gets reset by a broker (network glitch, restart, you name it). leader.Close() // producer should get EOF - leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles + leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again // Then: a produced message goes through the new broker connection. @@ -384,9 +384,9 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { } func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader1 := newMockBroker(t, 2) - leader2 := newMockBroker(t, 3) + seedBroker := NewMockBroker(t, 1) + leader1 := NewMockBroker(t, 2) + leader2 := NewMockBroker(t, 3) metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) @@ -427,9 +427,9 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { } func TestAsyncProducerMultipleRetries(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader1 := newMockBroker(t, 2) - leader2 := newMockBroker(t, 3) + seedBroker := NewMockBroker(t, 1) + leader1 := NewMockBroker(t, 2) + leader2 := NewMockBroker(t, 3) metadataLeader1 := new(MetadataResponse) metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) @@ -484,8 +484,8 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { func TestAsyncProducerOutOfRetries(t *testing.T) { t.Skip("Enable once bug #294 is fixed.") - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -539,8 +539,8 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { } func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() metadataResponse := new(MetadataResponse) @@ -575,7 +575,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { // reboot the broker (the producer will get EOF on its existing connection) leader.Close() - leader = newMockBrokerAddr(t, 2, leaderAddr) + leader = NewMockBrokerAddr(t, 2, leaderAddr) // send another message on partition 0 to trigger the EOF and retry producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} @@ -596,8 +596,8 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { } func TestAsyncProducerFlusherRetryCondition(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -638,7 +638,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { time.Sleep(50 * time.Millisecond) leader.SetHandlerByMap(map[string]MockResponse{ - "ProduceRequest": newMockProduceResponse(t). + "ProduceRequest": NewMockProduceResponse(t). SetError("my_topic", 0, ErrNoError), }) @@ -661,8 +661,8 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { } func TestAsyncProducerRetryShutdown(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataLeader := new(MetadataResponse) metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) diff --git a/broker_test.go b/broker_test.go index 22731bfa8..89f75bf8d 100644 --- a/broker_test.go +++ b/broker_test.go @@ -52,7 +52,7 @@ func TestBrokerAccessors(t *testing.T) { } func TestSimpleBrokerCommunication(t *testing.T) { - mb := newMockBroker(t, 0) + mb := NewMockBroker(t, 0) defer mb.Close() broker := NewBroker(mb.Addr()) diff --git a/client_test.go b/client_test.go index f84b9af31..b0559466f 100644 --- a/client_test.go +++ b/client_test.go @@ -15,7 +15,7 @@ func safeClose(t testing.TB, c io.Closer) { } func TestSimpleClient(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) seedBroker.Returns(new(MetadataResponse)) @@ -29,7 +29,7 @@ func TestSimpleClient(t *testing.T) { } func TestCachedPartitions(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) replicas := []int32{3, 1, 5} isr := []int32{5, 1} @@ -68,7 +68,7 @@ func TestCachedPartitions(t *testing.T) { } func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) replicas := []int32{seedBroker.BrokerID()} @@ -122,7 +122,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } func TestClientSeedBrokers(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker("localhost:12345", 2) @@ -138,8 +138,8 @@ func TestClientSeedBrokers(t *testing.T) { } func TestClientMetadata(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 5) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 5) replicas := []int32{3, 1, 5} isr := []int32{5, 1} @@ -202,8 +202,8 @@ func TestClientMetadata(t *testing.T) { } func TestClientGetOffset(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() metadata := new(MetadataResponse) @@ -231,7 +231,7 @@ func TestClientGetOffset(t *testing.T) { leader.Close() seedBroker.Returns(metadata) - leader = newMockBrokerAddr(t, 2, leaderAddr) + leader = NewMockBrokerAddr(t, 2, leaderAddr) offsetResponse = new(OffsetResponse) offsetResponse.AddTopicPartition("foo", 0, 456) leader.Returns(offsetResponse) @@ -250,7 +250,7 @@ func TestClientGetOffset(t *testing.T) { } func TestClientReceivingUnknownTopic(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) @@ -286,8 +286,8 @@ func TestClientReceivingUnknownTopic(t *testing.T) { } func TestClientReceivingPartialMetadata(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 5) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 5) metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) @@ -339,8 +339,8 @@ func TestClientReceivingPartialMetadata(t *testing.T) { } func TestClientRefreshBehaviour(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 5) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 5) metadataResponse1 := new(MetadataResponse) metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) @@ -375,7 +375,7 @@ func TestClientRefreshBehaviour(t *testing.T) { } func TestClientResurrectDeadSeeds(t *testing.T) { - initialSeed := newMockBroker(t, 0) + initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) @@ -390,9 +390,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) { client := c.(*client) - seed1 := newMockBroker(t, 1) - seed2 := newMockBroker(t, 2) - seed3 := newMockBroker(t, 3) + seed1 := NewMockBroker(t, 1) + seed2 := NewMockBroker(t, 2) + seed3 := NewMockBroker(t, 3) addr1 := seed1.Addr() addr2 := seed2.Addr() addr3 := seed3.Addr() @@ -413,8 +413,8 @@ func TestClientResurrectDeadSeeds(t *testing.T) { seed1.Close() seed2.Close() - seed1 = newMockBrokerAddr(t, 1, addr1) - seed2 = newMockBrokerAddr(t, 2, addr2) + seed1 = NewMockBrokerAddr(t, 1, addr1) + seed2 = NewMockBrokerAddr(t, 2, addr2) seed3.Close() @@ -434,9 +434,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) { } func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { - seedBroker := newMockBroker(t, 1) - staleCoordinator := newMockBroker(t, 2) - freshCoordinator := newMockBroker(t, 3) + seedBroker := NewMockBroker(t, 1) + staleCoordinator := NewMockBroker(t, 2) + freshCoordinator := NewMockBroker(t, 3) replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} metadataResponse1 := new(MetadataResponse) @@ -513,8 +513,8 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { } func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { - seedBroker := newMockBroker(t, 1) - coordinator := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + coordinator := NewMockBroker(t, 2) metadataResponse1 := new(MetadataResponse) seedBroker.Returns(metadataResponse1) @@ -566,7 +566,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { } func TestClientAutorefreshShutdownRace(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) seedBroker.Returns(metadataResponse) @@ -594,7 +594,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) { time.Sleep(10 * time.Millisecond) // Then return some metadata to the still-running background thread - leader := newMockBroker(t, 2) + leader := NewMockBroker(t, 2) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError) seedBroker.Returns(metadataResponse) diff --git a/consumer_test.go b/consumer_test.go index 5286ef212..3d760142b 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -15,9 +15,9 @@ var testMsg = StringEncoder("Foo") // that offset. func TestConsumerOffsetManual(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) - mockFetchResponse := newMockFetchResponse(t, 1) + mockFetchResponse := NewMockFetchResponse(t, 1) for i := 0; i < 10; i++ { mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg) } @@ -26,7 +26,7 @@ func TestConsumerOffsetManual(t *testing.T) { "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 0). SetOffset("my_topic", 0, OffsetNewest, 2345), "FetchRequest": mockFetchResponse, @@ -63,15 +63,15 @@ func TestConsumerOffsetManual(t *testing.T) { // newest in its metadata response. func TestConsumerOffsetNewest(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetNewest, 10). SetOffset("my_topic", 0, OffsetOldest, 7), - "FetchRequest": newMockFetchResponse(t, 1). + "FetchRequest": NewMockFetchResponse(t, 1). SetMessage("my_topic", 0, 9, testMsg). SetMessage("my_topic", 0, 10, testMsg). SetMessage("my_topic", 0, 11, testMsg). @@ -103,15 +103,15 @@ func TestConsumerOffsetNewest(t *testing.T) { // It is possible to close a partition consumer and create the same anew. func TestConsumerRecreate(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 0). SetOffset("my_topic", 0, OffsetNewest, 1000), - "FetchRequest": newMockFetchResponse(t, 1). + "FetchRequest": NewMockFetchResponse(t, 1). SetMessage("my_topic", 0, 10, testMsg), }) @@ -144,15 +144,15 @@ func TestConsumerRecreate(t *testing.T) { // An attempt to consume the same partition twice should fail. func TestConsumerDuplicate(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 0). SetOffset("my_topic", 0, OffsetNewest, 1000), - "FetchRequest": newMockFetchResponse(t, 1), + "FetchRequest": NewMockFetchResponse(t, 1), }) config := NewConfig() @@ -184,7 +184,7 @@ func TestConsumerDuplicate(t *testing.T) { // specified by `Config.Consumer.Retry.Backoff`. func TestConsumerLeaderRefreshError(t *testing.T) { // Given - broker0 := newMockBroker(t, 100) + broker0 := NewMockBroker(t, 100) // Stage 1: my_topic/0 served by broker0 Logger.Printf(" STAGE 1") @@ -193,10 +193,10 @@ func TestConsumerLeaderRefreshError(t *testing.T) { "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 123). SetOffset("my_topic", 0, OffsetNewest, 1000), - "FetchRequest": newMockFetchResponse(t, 1). + "FetchRequest": NewMockFetchResponse(t, 1). SetMessage("my_topic", 0, 123, testMsg), }) @@ -225,7 +225,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) { fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) broker0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": newMockWrapper(fetchResponse2), + "FetchRequest": NewMockWrapper(fetchResponse2), }) if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers { @@ -237,10 +237,10 @@ func TestConsumerLeaderRefreshError(t *testing.T) { Logger.Printf(" STAGE 3") - broker1 := newMockBroker(t, 101) + broker1 := NewMockBroker(t, 101) broker1.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": newMockFetchResponse(t, 1). + "FetchRequest": NewMockFetchResponse(t, 1). SetMessage("my_topic", 0, 124, testMsg), }) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -260,7 +260,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) { func TestConsumerInvalidTopic(t *testing.T) { // Given - broker0 := newMockBroker(t, 100) + broker0 := NewMockBroker(t, 100) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()), @@ -287,15 +287,15 @@ func TestConsumerInvalidTopic(t *testing.T) { // the moment is closed. func TestConsumerClosePartitionWithoutLeader(t *testing.T) { // Given - broker0 := newMockBroker(t, 100) + broker0 := NewMockBroker(t, 100) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 123). SetOffset("my_topic", 0, OffsetNewest, 1000), - "FetchRequest": newMockFetchResponse(t, 1). + "FetchRequest": NewMockFetchResponse(t, 1). SetMessage("my_topic", 0, 123, testMsg), }) @@ -322,7 +322,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) broker0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": newMockWrapper(fetchResponse2), + "FetchRequest": NewMockWrapper(fetchResponse2), }) // When @@ -341,17 +341,17 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { // immediately closing its output channels. func TestConsumerShutsDownOutOfRange(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) fetchResponse := new(FetchResponse) fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 7), - "FetchRequest": newMockWrapper(fetchResponse), + "FetchRequest": NewMockWrapper(fetchResponse), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -379,7 +379,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { // requested, then such messages are ignored. func TestConsumerExtraOffsets(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) fetchResponse1 := &FetchResponse{} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) @@ -391,10 +391,10 @@ func TestConsumerExtraOffsets(t *testing.T) { "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), - "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -422,7 +422,7 @@ func TestConsumerExtraOffsets(t *testing.T) { // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) fetchResponse1 := &FetchResponse{} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7) @@ -433,10 +433,10 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 0), - "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), }) master, err := NewConsumer([]string{broker0.Addr()}, nil) @@ -465,9 +465,9 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { // leader and switches to it. func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // initial setup - seedBroker := newMockBroker(t, 10) - leader0 := newMockBroker(t, 0) - leader1 := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 10) + leader0 := NewMockBroker(t, 0) + leader1 := NewMockBroker(t, 1) seedBroker.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). @@ -477,18 +477,18 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { SetLeader("my_topic", 1, leader1.BrokerID()), }) - mockOffsetResponse1 := newMockOffsetResponse(t). + mockOffsetResponse1 := NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 0). SetOffset("my_topic", 0, OffsetNewest, 1000). SetOffset("my_topic", 1, OffsetOldest, 0). SetOffset("my_topic", 1, OffsetNewest, 1000) leader0.SetHandlerByMap(map[string]MockResponse{ "OffsetRequest": mockOffsetResponse1, - "FetchRequest": newMockFetchResponse(t, 1), + "FetchRequest": NewMockFetchResponse(t, 1), }) leader1.SetHandlerByMap(map[string]MockResponse{ "OffsetRequest": mockOffsetResponse1, - "FetchRequest": newMockFetchResponse(t, 1), + "FetchRequest": NewMockFetchResponse(t, 1), }) // launch test goroutines @@ -535,7 +535,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // * my_topic/0 -> leader0 serves 4 messages // * my_topic/1 -> leader1 serves 0 messages - mockFetchResponse := newMockFetchResponse(t, 1) + mockFetchResponse := NewMockFetchResponse(t, 1) for i := 0; i < 4; i++ { mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg) } @@ -560,7 +560,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { fetchResponse := new(FetchResponse) fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) leader0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": newMockWrapper(fetchResponse), + "FetchRequest": NewMockWrapper(fetchResponse), }) time.Sleep(50 * time.Millisecond) @@ -570,7 +570,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // * my_topic/1 -> leader1 server 8 messages // leader1 provides 3 message on partition 0, and 8 messages on partition 1 - mockFetchResponse2 := newMockFetchResponse(t, 2) + mockFetchResponse2 := NewMockFetchResponse(t, 2) for i := 4; i < 7; i++ { mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg) } @@ -596,18 +596,18 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { }) // leader1 provides three more messages on partition0, says no longer leader of partition1 - mockFetchResponse3 := newMockFetchResponse(t, 3). + mockFetchResponse3 := NewMockFetchResponse(t, 3). SetMessage("my_topic", 0, int64(7), testMsg). SetMessage("my_topic", 0, int64(8), testMsg). SetMessage("my_topic", 0, int64(9), testMsg) fetchResponse4 := new(FetchResponse) fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition) leader1.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": newMockSequence(mockFetchResponse3, fetchResponse4), + "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4), }) // leader0 provides two messages on partition 1 - mockFetchResponse4 := newMockFetchResponse(t, 2) + mockFetchResponse4 := NewMockFetchResponse(t, 2) for i := 8; i < 10; i++ { mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg) } @@ -627,18 +627,18 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // read messages by the other consumer. func TestConsumerInterleavedClose(t *testing.T) { // Given - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()). SetLeader("my_topic", 1, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 1000). SetOffset("my_topic", 0, OffsetNewest, 1100). SetOffset("my_topic", 1, OffsetOldest, 2000). SetOffset("my_topic", 1, OffsetNewest, 2100), - "FetchRequest": newMockFetchResponse(t, 1). + "FetchRequest": NewMockFetchResponse(t, 1). SetMessage("my_topic", 0, 1000, testMsg). SetMessage("my_topic", 0, 1001, testMsg). SetMessage("my_topic", 0, 1002, testMsg). @@ -674,9 +674,9 @@ func TestConsumerInterleavedClose(t *testing.T) { } func TestConsumerBounceWithReferenceOpen(t *testing.T) { - broker0 := newMockBroker(t, 0) + broker0 := NewMockBroker(t, 0) broker0Addr := broker0.Addr() - broker1 := newMockBroker(t, 1) + broker1 := NewMockBroker(t, 1) mockMetadataResponse := newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). @@ -684,13 +684,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { SetLeader("my_topic", 0, broker0.BrokerID()). SetLeader("my_topic", 1, broker1.BrokerID()) - mockOffsetResponse := newMockOffsetResponse(t). + mockOffsetResponse := NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 1000). SetOffset("my_topic", 0, OffsetNewest, 1100). SetOffset("my_topic", 1, OffsetOldest, 2000). SetOffset("my_topic", 1, OffsetNewest, 2100) - mockFetchResponse := newMockFetchResponse(t, 1) + mockFetchResponse := NewMockFetchResponse(t, 1) for i := 0; i < 10; i++ { mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg) mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg) @@ -745,7 +745,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { } // Bring broker0 back to service. - broker0 = newMockBrokerAddr(t, 0, broker0Addr) + broker0 = NewMockBrokerAddr(t, 0, broker0Addr) broker0.SetHandlerByMap(map[string]MockResponse{ "FetchRequest": mockFetchResponse, }) @@ -773,12 +773,12 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { func TestConsumerOffsetOutOfRange(t *testing.T) { // Given - broker0 := newMockBroker(t, 2) + broker0 := NewMockBroker(t, 2) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": newMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": newMockOffsetResponse(t). + "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetNewest, 1234). SetOffset("my_topic", 0, OffsetOldest, 2345), }) diff --git a/mockbroker_test.go b/mockbroker.go similarity index 84% rename from mockbroker_test.go rename to mockbroker.go index 28f85ba35..1055a9dff 100644 --- a/mockbroker_test.go +++ b/mockbroker.go @@ -36,7 +36,7 @@ type requestHandlerFunc func(req *request) (res encoder) // // It is not necessary to prefix message length or correlation ID to your // response bytes, the server does that automatically as a convenience. -type mockBroker struct { +type MockBroker struct { brokerID int32 port int32 closing chan none @@ -55,11 +55,11 @@ type RequestResponse struct { Response encoder } -func (b *mockBroker) SetLatency(latency time.Duration) { +func (b *MockBroker) SetLatency(latency time.Duration) { b.latency = latency } -func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { +func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { b.setHandler(func(req *request) (res encoder) { reqTypeName := reflect.TypeOf(req.body).Elem().Name() mockResponse := handlerMap[reqTypeName] @@ -70,11 +70,11 @@ func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) { }) } -func (b *mockBroker) BrokerID() int32 { +func (b *MockBroker) BrokerID() int32 { return b.brokerID } -func (b *mockBroker) History() []RequestResponse { +func (b *MockBroker) History() []RequestResponse { b.lock.Lock() history := make([]RequestResponse, len(b.history)) copy(history, b.history) @@ -82,15 +82,15 @@ func (b *mockBroker) History() []RequestResponse { return history } -func (b *mockBroker) Port() int32 { +func (b *MockBroker) Port() int32 { return b.port } -func (b *mockBroker) Addr() string { +func (b *MockBroker) Addr() string { return b.listener.Addr().String() } -func (b *mockBroker) Close() { +func (b *MockBroker) Close() { close(b.expectations) if len(b.expectations) > 0 { buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID())) @@ -106,20 +106,23 @@ func (b *mockBroker) Close() { // setHandler sets the specified function as the request handler. Whenever // a mock broker reads a request from the wire it passes the request to the // function and sends back whatever the handler function returns. -func (b *mockBroker) setHandler(handler requestHandlerFunc) { +func (b *MockBroker) setHandler(handler requestHandlerFunc) { b.lock.Lock() b.handler = handler b.lock.Unlock() } -func (b *mockBroker) serverLoop() { +func (b *MockBroker) serverLoop() { defer close(b.stopper) var err error var conn net.Conn go func() { <-b.closing - safeClose(b.t, b.listener) + err := b.listener.Close() + if err != nil { + b.t.Error(err) + } }() wg := &sync.WaitGroup{} @@ -133,7 +136,7 @@ func (b *mockBroker) serverLoop() { Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err) } -func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) { +func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) { defer wg.Done() defer func() { _ = conn.Close() @@ -198,7 +201,7 @@ func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err) } -func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) { +func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) { select { case res, ok := <-b.expectations: if !ok { @@ -210,7 +213,7 @@ func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) { } } -func (b *mockBroker) serverError(err error) { +func (b *MockBroker) serverError(err error) { isConnectionClosedError := false if _, ok := err.(*net.OpError); ok { isConnectionClosedError = true @@ -227,19 +230,19 @@ func (b *mockBroker) serverError(err error) { b.t.Errorf(err.Error()) } -// newMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the +// NewMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the // test framework and a channel of responses to use. If an error occurs it is // simply logged to the *testing.T and the broker exits. -func newMockBroker(t *testing.T, brokerID int32) *mockBroker { - return newMockBrokerAddr(t, brokerID, "localhost:0") +func NewMockBroker(t *testing.T, brokerID int32) *MockBroker { + return NewMockBrokerAddr(t, brokerID, "localhost:0") } -// newMockBrokerAddr behaves like newMockBroker but listens on the address you give +// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give // it rather than just some ephemeral port. -func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker { +func NewMockBrokerAddr(t *testing.T, brokerID int32, addr string) *MockBroker { var err error - broker := &mockBroker{ + broker := &MockBroker{ closing: make(chan none), stopper: make(chan none), t: t, @@ -268,6 +271,6 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker { return broker } -func (b *mockBroker) Returns(e encoder) { +func (b *MockBroker) Returns(e encoder) { b.expectations <- e } diff --git a/mockresponses_test.go b/mockresponses.go similarity index 69% rename from mockresponses_test.go rename to mockresponses.go index 55b648d6f..07af43c55 100644 --- a/mockresponses_test.go +++ b/mockresponses.go @@ -11,37 +11,37 @@ type MockResponse interface { For(reqBody decoder) (res encoder) } -// mockWrapper is a mock response builder that returns a particular concrete +// MockWrapper is a mock response builder that returns a particular concrete // response regardless of the actual request passed to the `For` method. -type mockWrapper struct { +type MockWrapper struct { res encoder } -func (mw *mockWrapper) For(reqBody decoder) (res encoder) { +func (mw *MockWrapper) For(reqBody decoder) (res encoder) { return mw.res } -func newMockWrapper(res encoder) *mockWrapper { - return &mockWrapper{res: res} +func NewMockWrapper(res encoder) *MockWrapper { + return &MockWrapper{res: res} } -// mockSequence is a mock response builder that is created from a sequence of +// MockSequence is a mock response builder that is created from a sequence of // concrete responses. Every time when a `MockBroker` calls its `For` method // the next response from the sequence is returned. When the end of the // sequence is reached the last element from the sequence is returned. -type mockSequence struct { +type MockSequence struct { responses []MockResponse } -func newMockSequence(responses ...interface{}) *mockSequence { - ms := &mockSequence{} +func NewMockSequence(responses ...interface{}) *MockSequence { + ms := &MockSequence{} ms.responses = make([]MockResponse, len(responses)) for i, res := range responses { switch res := res.(type) { case MockResponse: ms.responses[i] = res case encoder: - ms.responses[i] = newMockWrapper(res) + ms.responses[i] = NewMockWrapper(res) default: panic(fmt.Sprintf("Unexpected response type: %T", res)) } @@ -49,7 +49,7 @@ func newMockSequence(responses ...interface{}) *mockSequence { return ms } -func (mc *mockSequence) For(reqBody decoder) (res encoder) { +func (mc *MockSequence) For(reqBody decoder) (res encoder) { res = mc.responses[0].For(reqBody) if len(mc.responses) > 1 { mc.responses = mc.responses[1:] @@ -57,22 +57,22 @@ func (mc *mockSequence) For(reqBody decoder) (res encoder) { return res } -// mockMetadataResponse is a `MetadataResponse` builder. -type mockMetadataResponse struct { +// MockMetadataResponse is a `MetadataResponse` builder. +type MockMetadataResponse struct { leaders map[string]map[int32]int32 brokers map[string]int32 t *testing.T } -func newMockMetadataResponse(t *testing.T) *mockMetadataResponse { - return &mockMetadataResponse{ +func newMockMetadataResponse(t *testing.T) *MockMetadataResponse { + return &MockMetadataResponse{ leaders: make(map[string]map[int32]int32), brokers: make(map[string]int32), t: t, } } -func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse { +func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse { partitions := mmr.leaders[topic] if partitions == nil { partitions = make(map[int32]int32) @@ -82,12 +82,12 @@ func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int return mmr } -func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse { +func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse { mmr.brokers[addr] = brokerID return mmr } -func (mmr *mockMetadataResponse) For(reqBody decoder) encoder { +func (mmr *MockMetadataResponse) For(reqBody decoder) encoder { metadataRequest := reqBody.(*MetadataRequest) metadataResponse := &MetadataResponse{} for addr, brokerID := range mmr.brokers { @@ -109,20 +109,20 @@ func (mmr *mockMetadataResponse) For(reqBody decoder) encoder { return metadataResponse } -// mockOffsetResponse is an `OffsetResponse` builder. -type mockOffsetResponse struct { +// MockOffsetResponse is an `OffsetResponse` builder. +type MockOffsetResponse struct { offsets map[string]map[int32]map[int64]int64 t *testing.T } -func newMockOffsetResponse(t *testing.T) *mockOffsetResponse { - return &mockOffsetResponse{ +func NewMockOffsetResponse(t *testing.T) *MockOffsetResponse { + return &MockOffsetResponse{ offsets: make(map[string]map[int32]map[int64]int64), t: t, } } -func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse { +func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse { partitions := mor.offsets[topic] if partitions == nil { partitions = make(map[int32]map[int64]int64) @@ -137,7 +137,7 @@ func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, of return mor } -func (mor *mockOffsetResponse) For(reqBody decoder) encoder { +func (mor *MockOffsetResponse) For(reqBody decoder) encoder { offsetRequest := reqBody.(*OffsetRequest) offsetResponse := &OffsetResponse{} for topic, partitions := range offsetRequest.blocks { @@ -149,7 +149,7 @@ func (mor *mockOffsetResponse) For(reqBody decoder) encoder { return offsetResponse } -func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 { +func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 { partitions := mor.offsets[topic] if partitions == nil { mor.t.Errorf("missing topic: %s", topic) @@ -165,16 +165,16 @@ func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int return offset } -// mockFetchResponse is a `FetchResponse` builder. -type mockFetchResponse struct { +// MockFetchResponse is a `FetchResponse` builder. +type MockFetchResponse struct { messages map[string]map[int32]map[int64]Encoder highWaterMarks map[string]map[int32]int64 t *testing.T batchSize int } -func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse { - return &mockFetchResponse{ +func NewMockFetchResponse(t *testing.T, batchSize int) *MockFetchResponse { + return &MockFetchResponse{ messages: make(map[string]map[int32]map[int64]Encoder), highWaterMarks: make(map[string]map[int32]int64), t: t, @@ -182,7 +182,7 @@ func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse { } } -func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse { +func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse { partitions := mfr.messages[topic] if partitions == nil { partitions = make(map[int32]map[int64]Encoder) @@ -197,7 +197,7 @@ func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset i return mfr } -func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse { +func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse { partitions := mfr.highWaterMarks[topic] if partitions == nil { partitions = make(map[int32]int64) @@ -207,7 +207,7 @@ func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, of return mfr } -func (mfr *mockFetchResponse) For(reqBody decoder) encoder { +func (mfr *MockFetchResponse) For(reqBody decoder) encoder { fetchRequest := reqBody.(*FetchRequest) res := &FetchResponse{} for topic, partitions := range fetchRequest.blocks { @@ -234,7 +234,7 @@ func (mfr *mockFetchResponse) For(reqBody decoder) encoder { return res } -func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder { +func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder { partitions := mfr.messages[topic] if partitions == nil { return nil @@ -246,7 +246,7 @@ func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset i return messages[offset] } -func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int { +func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int { partitions := mfr.messages[topic] if partitions == nil { return 0 @@ -258,7 +258,7 @@ func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int return len(messages) } -func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 { +func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 { partitions := mfr.highWaterMarks[topic] if partitions == nil { return 0 @@ -266,36 +266,36 @@ func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) in return partitions[partition] } -// mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder. -type mockConsumerMetadataResponse struct { +// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder. +type MockConsumerMetadataResponse struct { coordinators map[string]interface{} t *testing.T } -func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse { - return &mockConsumerMetadataResponse{ +func newMockConsumerMetadataResponse(t *testing.T) *MockConsumerMetadataResponse { + return &MockConsumerMetadataResponse{ coordinators: make(map[string]interface{}), t: t, } } -func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse { +func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse { mr.coordinators[group] = broker return mr } -func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse { +func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse { mr.coordinators[group] = kerror return mr } -func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder { +func (mr *MockConsumerMetadataResponse) For(reqBody decoder) encoder { req := reqBody.(*ConsumerMetadataRequest) group := req.ConsumerGroup res := &ConsumerMetadataResponse{} v := mr.coordinators[group] switch v := v.(type) { - case *mockBroker: + case *MockBroker: res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()} case KError: res.Err = v @@ -303,17 +303,17 @@ func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder { return res } -// mockOffsetCommitResponse is a `OffsetCommitResponse` builder. -type mockOffsetCommitResponse struct { +// MockOffsetCommitResponse is a `OffsetCommitResponse` builder. +type MockOffsetCommitResponse struct { errors map[string]map[string]map[int32]KError t *testing.T } -func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse { - return &mockOffsetCommitResponse{t: t} +func NewMockOffsetCommitResponse(t *testing.T) *MockOffsetCommitResponse { + return &MockOffsetCommitResponse{t: t} } -func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse { +func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse { if mr.errors == nil { mr.errors = make(map[string]map[string]map[int32]KError) } @@ -331,7 +331,7 @@ func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int3 return mr } -func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder { +func (mr *MockOffsetCommitResponse) For(reqBody decoder) encoder { req := reqBody.(*OffsetCommitRequest) group := req.ConsumerGroup res := &OffsetCommitResponse{} @@ -343,7 +343,7 @@ func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder { return res } -func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError { +func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError { topics := mr.errors[group] if topics == nil { return ErrNoError @@ -359,17 +359,17 @@ func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int3 return kerror } -// mockProduceResponse is a `ProduceResponse` builder. -type mockProduceResponse struct { +// MockProduceResponse is a `ProduceResponse` builder. +type MockProduceResponse struct { errors map[string]map[int32]KError t *testing.T } -func newMockProduceResponse(t *testing.T) *mockProduceResponse { - return &mockProduceResponse{t: t} +func NewMockProduceResponse(t *testing.T) *MockProduceResponse { + return &MockProduceResponse{t: t} } -func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KError) *mockProduceResponse { +func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse { if mr.errors == nil { mr.errors = make(map[string]map[int32]KError) } @@ -382,7 +382,7 @@ func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KE return mr } -func (mr *mockProduceResponse) For(reqBody decoder) encoder { +func (mr *MockProduceResponse) For(reqBody decoder) encoder { req := reqBody.(*ProduceRequest) res := &ProduceResponse{} for topic, partitions := range req.msgSets { @@ -393,7 +393,7 @@ func (mr *mockProduceResponse) For(reqBody decoder) encoder { return res } -func (mr *mockProduceResponse) getError(topic string, partition int32) KError { +func (mr *MockProduceResponse) getError(topic string, partition int32) KError { partitions := mr.errors[topic] if partitions == nil { return ErrNoError @@ -405,17 +405,17 @@ func (mr *mockProduceResponse) getError(topic string, partition int32) KError { return kerror } -// mockOffsetFetchResponse is a `OffsetFetchResponse` builder. -type mockOffsetFetchResponse struct { +// MockOffsetFetchResponse is a `OffsetFetchResponse` builder. +type MockOffsetFetchResponse struct { offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock t *testing.T } -func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse { - return &mockOffsetFetchResponse{t: t} +func NewMockOffsetFetchResponse(t *testing.T) *MockOffsetFetchResponse { + return &MockOffsetFetchResponse{t: t} } -func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse { +func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse { if mr.offsets == nil { mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock) } @@ -433,7 +433,7 @@ func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int3 return mr } -func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder { +func (mr *MockOffsetFetchResponse) For(reqBody decoder) encoder { req := reqBody.(*OffsetFetchRequest) group := req.ConsumerGroup res := &OffsetFetchResponse{} diff --git a/offset_manager_test.go b/offset_manager_test.go index 00d5fba9e..d9fbb1d05 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -6,14 +6,14 @@ import ( ) func initOffsetManager(t *testing.T) (om OffsetManager, - testClient Client, broker, coordinator *mockBroker) { + testClient Client, broker, coordinator *MockBroker) { config := NewConfig() config.Metadata.Retry.Max = 1 config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond - broker = newMockBroker(t, 1) - coordinator = newMockBroker(t, 2) + broker = NewMockBroker(t, 1) + coordinator = NewMockBroker(t, 2) seedMeta := new(MetadataResponse) seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID()) @@ -42,7 +42,7 @@ func initOffsetManager(t *testing.T) (om OffsetManager, } func initPartitionOffsetManager(t *testing.T, om OffsetManager, - coordinator *mockBroker, initialOffset int64, metadata string) PartitionOffsetManager { + coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager { fetchResponse := new(OffsetFetchResponse) fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{ @@ -61,7 +61,7 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, } func TestNewOffsetManager(t *testing.T) { - seedBroker := newMockBroker(t, 1) + seedBroker := NewMockBroker(t, 1) seedBroker.Returns(new(MetadataResponse)) testClient, err := NewClient([]string{seedBroker.Addr()}, nil) @@ -101,7 +101,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { coordinator.Returns(fetchResponse) // Refresh coordinator - newCoordinator := newMockBroker(t, 3) + newCoordinator := NewMockBroker(t, 3) broker.Returns(&ConsumerMetadataResponse{ CoordinatorID: newCoordinator.BrokerID(), CoordinatorHost: "127.0.0.1", @@ -238,7 +238,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { ocResponse.AddError("my_topic", 1, ErrNoError) coordinator.Returns(ocResponse) - newCoordinator := newMockBroker(t, 3) + newCoordinator := NewMockBroker(t, 3) // For RefreshCoordinator() broker.Returns(&ConsumerMetadataResponse{ @@ -311,7 +311,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) { coordinator.Close() // Response to refresh coordinator request - newCoordinator := newMockBroker(t, 3) + newCoordinator := NewMockBroker(t, 3) broker.Returns(&ConsumerMetadataResponse{ CoordinatorID: newCoordinator.BrokerID(), CoordinatorHost: "127.0.0.1", diff --git a/sync_producer_test.go b/sync_producer_test.go index d378949b1..765877466 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -7,8 +7,8 @@ import ( ) func TestSyncProducer(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -55,8 +55,8 @@ func TestSyncProducer(t *testing.T) { } func TestConcurrentSyncProducer(t *testing.T) { - seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 2) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) @@ -98,7 +98,7 @@ func TestConcurrentSyncProducer(t *testing.T) { } func TestSyncProducerToNonExistingTopic(t *testing.T) { - broker := newMockBroker(t, 1) + broker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())