Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make mock brokers and protocol packets available for outsider #570

Merged
merged 2 commits into from
Feb 17, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (f flakyEncoder) Encode() ([]byte, error) {
}

func TestAsyncProducer(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -140,8 +140,8 @@ func TestAsyncProducer(t *testing.T) {
}

func TestAsyncProducerMultipleFlushes(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -175,9 +175,9 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
}

func TestAsyncProducerMultipleBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader0 := newMockBroker(t, 2)
leader1 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader0 := NewMockBroker(t, 2)
leader1 := NewMockBroker(t, 3)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
Expand Down Expand Up @@ -215,8 +215,8 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
}

func TestAsyncProducerCustomPartitioner(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -257,9 +257,9 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) {
}

func TestAsyncProducerFailureRetry(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
leader2 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
Expand Down Expand Up @@ -305,8 +305,8 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
}

func TestAsyncProducerEncoderFailures(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -344,8 +344,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
// producer reconnects to it and continues sending messages.
func TestAsyncProducerBrokerBounce(t *testing.T) {
// Given
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
Expand All @@ -370,7 +370,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {

// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
leader.Close() // producer should get EOF
leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again

// Then: a produced message goes through the new broker connection.
Expand All @@ -384,9 +384,9 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
}

func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
leader2 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
Expand Down Expand Up @@ -427,9 +427,9 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
}

func TestAsyncProducerMultipleRetries(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader1 := newMockBroker(t, 2)
leader2 := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
Expand Down Expand Up @@ -484,8 +484,8 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -539,8 +539,8 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
}

func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
Expand Down Expand Up @@ -575,7 +575,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {

// reboot the broker (the producer will get EOF on its existing connection)
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)
leader = NewMockBrokerAddr(t, 2, leaderAddr)

// send another message on partition 0 to trigger the EOF and retry
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
Expand All @@ -596,8 +596,8 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
}

func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
time.Sleep(50 * time.Millisecond)

leader.SetHandlerByMap(map[string]MockResponse{
"ProduceRequest": newMockProduceResponse(t).
"ProduceRequest": NewMockProduceResponse(t).
SetError("my_topic", 0, ErrNoError),
})

Expand All @@ -661,8 +661,8 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
}

func TestAsyncProducerRetryShutdown(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down
2 changes: 1 addition & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestBrokerAccessors(t *testing.T) {
}

func TestSimpleBrokerCommunication(t *testing.T) {
mb := newMockBroker(t, 0)
mb := NewMockBroker(t, 0)
defer mb.Close()

broker := NewBroker(mb.Addr())
Expand Down
54 changes: 27 additions & 27 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func safeClose(t testing.TB, c io.Closer) {
}

func TestSimpleClient(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

seedBroker.Returns(new(MetadataResponse))

Expand All @@ -29,7 +29,7 @@ func TestSimpleClient(t *testing.T) {
}

func TestCachedPartitions(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

replicas := []int32{3, 1, 5}
isr := []int32{5, 1}
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestCachedPartitions(t *testing.T) {
}

func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

replicas := []int32{seedBroker.BrokerID()}

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
}

func TestClientSeedBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker("localhost:12345", 2)
Expand All @@ -138,8 +138,8 @@ func TestClientSeedBrokers(t *testing.T) {
}

func TestClientMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

replicas := []int32{3, 1, 5}
isr := []int32{5, 1}
Expand Down Expand Up @@ -202,8 +202,8 @@ func TestClientMetadata(t *testing.T) {
}

func TestClientGetOffset(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadata := new(MetadataResponse)
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestClientGetOffset(t *testing.T) {
leader.Close()
seedBroker.Returns(metadata)

leader = newMockBrokerAddr(t, 2, leaderAddr)
leader = NewMockBrokerAddr(t, 2, leaderAddr)
offsetResponse = new(OffsetResponse)
offsetResponse.AddTopicPartition("foo", 0, 456)
leader.Returns(offsetResponse)
Expand All @@ -250,7 +250,7 @@ func TestClientGetOffset(t *testing.T) {
}

func TestClientReceivingUnknownTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)
Expand Down Expand Up @@ -286,8 +286,8 @@ func TestClientReceivingUnknownTopic(t *testing.T) {
}

func TestClientReceivingPartialMetadata(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -339,8 +339,8 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
}

func TestClientRefreshBehaviour(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := newMockBroker(t, 0)
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
initialSeed.Returns(emptyMetadata)

Expand All @@ -390,9 +390,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) {

client := c.(*client)

seed1 := newMockBroker(t, 1)
seed2 := newMockBroker(t, 2)
seed3 := newMockBroker(t, 3)
seed1 := NewMockBroker(t, 1)
seed2 := NewMockBroker(t, 2)
seed3 := NewMockBroker(t, 3)
addr1 := seed1.Addr()
addr2 := seed2.Addr()
addr3 := seed3.Addr()
Expand All @@ -413,8 +413,8 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
seed1.Close()
seed2.Close()

seed1 = newMockBrokerAddr(t, 1, addr1)
seed2 = newMockBrokerAddr(t, 2, addr2)
seed1 = NewMockBrokerAddr(t, 1, addr1)
seed2 = NewMockBrokerAddr(t, 2, addr2)

seed3.Close()

Expand All @@ -434,9 +434,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
}

func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
staleCoordinator := newMockBroker(t, 2)
freshCoordinator := newMockBroker(t, 3)
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
freshCoordinator := NewMockBroker(t, 3)

replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
metadataResponse1 := new(MetadataResponse)
Expand Down Expand Up @@ -513,8 +513,8 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
}

func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
seedBroker := newMockBroker(t, 1)
coordinator := newMockBroker(t, 2)
seedBroker := NewMockBroker(t, 1)
coordinator := NewMockBroker(t, 2)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)
Expand Down Expand Up @@ -566,7 +566,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
}

func TestClientAutorefreshShutdownRace(t *testing.T) {
seedBroker := newMockBroker(t, 1)
seedBroker := NewMockBroker(t, 1)

metadataResponse := new(MetadataResponse)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// Then return some metadata to the still-running background thread
leader := newMockBroker(t, 2)
leader := NewMockBroker(t, 2)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down
Loading