From 48942d7357f6ed2e271d7e6364abe5a1dcb10e58 Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Sun, 18 Dec 2016 02:38:47 -0500 Subject: [PATCH] [FAB-1352] Add time-based block cutting to Kafka https://jira.hyperledger.org/browse/FAB-1352 In the version that was rebased on top of the common components, this option was kept out in order to minimize the complexity of the changeset. This changeset introduces it so now the Kafka-based consenter uses the BatchTimeout setting in shared config, and posts time-to-cut (TTC-X) messages according to the design document posted here: https://docs.google.com/document/d/1vNMaM7XhOlu9tB_10dKnlrhy5d7b1u8lSY8a-kVjCO4/edit The respective unit tests from the solo package have been ported, as well as additional tests specific to the time-to-cut logic. Do note that this path shall be revisited with integration tests. Change-Id: I743d4412cf8a3536fcb854433dfcbb3baa221d95 Signed-off-by: Kostas Christidis --- orderer/kafka/main.go | 59 +++- orderer/kafka/main_test.go | 626 +++++++++++++++++++++++++++++++++++-- 2 files changed, 643 insertions(+), 42 deletions(-) diff --git a/orderer/kafka/main.go b/orderer/kafka/main.go index 9bf849a2aa0..18d44d61ccd 100644 --- a/orderer/kafka/main.go +++ b/orderer/kafka/main.go @@ -17,6 +17,8 @@ limitations under the License. package kafka import ( + "time" + "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/orderer/localconfig" @@ -93,6 +95,7 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport) consenter: consenter, support: support, partition: newChainPartition(support.ChainID(), rawPartition), + batchTimeout: support.SharedConfig().BatchTimeout(), lastProcessed: sarama.OffsetOldest - 1, // TODO This should be retrieved by ConsenterSupport; also see note in loop() below producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()), halted: false, // Redundant as the default value for booleans is false but added for readability @@ -123,7 +126,9 @@ type chainImpl struct { support multichain.ConsenterSupport partition ChainPartition + batchTimeout time.Duration lastProcessed int64 + lastCutBlock uint64 producer Producer consumer Consumer @@ -199,46 +204,82 @@ func (ch *chainImpl) Enqueue(env *cb.Envelope) bool { func (ch *chainImpl) loop() { msg := new(ab.KafkaMessage) + var timer <-chan time.Time + var ttcNumber uint64 defer close(ch.haltedChan) defer ch.producer.Close() defer func() { ch.halted = true }() defer ch.consumer.Close() - // TODO Add support for time-based block cutting - for { select { case in := <-ch.consumer.Recv(): - logger.Debug("Received:", in) if err := proto.Unmarshal(in.Value, msg); err != nil { // This shouldn't happen, it should be filtered at ingress logger.Critical("Unable to unmarshal consumed message:", err) } - logger.Debug("Unmarshaled to:", msg) + logger.Debug("Received:", msg) switch msg.Type.(type) { - case *ab.KafkaMessage_Connect, *ab.KafkaMessage_TimeToCut: - logger.Debugf("Ignoring message") + case *ab.KafkaMessage_Connect: + logger.Debug("It's a connect message - ignoring") continue + case *ab.KafkaMessage_TimeToCut: + ttcNumber = msg.GetTimeToCut().BlockNumber + logger.Debug("It's a time-to-cut message for block", ttcNumber) + if ttcNumber == ch.lastCutBlock+1 { + timer = nil + logger.Debug("Nil'd the timer") + batch, committers := ch.support.BlockCutter().Cut() + if len(batch) == 0 { + logger.Warningf("Got right time-to-cut message (%d) but no pending 
requests - this might indicate a bug", ch.lastCutBlock) + logger.Infof("Consenter for chain %s exiting", ch.partition.Topic()) + return + } + block := ch.support.CreateNextBlock(batch) + ch.support.WriteBlock(block, committers) + ch.lastCutBlock++ + logger.Debug("Proper time-to-cut received, just cut block", ch.lastCutBlock) + continue + } else if ttcNumber > ch.lastCutBlock+1 { + logger.Warningf("Got larger time-to-cut message (%d) than allowed (%d) - this might indicate a bug", ttcNumber, ch.lastCutBlock+1) + logger.Infof("Consenter for chain %s exiting", ch.partition.Topic()) + return + } + logger.Debug("Ignoring stale time-to-cut-message for", ch.lastCutBlock) case *ab.KafkaMessage_Regular: env := new(cb.Envelope) if err := proto.Unmarshal(msg.GetRegular().Payload, env); err != nil { // This shouldn't happen, it should be filtered at ingress - logger.Critical("Unable to unmarshal consumed message:", err) + logger.Critical("Unable to unmarshal consumed regular message:", err) continue } batches, committers, ok := ch.support.BlockCutter().Ordered(env) logger.Debugf("Ordering results: batches: %v, ok: %v", batches, ok) - if ok && len(batches) == 0 { + if ok && len(batches) == 0 && timer == nil { + timer = time.After(ch.batchTimeout) + logger.Debugf("Just began %s batch timer", ch.batchTimeout.String()) continue } // If !ok, batches == nil, so this will be skipped for i, batch := range batches { block := ch.support.CreateNextBlock(batch) ch.support.WriteBlock(block, committers[i]) + ch.lastCutBlock++ + logger.Debug("Batch filled, just cut block", ch.lastCutBlock) + } + if len(batches) > 0 { + timer = nil } } - case <-ch.exitChan: // when Halt() is called + case <-timer: + logger.Debugf("Time-to-cut block %d timer expired", ch.lastCutBlock+1) + timer = nil + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock+1))); err != nil { + logger.Errorf("Couldn't post to %s: %s", ch.partition, err) + // Do not exit + } + case <-ch.exitChan: // When Halt() is called logger.Infof("Consenter for chain %s exiting", ch.partition.Topic()) return } diff --git a/orderer/kafka/main_test.go b/orderer/kafka/main_test.go index 32c27c43214..b0e0245a0f1 100644 --- a/orderer/kafka/main_test.go +++ b/orderer/kafka/main_test.go @@ -30,6 +30,7 @@ import ( "github.com/hyperledger/fabric/orderer/multichain" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" ) var cp = newChainPartition(provisional.TestChainID, rawPartition) @@ -81,12 +82,71 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio func TestKafkaConsenterEmptyBatch(t *testing.T) { var wg sync.WaitGroup defer wg.Wait() + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: testTimePadding}, + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // 
Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. + wg.Wait() + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } +} +func TestKafkaConsenterBatchTimer(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + batchTimeout, _ := time.ParseDuration("1ms") cs := &mockmultichain.ConsenterSupport{ Batches: make(chan []*cb.Envelope), BlockCutterVal: mockblockcutter.NewReceiver(), ChainIDVal: provisional.TestChainID, - SharedConfigVal: newMockSharedConfigManager(), + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout}, } defer close(cs.BlockCutterVal.Block) @@ -97,26 +157,109 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) { go ch.Start() defer ch.Halt() + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } + wg.Add(1) go func() { defer wg.Done() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") + for i := 0; i < 2; i++ { + // First pass: Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + // Second pass: Pick up the time-to-cut message that will be posted when the short timer expires + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. 
+ wg.Wait() + + select { + case <-cs.Batches: // This is the success path + case <-time.After(testTimePadding): + t.Fatal("Expected block to be cut because batch timer expired") + } + + // As above + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 2; i++ { + msg := <-co.prodDisk + co.consDisk <- msg } }() + + syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) wg.Wait() + select { + case <-cs.Batches: // This is the success path + case <-time.After(testTimePadding): + t.Fatal("Expected second block to be cut, batch timer not reset") + } + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } +} + +func TestKafkaConsenterTimerHaltOnFilledBatch(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + batchTimeout, _ := time.ParseDuration("1h") + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout}, + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } + wg.Add(1) go func() { defer wg.Done() @@ -132,6 +275,53 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) { // here is unnecessary but let's be paranoid. wg.Wait() + cs.BlockCutterVal.CutNext = true + + // As above + wg.Add(1) + go func() { + defer wg.Done() + msg := <-co.prodDisk + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) + wg.Wait() + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected block to be cut because batch timer expired") + } + + // Change the batch timeout to be near instant. + // If the timer was not reset, it will still be waiting an hour. 
+ ch.batchTimeout = time.Millisecond + + cs.BlockCutterVal.CutNext = false + + // As above, but with a change because of the expect time-to-cut message, see below: + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 2; i++ { + // First pass: Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + // Second pass: Pick up the time-to-cut message that will be posted when the short timer expires + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + } + }() + + syncQueueMessage(newTestEnvelope("three"), ch, cs.BlockCutterVal) + wg.Wait() + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatalf("Did not cut the second block, indicating that the old timer was still running") + } + // Stop the loop ch.Halt() @@ -150,7 +340,7 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { Batches: make(chan []*cb.Envelope), BlockCutterVal: mockblockcutter.NewReceiver(), ChainIDVal: provisional.TestChainID, - SharedConfigVal: newMockSharedConfigManager(), + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: testTimePadding}, } defer close(cs.BlockCutterVal.Block) @@ -161,25 +351,20 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { go ch.Start() defer ch.Halt() - wg.Add(1) - go func() { - defer wg.Done() - // Wait until the mock producer is done before messing around with its disk - select { - case <-ch.producer.(*mockProducerImpl).isSetup: - // Dispense the CONNECT message that is posted with Start() - <-co.prodDisk - case <-time.After(testTimePadding): - t.Fatal("Mock producer not setup in time") - } - // Same for the mock consumer - select { - case <-ch.setupChan: - case <-time.After(testTimePadding): - t.Fatal("Mock consumer not setup in time") - } - }() - wg.Wait() + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } wg.Add(1) go func() { @@ -225,3 +410,378 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) { case <-ch.haltedChan: } } + +func TestKafkaConsenterTimeToCutForced(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + batchTimeout, _ := time.ParseDuration("1h") + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout}, + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } + + wg.Add(1) + go func() { + defer 
wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. + wg.Wait() + + cs.BlockCutterVal.CutNext = true + + // As above + wg.Add(1) + go func() { + defer wg.Done() + msg := <-co.prodDisk + co.consDisk <- msg + }() + + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock+1))); err != nil { + t.Fatalf("Couldn't post to %s: %s", ch.partition, err) + } + wg.Wait() + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected block to be cut because proper time-to-cut was sent") + } + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } +} + +func TestKafkaConsenterTimeToCutDuplicate(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + batchTimeout, _ := time.ParseDuration("1h") + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout}, + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. 
+ wg.Wait() + + cs.BlockCutterVal.CutNext = true + + // As above + wg.Add(1) + go func() { + defer wg.Done() + msg := <-co.prodDisk + co.consDisk <- msg + }() + + // Send a proper time-to-cut message + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock+1))); err != nil { + t.Fatalf("Couldn't post to %s: %s", ch.partition, err) + } + wg.Wait() + + select { + case <-cs.Batches: + case <-time.After(testTimePadding): + t.Fatal("Expected block to be cut because proper time-to-cut was sent") + } + + cs.BlockCutterVal.CutNext = false + + wg.Add(1) + go func() { + defer wg.Done() + msg := <-co.prodDisk + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("two"), ch, cs.BlockCutterVal) + wg.Wait() + + cs.BlockCutterVal.CutNext = true + // ATTN: We set `cs.BlockCutterVal.CutNext` to true on purpose + // If the logic works right, the orderer should discard the + // duplicate TTC message below and a call to the block cutter + // will only happen when the long, hour-long timer expires + + // As above + wg.Add(1) + go func() { + defer wg.Done() + msg := <-co.prodDisk + co.consDisk <- msg + }() + + // Send a duplicate time-to-cut message + if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock))); err != nil { + t.Fatalf("Couldn't post to %s: %s", ch.partition, err) + } + wg.Wait() + + select { + case <-cs.Batches: + t.Fatal("Should have discarded duplicate time-to-cut") + case <-time.After(testTimePadding): + // This is the success path + } + + // Stop the loop + ch.Halt() + + select { + case <-cs.Batches: + t.Fatal("Expected no invocations of Append") + case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great) + } +} + +func TestKafkaConsenterTimeToCutStale(t *testing.T) { + var wg sync.WaitGroup + defer wg.Wait() + + batchTimeout, _ := time.ParseDuration("1h") + cs := &mockmultichain.ConsenterSupport{ + Batches: make(chan []*cb.Envelope), + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: provisional.TestChainID, + SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout}, + } + defer close(cs.BlockCutterVal.Block) + + co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry) + ch := newChain(co, cs) + ch.lastProcessed = testOldestOffset - 1 + + go ch.Start() + defer ch.Halt() + + // Wait until the mock producer is done before messing around with its disk + select { + case <-ch.producer.(*mockProducerImpl).isSetup: + // Dispense with the CONNECT message that is posted with Start() + <-co.prodDisk + case <-time.After(testTimePadding): + t.Fatal("Mock producer not setup in time") + } + // Same for the mock consumer + select { + case <-ch.setupChan: + case <-time.After(testTimePadding): + t.Fatal("Mock consumer not setup in time") + } + + wg.Add(1) + go func() { + defer wg.Done() + // Pick up the message that will be posted via the syncQueueMessage/Enqueue call below + msg := <-co.prodDisk + // Place it to the right location so that the mockConsumer can read it + co.consDisk <- msg + }() + + syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal) + // The message has already been moved to the consumer's disk, + // otherwise syncQueueMessage wouldn't return, so the Wait() + // here is unnecessary but let's be paranoid. 
+	wg.Wait()
+
+	cs.BlockCutterVal.CutNext = true
+
+	// As above
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		msg := <-co.prodDisk
+		co.consDisk <- msg
+	}()
+
+	// Send a stale time-to-cut message
+	if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock))); err != nil {
+		t.Fatalf("Couldn't post to %s: %s", ch.partition, err)
+	}
+	wg.Wait()
+
+	select {
+	case <-cs.Batches:
+		t.Fatal("Should have ignored stale time-to-cut")
+	case <-time.After(testTimePadding):
+		// This is the success path
+	}
+
+	// Stop the loop
+	ch.Halt()
+
+	select {
+	case <-cs.Batches:
+		t.Fatal("Expected no invocations of Append")
+	case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great)
+	}
+}
+
+func TestKafkaConsenterTimeToCutLarger(t *testing.T) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	batchTimeout, _ := time.ParseDuration("1h")
+	cs := &mockmultichain.ConsenterSupport{
+		Batches:         make(chan []*cb.Envelope),
+		BlockCutterVal:  mockblockcutter.NewReceiver(),
+		ChainIDVal:      provisional.TestChainID,
+		SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout},
+	}
+	defer close(cs.BlockCutterVal.Block)
+
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
+	ch := newChain(co, cs)
+	ch.lastProcessed = testOldestOffset - 1
+
+	go ch.Start()
+	defer ch.Halt()
+
+	// Wait until the mock producer is done before messing around with its disk
+	select {
+	case <-ch.producer.(*mockProducerImpl).isSetup:
+		// Dispense with the CONNECT message that is posted with Start()
+		<-co.prodDisk
+	case <-time.After(testTimePadding):
+		t.Fatal("Mock producer not setup in time")
+	}
+	// Same for the mock consumer
+	select {
+	case <-ch.setupChan:
+	case <-time.After(testTimePadding):
+		t.Fatal("Mock consumer not setup in time")
+	}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// Pick up the message that will be posted via the syncQueueMessage/Enqueue call below
+		msg := <-co.prodDisk
+		// Place it to the right location so that the mockConsumer can read it
+		co.consDisk <- msg
+	}()
+
+	syncQueueMessage(newTestEnvelope("one"), ch, cs.BlockCutterVal)
+	// The message has already been moved to the consumer's disk,
+	// otherwise syncQueueMessage wouldn't return, so the Wait()
+	// here is unnecessary but let's be paranoid.
+	wg.Wait()
+
+	cs.BlockCutterVal.CutNext = true
+
+	// As above
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		msg := <-co.prodDisk
+		co.consDisk <- msg
+	}()
+
+	// Send a time-to-cut message with a block number larger than expected
+	if err := ch.producer.Send(ch.partition, utils.MarshalOrPanic(newTimeToCutMessage(ch.lastCutBlock+2))); err != nil {
+		t.Fatalf("Couldn't post to %s: %s", ch.partition, err)
+	}
+	wg.Wait()
+
+	select {
+	case <-cs.Batches:
+		t.Fatal("Should have ignored larger-than-expected time-to-cut")
+	case <-time.After(testTimePadding):
+		// This is the success path
+	}
+
+	// Loop is already stopped, but this is a good test to see
+	// whether a second invocation of Halt() panics. (It shouldn't.)
+	defer func() {
+		if r := recover(); r != nil {
+			t.Fatal("Expected duplicate call to Halt to succeed")
+		}
+	}()
+
+	ch.Halt()
+
+	select {
+	case <-cs.Batches:
+		t.Fatal("Expected no invocations of Append")
+	case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great)
+	}
+}
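
Note for reviewers: stripped of the Kafka and mock plumbing, the time-to-cut flow in loop() is: arm a timer when the first envelope of a pending batch arrives, post a time-to-cut (TTC) message for block lastCutBlock+1 when the timer fires, and only cut the block once that TTC comes back through the partition; stale or duplicate TTCs are ignored, and a TTC larger than lastCutBlock+1 makes the consenter exit. The sketch below is illustrative only and is not part of the patch: the partition is replaced by a buffered Go channel, a "block" is just a slice of strings, and the names (runLoop, ttcMsg, envMsg, broadcast) are invented for the example.

package main

import (
	"fmt"
	"time"
)

// ttcMsg stands in for a time-to-cut message; envMsg for a regular envelope.
type ttcMsg struct{ blockNumber uint64 }
type envMsg struct{ payload string }

// runLoop mimics the shape of the consenter loop: one select over the
// incoming stream, the batch timer, and the exit channel.
func runLoop(broadcast chan interface{}, batchTimeout time.Duration, exit chan struct{}) {
	var timer <-chan time.Time
	var lastCutBlock uint64
	var pending []string

	for {
		select {
		case in := <-broadcast:
			switch msg := in.(type) {
			case envMsg:
				pending = append(pending, msg.payload)
				if timer == nil {
					// Arm the timer when the first envelope of a batch shows up.
					timer = time.After(batchTimeout)
				}
			case ttcMsg:
				switch {
				case msg.blockNumber == lastCutBlock+1 && len(pending) > 0:
					// Proper time-to-cut: disarm the timer and cut whatever is pending.
					timer = nil
					lastCutBlock++
					fmt.Printf("cut block %d: %v\n", lastCutBlock, pending)
					pending = nil
				case msg.blockNumber > lastCutBlock+1:
					// Larger than expected: something is off, exit like the real loop does.
					fmt.Println("unexpected time-to-cut - exiting")
					return
				default:
					// Stale or duplicate time-to-cut: ignore it.
				}
			}
		case <-timer:
			// Timer expired: post a TTC back onto the shared stream instead of cutting
			// locally, so every consumer of the stream cuts on the same message.
			timer = nil
			broadcast <- ttcMsg{blockNumber: lastCutBlock + 1}
		case <-exit:
			return
		}
	}
}

func main() {
	broadcast := make(chan interface{}, 8) // buffered so the loop can post to itself
	exit := make(chan struct{})
	go runLoop(broadcast, 100*time.Millisecond, exit)

	broadcast <- envMsg{payload: "tx1"} // arms the batch timer
	time.Sleep(300 * time.Millisecond)  // timer fires, TTC round-trips, block 1 is cut
	close(exit)
	time.Sleep(10 * time.Millisecond) // give the goroutine a moment to return
}

Running the sketch prints a single "cut block 1" line once the 100ms timer round-trips through the channel, which mirrors what TestKafkaConsenterBatchTimer exercises against the real loop.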