diff --git a/orderer/kafka/orderer_test.go b/orderer/kafka/orderer_test.go index b0e0245a0f1..0624091955e 100644 --- a/orderer/kafka/orderer_test.go +++ b/orderer/kafka/orderer_test.go @@ -79,24 +79,7 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio } } -func TestKafkaConsenterEmptyBatch(t *testing.T) { - var wg sync.WaitGroup - defer wg.Wait() - cs := &mockmultichain.ConsenterSupport{ - Batches: make(chan []*cb.Envelope), - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: provisional.TestChainID, - SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: testTimePadding}, - } - defer close(cs.BlockCutterVal.Block) - - co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) - ch := newChain(co, cs) - ch.lastProcessed = testOldestOffset - 1 - - go ch.Start() - defer ch.Halt() - +func prepareMockObjectDisks(t *testing.T, co *mockConsenterImpl, ch *chainImpl) { // Wait until the mock producer is done before messing around with its disk select { case <-ch.producer.(*mockProducerImpl).isSetup: @@ -111,21 +94,50 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) { case <-time.After(testTimePadding): t.Fatal("Mock consumer not setup in time") } +} +func waitableSyncQueueMessage(env *cb.Envelope, messagesToPickUp int, wg *sync.WaitGroup, + co *mockConsenterImpl, cs *mockmultichain.ConsenterSupport, ch *chainImpl) { wg.Add(1) go func() { defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg + for i := 0; i < messagesToPickUp; i++ { + // On the first iteration of this loop, the message that will be picked up + // is the one posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + } }() - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + syncQueueMessage(env, ch, cs.BlockCutterVal) // The message has already been moved to the consumer's disk, // otherwise syncQueueMessage wouldn't return, so the Wait() // here is unnecessary but let's be paranoid. wg.Wait() +} + +func TestKafkaConsenterEmptyBatch(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: testTimePadding}, + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + prepareMockObjectDisks(t, co, ch) + + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) // Stop the loop ch.Halt() @@ -157,38 +169,11 @@ func TestKafkaConsenterBatchTimer(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } + prepareMockObjectDisks(t, co, ch) - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < 2; i++ { - // First pass: Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - // Second pass: Pick up the time-to-cut message that will be posted when the short timer expires - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - } - }() - - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - // The message has already been moved to the consumer's disk, - // otherwise syncQueueMessage wouldn't return, so the Wait() - // here is unnecessary but let's be paranoid. - wg.Wait() + // The second message that will be picked up is the time-to-cut message + // that will be posted when the short timer expires + waitableSyncQueueMessage(newTestEnvelope("one"), 2, &wg, co, cs, ch) select { case <-cs.Batches: // This is the success path @@ -197,17 +182,7 @@ func TestKafkaConsenterBatchTimer(t *testing.T) { } // As above - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < 2; i++ { - msg := <-co.prodDisk - co.consDisk <- msg - } - }() - - syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("two"), 2, &wg, co, cs, ch) select { case <-cs.Batches: // This is the success path @@ -245,48 +220,13 @@ func TestKafkaConsenterTimerHaltOnFilledBatch(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } - - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() + prepareMockObjectDisks(t, co, ch) - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - // The message has already been moved to the consumer's disk, - // otherwise syncQueueMessage wouldn't return, so the Wait() - // here is unnecessary but let's be paranoid. - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) cs.BlockCutterVal.CutNext = true - // As above - wg.Add(1) - go func() { - defer wg.Done() - msg := <-co.prodDisk - co.consDisk <- msg - }() - - syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("two"), 1, &wg, co, cs, ch) select { case <-cs.Batches: @@ -300,21 +240,9 @@ func TestKafkaConsenterTimerHaltOnFilledBatch(t *testing.T) { cs.BlockCutterVal.CutNext = false - // As above, but with a change because of the expect time-to-cut message, see below: - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < 2; i++ { - // First pass: Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - // Second pass: Pick up the time-to-cut message that will be posted when the short timer expires - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - } - }() - - syncQueueMessage(newTestEnvelope("three"), ch, cs.BlockCutterVal) - wg.Wait() + // The second message that will be picked up is the time-to-cut message + // that will be posted when the short timer expires + waitableSyncQueueMessage(newTestEnvelope("three"), 2, &wg, co, cs, ch) select { case <-cs.Batches: @@ -351,44 +279,13 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } + prepareMockObjectDisks(t, co, ch) - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) cs.BlockCutterVal.IsolatedTx = true - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() - syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("two"), 1, &wg, co, cs, ch) ch.Halt() @@ -431,39 +328,14 @@ func TestKafkaConsenterTimeToCutForced(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } + prepareMockObjectDisks(t, co, ch) - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() - - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - // The message has already been moved to the consumer's disk, - // otherwise syncQueueMessage wouldn't return, so the Wait() - // here is unnecessary but let's be paranoid. - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) cs.BlockCutterVal.CutNext = true - // As above + // This is like the waitableSyncQueueMessage routine with the difference + // that we post a time-to-cut message instead of a test envelope. wg.Add(1) go func() { defer wg.Done() @@ -512,39 +384,14 @@ func TestKafkaConsenterTimeToCutDuplicate(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } + prepareMockObjectDisks(t, co, ch) - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() - - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - // The message has already been moved to the consumer's disk, - // otherwise syncQueueMessage wouldn't return, so the Wait() - // here is unnecessary but let's be paranoid. - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) cs.BlockCutterVal.CutNext = true - // As above + // This is like the waitableSyncQueueMessage routine with the difference + // that we post a time-to-cut message instead of a test envelope. wg.Add(1) go func() { defer wg.Done() @@ -566,15 +413,7 @@ func TestKafkaConsenterTimeToCutDuplicate(t *testing.T) { cs.BlockCutterVal.CutNext = false - wg.Add(1) - go func() { - defer wg.Done() - msg := <-co.prodDisk - co.consDisk <- msg - }() - - syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("two"), 1, &wg, co, cs, ch) cs.BlockCutterVal.CutNext = true // ATTN: We set `cs.BlockCutterVal.CutNext` to true on purpose @@ -633,39 +472,14 @@ func TestKafkaConsenterTimeToCutStale(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } - - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() + prepareMockObjectDisks(t, co, ch) - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - // The message has already been moved to the consumer's disk, - // otherwise syncQueueMessage wouldn't return, so the Wait() - // here is unnecessary but let's be paranoid. - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) cs.BlockCutterVal.CutNext = true - // As above + // This is like the waitableSyncQueueMessage routine with the difference + // that we post a time-to-cut message instead of a test envelope. wg.Add(1) go func() { defer wg.Done() @@ -716,39 +530,14 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) { go ch.Start() defer ch.Halt() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense with the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } - - wg.Add(1) - go func() { - defer wg.Done() - // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below - msg := <-co.prodDisk - // Place it to the right location so that the mockConsumer can read it - co.consDisk <- msg - }() + prepareMockObjectDisks(t, co, ch) - syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) - // The message has already been moved to the consumer's disk, - // otherwise syncQueueMessage wouldn't return, so the Wait() - // here is unnecessary but let's be paranoid. - wg.Wait() + waitableSyncQueueMessage(newTestEnvelope("one"), 1, &wg, co, cs, ch) cs.BlockCutterVal.CutNext = true - // As above + // This is like the waitableSyncQueueMessage routine with the difference + // that we post a time-to-cut message instead of a test envelope. wg.Add(1) go func() { defer wg.Done()