Skip to content

Commit

Permalink
Fix occasionaly failing TestBroadcast* tests
Browse files Browse the repository at this point in the history
This changeset removes a race condition responsible for the ocassional
failures we see in CI in the TestBroadcast* tests that read from "disk",
the construct that represents the broker's disk where all produced
messages are eventually written to.

The TestBroadcast* tests have been executed 10000 times and do not fail.

Change-Id: I5dea63a52d64171bfd703e5cd053437b0d215cf8
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Oct 25, 2016
1 parent c0aa1df commit 021b3c4
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions orderer/kafka/producer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type mockProducerImpl struct {
producer *mocks.SyncProducer

checker mocks.ValueChecker
disk chan []byte // This is the "disk"/log that the producer writes to
disk chan []byte // This simulates the broker's "disk" where the producer's messages eventually end up
producedOffset int64
t *testing.T
}
Expand All @@ -54,13 +54,13 @@ func mockNewProducer(t *testing.T, conf *config.TopLevel, seek int64, disk chan
func (mp *mockProducerImpl) Send(payload []byte) error {
mp.producer.ExpectSendMessageWithCheckerFunctionAndSucceed(mp.checker)
mp.producedOffset++
mp.disk <- payload
prt, ofs, err := mp.producer.SendMessage(newMsg(payload, mp.config.Kafka.Topic))
if err != nil ||
prt != mp.config.Kafka.PartitionID ||
ofs != mp.producedOffset {
mp.t.Fatal("Producer not functioning as expected")
}
mp.disk <- payload // Reaches the broker's disk
return err
}

Expand All @@ -72,7 +72,7 @@ func (mp *mockProducerImpl) testFillWithBlocks(seek int64) {
dyingChan := make(chan struct{})
deadChan := make(chan struct{})

go func() { // This goroutine is meant to read only the "fill-in" blocks.
go func() { // This goroutine is meant to read only the "fill-in" blocks
for {
select {
case <-mp.disk:
Expand Down

0 comments on commit 021b3c4

Please sign in to comment.