diff --git a/orderer/kafka/chain.go b/orderer/kafka/chain.go
index b1d765bc93d..ca8faa66514 100644
--- a/orderer/kafka/chain.go
+++ b/orderer/kafka/chain.go
@@ -23,11 +23,12 @@ import (
 // Used for capturing metrics -- see processMessagesToBlocks
 const (
 	indexRecvError = iota
+	indexUnmarshalError
 	indexRecvPass
 	indexProcessConnectPass
 	indexProcessTimeToCutError
 	indexProcessTimeToCutPass
-	indexPprocessRegularError
+	indexProcessRegularError
 	indexProcessRegularPass
 	indexSendTimeToCutError
 	indexSendTimeToCutPass
@@ -39,16 +40,19 @@ func newChain(consenter commonConsenter, support multichain.ConsenterSupport, la
 	logger.Infof("[channel: %s] Starting chain with last persisted offset %d and last recorded block %d",
 		support.ChainID(), lastOffsetPersisted, lastCutBlockNumber)

+	errorChan := make(chan struct{})
+	close(errorChan) // We need this closed when starting up
+
 	return &chainImpl{
 		consenter:           consenter,
 		support:             support,
 		channel:             newChannel(support.ChainID(), defaultPartition),
 		lastOffsetPersisted: lastOffsetPersisted,
 		lastCutBlockNumber:  lastCutBlockNumber,
-		started:   false, // Redundant as the default value for booleans is false but added for readability
-		startChan: make(chan struct{}),
-		halted:    false,
-		exitChan:  make(chan struct{}),
+
+		errorChan: errorChan,
+		haltChan:  make(chan struct{}),
+		startChan: make(chan struct{}),
 	}, nil
 }

@@ -64,17 +68,21 @@ type chainImpl struct {
 	parentConsumer  sarama.Consumer
 	channelConsumer sarama.PartitionConsumer

-	// Set the flag to true and close the channel when the retriable steps in `Start` have completed successfully
-	started   bool
+	// When the partition consumer errors, close the channel. Otherwise, make
+	// this an open, unbuffered channel.
+	errorChan chan struct{}
+	// When a Halt() request comes, close the channel. Unlike errorChan, this
+	// channel never re-opens when closed. Its closing triggers the exit of the
+	// processMessagesToBlocks loop.
+	haltChan chan struct{}
+	// Close when the retriable steps in Start have completed.
 	startChan chan struct{}
-
-	halted   bool
-	exitChan chan struct{}
 }

-// Errored currently only closes on halt
+// Errored returns a channel which will close when a partition consumer error
+// has occurred. Checked by Deliver().
 func (chain *chainImpl) Errored() <-chan struct{} {
-	return chain.exitChan
+	return chain.errorChan
 }

 // Start allocates the necessary resources for staying up to date with this
@@ -86,190 +94,213 @@ func (chain *chainImpl) Start() {
 	go startThread(chain)
 }

+// Halt frees the resources which were allocated for this Chain. Implements the
+// multichain.Chain interface.
+func (chain *chainImpl) Halt() {
+	select {
+	case <-chain.haltChan:
+		// This construct is useful because it allows Halt() to be called
+		// multiple times (by a single thread) w/o panicking. Recall that a
+		// receive from a closed channel returns (the zero value) immediately.
+		logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID())
+	default:
+		logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID())
+		close(chain.haltChan)
+		chain.closeKafkaObjects() // Also close the producer and the consumer
+		logger.Debugf("[channel: %s] Closed the haltChan", chain.support.ChainID())
+	}
+}
+
+// Enqueue accepts a message and returns true on acceptance, or false otherwise.
+// Implements the multichain.Chain interface. Called by Broadcast().
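+// The nested selects below gate the post path: the outer one requires that the
+// start phase has completed (startChan closed), and the inner one that the
+// chain has not been halted (haltChan still open) before the envelope is sent.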
+func (chain *chainImpl) Enqueue(env *cb.Envelope) bool { + logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID()) + select { + case <-chain.startChan: // The Start phase has completed + select { + case <-chain.haltChan: // The chain has been halted, stop here + logger.Warningf("[channel: %s] Will not enqueue, consenter for this channel has been halted", chain.support.ChainID()) + return false + default: // The post path + marshaledEnv, err := utils.Marshal(env) + if err != nil { + logger.Errorf("[channel: %s] cannot enqueue, unable to marshal envelope = %s", chain.support.ChainID(), err) + return false + } + // We're good to go + payload := utils.MarshalOrPanic(newRegularMessage(marshaledEnv)) + message := newProducerMessage(chain.channel, payload) + if _, _, err := chain.producer.SendMessage(message); err != nil { + logger.Errorf("[channel: %s] cannot enqueue envelope = %s", chain.support.ChainID(), err) + return false + } + logger.Debugf("[channel: %s] Envelope enqueued successfully", chain.support.ChainID()) + return true + } + default: // Not ready yet + logger.Warningf("[channel: %s] Will not enqueue, consenter for this channel hasn't started yet", chain.support.ChainID()) + return false + } +} + // Called by Start(). func startThread(chain *chainImpl) { var err error // Set up the producer - chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.exitChan, chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel) + chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel) if err != nil { logger.Panicf("[channel: %s] Cannot set up producer = %s", chain.channel.topic(), err) } logger.Infof("[channel: %s] Producer set up successfully", chain.support.ChainID()) // Have the producer post the CONNECT message - if err = sendConnectMessage(chain.consenter.retryOptions(), chain.exitChan, chain.producer, chain.channel); err != nil { + if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil { logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err) } logger.Infof("[channel: %s] CONNECT message posted successfully", chain.channel.topic()) // Set up the parent consumer - chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.exitChan, chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel) + chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.support.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel) if err != nil { logger.Panicf("[channel: %s] Cannot set up parent consumer = %s", chain.channel.topic(), err) } logger.Infof("[channel: %s] Parent consumer set up successfully", chain.channel.topic()) // Set up the channel consumer - chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.exitChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1) + chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1) if err != nil { logger.Panicf("[channel: %s] Cannot set up channel consumer = %s", chain.channel.topic(), err) } logger.Infof("[channel: %s] Channel consumer 
set up successfully", chain.channel.topic()) - chain.started = true - close(chain.startChan) + close(chain.startChan) // Broadcast requests will now go through + chain.errorChan = make(chan struct{}) // Deliver requests will also go through - go listenForErrors(chain.channelConsumer.Errors(), chain.exitChan) + logger.Infof("[channel: %s] Start phase completed successfully", chain.channel.topic()) - // Keep up to date with the channel - processMessagesToBlock(chain.support, chain.producer, chain.parentConsumer, chain.channelConsumer, - chain.channel, &chain.lastCutBlockNumber, &chain.halted, &chain.exitChan) -} - -// Halt frees the resources which were allocated for this Chain. Implements the -// multichain.Chain interface. -func (chain *chainImpl) Halt() { - select { - case <-chain.exitChan: - // This construct is useful because it allows Halt() to be called - // multiple times w/o panicking. Recal that a receive from a closed - // channel returns (the zero value) immediately. - logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID()) - default: - logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID()) - chain.halted = true - close(chain.exitChan) - } -} - -// Enqueue accepts a message and returns true on acceptance, or false on -// shutdown. Implements the multichain.Chain interface. Called by Broadcast. -func (chain *chainImpl) Enqueue(env *cb.Envelope) bool { - if !chain.started { - logger.Warningf("[channel: %s] Will not enqueue because the chain hasn't completed its initialization yet", chain.support.ChainID()) - return false - } - - if chain.halted { - logger.Warningf("[channel: %s] Will not enqueue because the chain has been halted", chain.support.ChainID()) - return false - } - - logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID()) - marshaledEnv, err := utils.Marshal(env) - if err != nil { - return false - } - payload := utils.MarshalOrPanic(newRegularMessage(marshaledEnv)) - message := newProducerMessage(chain.channel, payload) - if _, _, err := chain.producer.SendMessage(message); err != nil { - logger.Errorf("[channel: %s] cannot enqueue envelope = %s", chain.support.ChainID(), err) - return false - } - logger.Debugf("[channel: %s] Envelope enqueued successfully", chain.support.ChainID()) - - return !chain.halted // If ch.halted has been set to true while sending, we should return false + chain.processMessagesToBlocks() // Keep up to date with the channel } // processMessagesToBlocks drains the Kafka consumer for the given channel, and // takes care of converting the stream of ordered messages into blocks for the -// channel's ledger. NOTE: May need to rethink the model here, and turn this -// into a method. For the time being, we optimize for testability. -func processMessagesToBlock(support multichain.ConsenterSupport, producer sarama.SyncProducer, - parentConsumer sarama.Consumer, channelConsumer sarama.PartitionConsumer, - chn channel, lastCutBlockNumber *uint64, haltedFlag *bool, exitChan *chan struct{}) ([]uint64, error) { +// channel's ledger. 
+func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) { + counts := make([]uint64, 11) // For metrics and tests msg := new(ab.KafkaMessage) var timer <-chan time.Time - counts := make([]uint64, 10) // For metrics and tests - - defer func() { - _ = closeLoop(chn.topic(), producer, parentConsumer, channelConsumer, haltedFlag) - logger.Infof("[channel: %s] Closed producer/consumer threads for channel and exiting loop", chn.topic()) + defer func() { // When Halt() is called + select { + case <-chain.errorChan: // If already closed, don't do anything + default: + close(chain.errorChan) + } }() for { select { - case in := <-channelConsumer.Messages(): + case <-chain.haltChan: + logger.Warningf("[channel: %s] Consenter for channel exiting", chain.support.ChainID()) + counts[indexExitChanPass]++ + return counts, nil + case kafkaErr := <-chain.channelConsumer.Errors(): + logger.Error(kafkaErr) + counts[indexRecvError]++ + select { + case <-chain.errorChan: // If already closed, don't do anything + default: + close(chain.errorChan) + } + logger.Warningf("[channel: %s] Closed the errorChan", chain.support.ChainID()) + // This covers the edge case where (1) a consumption error has + // closed the errorChan and thus rendered the chain unavailable to + // deliver clients, (2) we're already at the newest offset, and (3) + // there are no new Broadcast requests coming in. In this case, + // there is no trigger that can recreate the errorChan again and + // mark the chain as available, so we have to force that trigger via + // the emission of a CONNECT message. TODO Consider rate limiting + go sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel) + case in := <-chain.channelConsumer.Messages(): + select { + case <-chain.errorChan: // If this channel was closed... + chain.errorChan = make(chan struct{}) // ...make a new one. + logger.Infof("[channel: %s] Marked consenter as available again", chain.support.ChainID()) + default: + } + if err := proto.Unmarshal(in.Value, msg); err != nil { // This shouldn't happen, it should be filtered at ingress - logger.Criticalf("[channel: %s] Unable to unmarshal consumed message = %s", chn.topic(), err) - counts[indexRecvError]++ + logger.Criticalf("[channel: %s] Unable to unmarshal consumed message = %s", chain.support.ChainID(), err) + counts[indexUnmarshalError]++ } else { - logger.Debugf("[channel: %s] Successfully unmarshalled consumed message, offset is %d. Inspecting type...", chn.topic(), in.Offset) + logger.Debugf("[channel: %s] Successfully unmarshalled consumed message, offset is %d. 
Inspecting type...", chain.support.ChainID(), in.Offset) counts[indexRecvPass]++ } switch msg.Type.(type) { case *ab.KafkaMessage_Connect: - _ = processConnect(chn.topic()) + _ = processConnect(chain.support.ChainID()) counts[indexProcessConnectPass]++ case *ab.KafkaMessage_TimeToCut: - if err := processTimeToCut(msg.GetTimeToCut(), support, lastCutBlockNumber, &timer, in.Offset); err != nil { - logger.Warningf("[channel: %s] %s", chn.topic(), err) - logger.Criticalf("[channel: %s] Consenter for channel exiting", chn.topic()) + if err := processTimeToCut(msg.GetTimeToCut(), chain.support, &chain.lastCutBlockNumber, &timer, in.Offset); err != nil { + logger.Warningf("[channel: %s] %s", chain.support.ChainID(), err) + logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.support.ChainID()) counts[indexProcessTimeToCutError]++ return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point } counts[indexProcessTimeToCutPass]++ case *ab.KafkaMessage_Regular: - if err := processRegular(msg.GetRegular(), support, &timer, in.Offset, lastCutBlockNumber); err != nil { - logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chn.topic(), err) - counts[indexPprocessRegularError]++ + if err := processRegular(msg.GetRegular(), chain.support, &timer, in.Offset, &chain.lastCutBlockNumber); err != nil { + logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.support.ChainID(), err) + counts[indexProcessRegularError]++ } else { counts[indexProcessRegularPass]++ } } case <-timer: - if err := sendTimeToCut(producer, chn, (*lastCutBlockNumber)+1, &timer); err != nil { - logger.Errorf("[channel: %s] cannot post time-to-cut message = %s", chn.topic(), err) + if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &timer); err != nil { + logger.Errorf("[channel: %s] cannot post time-to-cut message = %s", chain.support.ChainID(), err) // Do not return though counts[indexSendTimeToCutError]++ } else { counts[indexSendTimeToCutPass]++ } - case <-*exitChan: // When Halt() is called - logger.Warningf("[channel: %s] Consenter for channel exiting", chn.topic()) - counts[indexExitChanPass]++ - return counts, nil } } } -// Helper functions - -func closeLoop(channelName string, producer sarama.SyncProducer, parentConsumer sarama.Consumer, channelConsumer sarama.PartitionConsumer, haltedFlag *bool) []error { +func (chain *chainImpl) closeKafkaObjects() []error { var errs []error - *haltedFlag = true - - err := channelConsumer.Close() + err := chain.channelConsumer.Close() if err != nil { - logger.Errorf("[channel: %s] could not close channelConsumer cleanly = %s", channelName, err) + logger.Errorf("[channel: %s] could not close channelConsumer cleanly = %s", chain.support.ChainID(), err) errs = append(errs, err) } else { - logger.Debugf("[channel: %s] Closed the channel consumer", channelName) + logger.Debugf("[channel: %s] Closed the channel consumer", chain.support.ChainID()) } - err = parentConsumer.Close() + err = chain.parentConsumer.Close() if err != nil { - logger.Errorf("[channel: %s] could not close parentConsumer cleanly = %s", channelName, err) + logger.Errorf("[channel: %s] could not close parentConsumer cleanly = %s", chain.support.ChainID(), err) errs = append(errs, err) } else { - logger.Debugf("[channel: %s] Closed the parent consumer", channelName) + logger.Debugf("[channel: %s] Closed the parent consumer", chain.support.ChainID()) } - err = 
producer.Close() + err = chain.producer.Close() if err != nil { - logger.Errorf("[channel: %s] could not close producer cleanly = %s", channelName, err) + logger.Errorf("[channel: %s] could not close producer cleanly = %s", chain.support.ChainID(), err) errs = append(errs, err) } else { - logger.Debugf("[channel: %s] Closed the producer", channelName) + logger.Debugf("[channel: %s] Closed the producer", chain.support.ChainID()) } return errs } +// Helper functions + func getLastCutBlockNumber(blockchainHeight uint64) uint64 { return blockchainHeight - 1 } @@ -287,16 +318,6 @@ func getLastOffsetPersisted(metadataValue []byte, chainID string) int64 { return (sarama.OffsetOldest - 1) // default } -func listenForErrors(errChan <-chan *sarama.ConsumerError, exitChan <-chan struct{}) error { - select { - case <-exitChan: - return nil - case err := <-errChan: - logger.Error(err) - return err - } -} - func newConnectMessage() *ab.KafkaMessage { return &ab.KafkaMessage{ Type: &ab.KafkaMessage_Connect{ @@ -327,10 +348,10 @@ func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage { } } -func newProducerMessage(chn channel, pld []byte) *sarama.ProducerMessage { +func newProducerMessage(channel channel, pld []byte) *sarama.ProducerMessage { return &sarama.ProducerMessage{ - Topic: chn.topic(), - Key: sarama.StringEncoder(strconv.Itoa(int(chn.partition()))), // TODO Consider writing an IntEncoder? + Topic: channel.topic(), + Key: sarama.StringEncoder(strconv.Itoa(int(channel.partition()))), // TODO Consider writing an IntEncoder? Value: sarama.ByteEncoder(pld), } } @@ -396,22 +417,6 @@ func processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, support multichain.C return nil } -// Sets up the partition consumer for a channel using the given retry options. -func setupChannelConsumerForChannel(retryOptions localconfig.Retry, exitChan chan struct{}, parentConsumer sarama.Consumer, channel channel, startFrom int64) (sarama.PartitionConsumer, error) { - var err error - var channelConsumer sarama.PartitionConsumer - - logger.Infof("[channel: %s] Setting up the channel consumer for this channel...", channel.topic()) - - retryMsg := "Connecting to the Kafka cluster" - setupChannelConsumer := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error { - channelConsumer, err = parentConsumer.ConsumePartition(channel.topic(), channel.partition(), startFrom) - return err - }) - - return channelConsumer, setupChannelConsumer.retry() -} - // Post a CONNECT message to the channel using the given retry options. This // prevents the panicking that would occur if we were to set up a consumer and // seek on a partition that hadn't been written to yet. @@ -439,15 +444,31 @@ func sendTimeToCut(producer sarama.SyncProducer, channel channel, timeToCutBlock return err } +// Sets up the partition consumer for a channel using the given retry options. 
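+// startFrom is the offset to begin consuming from; startThread passes in
+// chain.lastOffsetPersisted+1.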
+func setupChannelConsumerForChannel(retryOptions localconfig.Retry, haltChan chan struct{}, parentConsumer sarama.Consumer, channel channel, startFrom int64) (sarama.PartitionConsumer, error) { + var err error + var channelConsumer sarama.PartitionConsumer + + logger.Infof("[channel: %s] Setting up the channel consumer for this channel (start offset: %d)...", channel.topic(), startFrom) + + retryMsg := "Connecting to the Kafka cluster" + setupChannelConsumer := newRetryProcess(retryOptions, haltChan, channel, retryMsg, func() error { + channelConsumer, err = parentConsumer.ConsumePartition(channel.topic(), channel.partition(), startFrom) + return err + }) + + return channelConsumer, setupChannelConsumer.retry() +} + // Sets up the parent consumer for a channel using the given retry options. -func setupParentConsumerForChannel(retryOptions localconfig.Retry, exitChan chan struct{}, brokers []string, brokerConfig *sarama.Config, channel channel) (sarama.Consumer, error) { +func setupParentConsumerForChannel(retryOptions localconfig.Retry, haltChan chan struct{}, brokers []string, brokerConfig *sarama.Config, channel channel) (sarama.Consumer, error) { var err error var parentConsumer sarama.Consumer logger.Infof("[channel: %s] Setting up the parent consumer for this channel...", channel.topic()) retryMsg := "Connecting to the Kafka cluster" - setupParentConsumer := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error { + setupParentConsumer := newRetryProcess(retryOptions, haltChan, channel, retryMsg, func() error { parentConsumer, err = sarama.NewConsumer(brokers, brokerConfig) return err }) @@ -456,14 +477,14 @@ func setupParentConsumerForChannel(retryOptions localconfig.Retry, exitChan chan } // Sets up the writer/producer for a channel using the given retry options. 
-func setupProducerForChannel(retryOptions localconfig.Retry, exitChan chan struct{}, brokers []string, brokerConfig *sarama.Config, channel channel) (sarama.SyncProducer, error) { +func setupProducerForChannel(retryOptions localconfig.Retry, haltChan chan struct{}, brokers []string, brokerConfig *sarama.Config, channel channel) (sarama.SyncProducer, error) { var err error var producer sarama.SyncProducer logger.Infof("[channel: %s] Setting up the producer for this channel...", channel.topic()) retryMsg := "Connecting to the Kafka cluster" - setupProducer := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error { + setupProducer := newRetryProcess(retryOptions, haltChan, channel, retryMsg, func() error { producer, err = sarama.NewSyncProducer(brokers, brokerConfig) return err }) diff --git a/orderer/kafka/chain_test.go b/orderer/kafka/chain_test.go index f2d7192bab6..b9ad41bfb7d 100644 --- a/orderer/kafka/chain_test.go +++ b/orderer/kafka/chain_test.go @@ -23,9 +23,11 @@ import ( ) var ( - shortTimeout = 1 * time.Second - longTimeout = 1 * time.Hour - hitBranch = 50 * time.Millisecond + extraShortTimeout = 1 * time.Millisecond + shortTimeout = 1 * time.Second + longTimeout = 1 * time.Hour + + hitBranch = 50 * time.Millisecond ) func TestChain(t *testing.T) { @@ -59,26 +61,45 @@ func TestChain(t *testing.T) { } t.Run("New", func(t *testing.T) { - _, err := newChain(mockConsenter, mockSupport, newestOffset-1) + chain, err := newChain(mockConsenter, mockSupport, newestOffset-1) + assert.NoError(t, err, "Expected newChain to return without errors") + select { + case <-chain.errorChan: + logger.Debug("errorChan is closed as it should be") + default: + t.Fatal("errorChan should have been closed") + } + + select { + case <-chain.haltChan: + t.Fatal("haltChan should have been open") + default: + logger.Debug("haltChan is open as it should be") + } + + select { + case <-chain.startChan: + t.Fatal("startChan should have been open") + default: + logger.Debug("startChan is open as it should be") + } }) t.Run("Start", func(t *testing.T) { - chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) // -1 because we haven't set the CONNECT message yet + // Set to -1 because we haven't sent the CONNECT message yet + chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) chain.Start() select { - case <-time.After(shortTimeout): - t.Fatal("Chain should have started by now") case <-chain.startChan: - logger.Debug("startChan closed") + logger.Debug("startChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("startChan should have been closed by now") } - _, ok := <-chain.startChan // Redundant, but let's be paranoid - assert.False(t, ok, "Expected chain.startChan to be closed") - assert.True(t, chain.started, "Expected chain.started flag to be set to true") - - close(chain.exitChan) // Trigger the exitChan clause in the processMessagesToBlock goroutine + // Trigger the haltChan clause in the processMessagesToBlocks goroutine + close(chain.haltChan) }) t.Run("Halt", func(t *testing.T) { @@ -86,16 +107,28 @@ func TestChain(t *testing.T) { chain.Start() select { - case <-time.After(shortTimeout): - t.Fatal("Chain should have started by now") case <-chain.startChan: - logger.Debug("startChan closed") + logger.Debug("startChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("startChan should have been closed by now") } + + // Wait till the start phase has completed, then: chain.Halt() - _, ok := <-chain.exitChan - assert.False(t, 
ok, "Expected chain.exitChan to be closed") - assert.True(t, chain.halted, "Expected chain.halted flag to be set true") + select { + case <-chain.haltChan: + logger.Debug("haltChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("haltChan should have been closed") + } + + select { + case <-chain.errorChan: + logger.Debug("errorChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("errorChan should have been closed") + } }) t.Run("DoubleHalt", func(t *testing.T) { @@ -103,36 +136,37 @@ func TestChain(t *testing.T) { chain.Start() select { - case <-time.After(shortTimeout): - t.Fatal("Chain should have started by now") case <-chain.startChan: - logger.Debug("startChan closed") + logger.Debug("startChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("startChan should have been closed by now") } chain.Halt() assert.NotPanics(t, func() { chain.Halt() }, "Calling Halt() more than once shouldn't panic") - - _, ok := <-chain.exitChan - assert.False(t, ok, "Expected chain.exitChan to be closed") - assert.True(t, chain.halted, "Expected chain.halted flag to be set true") }) t.Run("StartWithProducerForChannelError", func(t *testing.T) { + // Point to an empty brokers list mockSupportCopy := *mockSupport mockSupportCopy.SharedConfigVal = &mockconfig.Orderer{KafkaBrokersVal: []string{}} chain, _ := newChain(mockConsenter, &mockSupportCopy, newestOffset-1) - // The production path will actually call `chain.Start`. This is + // The production path will actually call chain.Start(). This is // functionally equivalent and allows us to run assertions on it. - assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to result in panic") + assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic") }) t.Run("StartWithConnectMessageError", func(t *testing.T) { - // Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max + // Note that this test is affected by the following parameters: + // - Net.ReadTimeout + // - Consumer.Retry.Backoff + // - Metadata.Retry.Max chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) + // Have the broker return an ErrNotLeaderForPartition error mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). @@ -146,13 +180,15 @@ func TestChain(t *testing.T) { SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message), }) - assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to result in panic") + assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic") }) t.Run("EnqueueIfNotStarted", func(t *testing.T) { chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) - // Assume a CONNECT error + // As in StartWithConnectMessageError, have the broker return an + // ErrNotLeaderForPartition error, i.e. cause an error in the + // 'post connect message' step. mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). 
@@ -170,8 +206,13 @@ func TestChain(t *testing.T) { }) t.Run("StartWithConsumerForChannelError", func(t *testing.T) { - // Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max - chain, _ := newChain(mockConsenter, mockSupport, newestOffset) // Provide an out-of-range offset + // Note that this test is affected by the following parameters: + // - Net.ReadTimeout + // - Consumer.Retry.Backoff + // - Metadata.Retry.Max + + // Provide an out-of-range offset + chain, _ := newChain(mockConsenter, mockSupport, newestOffset) mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). @@ -186,20 +227,35 @@ func TestChain(t *testing.T) { SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message), }) - assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to result in panic") + assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic") }) - t.Run("Enqueue", func(t *testing.T) { + t.Run("EnqueueProper", func(t *testing.T) { chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). + SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()), + "ProduceRequest": sarama.NewMockProduceResponse(t). + SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset). + SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message), + }) + chain.Start() select { - case <-time.After(shortTimeout): - t.Fatal("Chain should have started by now") case <-chain.startChan: - logger.Debug("startChan closed") + logger.Debug("startChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("startChan should have been closed by now") } + // Enqueue should have access to the post path, and its ProduceRequest + // should go by without error assert.True(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return true") chain.Halt() @@ -208,36 +264,43 @@ func TestChain(t *testing.T) { t.Run("EnqueueIfHalted", func(t *testing.T) { chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). + SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()), + "ProduceRequest": sarama.NewMockProduceResponse(t). + SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset). + SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). 
+ SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message), + }) + chain.Start() select { - case <-time.After(shortTimeout): - t.Fatal("Chain should have started by now") case <-chain.startChan: - logger.Debug("startChan closed") + logger.Debug("startChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("startChan should have been closed by now") } chain.Halt() - assert.True(t, chain.halted, "Expected chain.halted flag to be set true") + // haltChan should close access to the post path assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false") }) t.Run("EnqueueError", func(t *testing.T) { chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) - chain.Start() - select { - case <-time.After(shortTimeout): - t.Fatal("Chain should have started by now") - case <-chain.startChan: - logger.Debug("startChan closed") - } - + // Use the "good" handler map that allows the Stage to complete without + // issues mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ "MetadataRequest": sarama.NewMockMetadataResponse(t). SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()), "ProduceRequest": sarama.NewMockProduceResponse(t). - SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition), + SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError), "OffsetRequest": sarama.NewMockOffsetResponse(t). SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset). SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset), @@ -245,13 +308,114 @@ func TestChain(t *testing.T) { SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message), }) + chain.Start() + select { + case <-chain.startChan: + logger.Debug("startChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("startChan should have been closed by now") + } + + // Now make it so that the next ProduceRequest is met with an error + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "ProduceRequest": sarama.NewMockProduceResponse(t). 
+ SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition), + }) + assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false") }) } -func TestCloseLoop(t *testing.T) { +func TestSetupProducerForChannel(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test in short mode") + } + + mockBroker := sarama.NewMockBroker(t, 0) + defer mockBroker.Close() + + mockChannel := newChannel("channelFoo", defaultPartition) + + haltChan := make(chan struct{}) + + t.Run("Proper", func(t *testing.T) { + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) + metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) + mockBroker.Returns(metadataResponse) + + producer, err := setupProducerForChannel(mockConsenter.retryOptions(), haltChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) + assert.NoError(t, err, "Expected the setupProducerForChannel call to return without errors") + assert.NoError(t, producer.Close(), "Expected to close the producer without errors") + }) + + t.Run("WithError", func(t *testing.T) { + _, err := setupProducerForChannel(mockConsenter.retryOptions(), haltChan, []string{}, mockBrokerConfig, mockChannel) + assert.Error(t, err, "Expected the setupProducerForChannel call to return an error") + }) +} + +func TestSetupConsumerForChannel(t *testing.T) { + mockBroker := sarama.NewMockBroker(t, 0) + defer func() { mockBroker.Close() }() + + mockChannel := newChannel("channelFoo", defaultPartition) + + oldestOffset := int64(0) + newestOffset := int64(5) + + startFrom := int64(3) + message := sarama.StringEncoder("messageFoo") + + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). + SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset). + SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). 
+ SetMessage(mockChannel.topic(), mockChannel.partition(), startFrom, message), + }) + + haltChan := make(chan struct{}) + + t.Run("ProperParent", func(t *testing.T) { + parentConsumer, err := setupParentConsumerForChannel(mockConsenter.retryOptions(), haltChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) + assert.NoError(t, err, "Expected the setupParentConsumerForChannel call to return without errors") + assert.NoError(t, parentConsumer.Close(), "Expected to close the parentConsumer without errors") + }) + + t.Run("ProperChannel", func(t *testing.T) { + parentConsumer, _ := setupParentConsumerForChannel(mockConsenter.retryOptions(), haltChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) + defer func() { parentConsumer.Close() }() + channelConsumer, err := setupChannelConsumerForChannel(mockConsenter.retryOptions(), haltChan, parentConsumer, mockChannel, newestOffset) + assert.NoError(t, err, "Expected the setupChannelConsumerForChannel call to return without errors") + assert.NoError(t, channelConsumer.Close(), "Expected to close the channelConsumer without errors") + }) + + t.Run("WithParentConsumerError", func(t *testing.T) { + // Provide an empty brokers list + _, err := setupParentConsumerForChannel(mockConsenter.retryOptions(), haltChan, []string{}, mockBrokerConfig, mockChannel) + assert.Error(t, err, "Expected the setupParentConsumerForChannel call to return an error") + }) + + t.Run("WithChannelConsumerError", func(t *testing.T) { + // Provide an out-of-range offset + parentConsumer, _ := setupParentConsumerForChannel(mockConsenter.retryOptions(), haltChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) + _, err := setupChannelConsumerForChannel(mockConsenter.retryOptions(), haltChan, parentConsumer, mockChannel, newestOffset+1) + defer func() { parentConsumer.Close() }() + assert.Error(t, err, "Expected the setupChannelConsumerForChannel call to return an error") + }) +} + +func TestCloseKafkaObjects(t *testing.T) { mockChannel := newChannel("channelFoo", defaultPartition) + mockSupport := &mockmultichain.ConsenterSupport{ + ChainIDVal: mockChannel.topic(), + } + oldestOffset := int64(0) newestOffset := int64(5) @@ -272,24 +436,25 @@ func TestCloseLoop(t *testing.T) { SetMessage(mockChannel.topic(), mockChannel.partition(), startFrom, message), }) - exitChan := make(chan struct{}) + haltChan := make(chan struct{}) t.Run("Proper", func(t *testing.T) { - producer, err := setupProducerForChannel(mockConsenter.retryOptions(), exitChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) - assert.NoError(t, err, "Expected no error when setting up the producer") - - parentConsumer, err := setupParentConsumerForChannel(mockConsenter.retryOptions(), exitChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) - assert.NoError(t, err, "Expected no error when setting up the parent consumer") - - channelConsumer, err := setupChannelConsumerForChannel(mockConsenter.retryOptions(), exitChan, parentConsumer, mockChannel, startFrom) - assert.NoError(t, err, "Expected no error when setting up the channel consumer") - - haltedFlag := false + producer, _ := setupProducerForChannel(mockConsenter.retryOptions(), haltChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) + parentConsumer, _ := setupParentConsumerForChannel(mockConsenter.retryOptions(), haltChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) + channelConsumer, _ := setupChannelConsumerForChannel(mockConsenter.retryOptions(), haltChan, 
parentConsumer, mockChannel, startFrom) + + // Set up a chain with just the minimum necessary fields instantiated so + // as to test the function + bareMinimumChain := &chainImpl{ + support: mockSupport, + producer: producer, + parentConsumer: parentConsumer, + channelConsumer: channelConsumer, + } - errs := closeLoop(mockChannel.topic(), producer, parentConsumer, channelConsumer, &haltedFlag) + errs := bareMinimumChain.closeKafkaObjects() assert.Len(t, errs, 0, "Expected zero errors") - assert.True(t, haltedFlag, "Expected halted flag to be set to true") assert.Panics(t, func() { channelConsumer.Close() @@ -307,24 +472,28 @@ func TestCloseLoop(t *testing.T) { }) t.Run("ChannelConsumerError", func(t *testing.T) { - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + producer, _ := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) // Unlike all other tests in this file, forcing an error on the // channelConsumer.Close() call is more easily achieved using the mock - // Consumer. Thus we bypass the call to `newConsumer` and do - // type-casting. + // Consumer. Thus we bypass the call to `setup*Consumer`. + + // Have the consumer receive an ErrOutOfBrokers error. mockParentConsumer := mocks.NewConsumer(t, nil) mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), startFrom).YieldError(sarama.ErrOutOfBrokers) mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), startFrom) assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") - haltedFlag := false + bareMinimumChain := &chainImpl{ + support: mockSupport, + producer: producer, + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, + } - errs := closeLoop(mockChannel.topic(), producer, mockParentConsumer, mockChannelConsumer, &haltedFlag) + errs := bareMinimumChain.closeKafkaObjects() assert.Len(t, errs, 1, "Expected 1 error returned") - assert.True(t, haltedFlag, "Expected halted flag to be set to true") assert.NotPanics(t, func() { mockChannelConsumer.Close() @@ -336,6 +505,8 @@ func TestCloseLoop(t *testing.T) { }) } +// Test helper functions here. 
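An editorial aside, not part of the patch: Errored(), Halt(), Enqueue(), and the errorChan handling in processMessagesToBlocks() all lean on the Go guarantee that a receive from a closed channel returns immediately, so a select with a default branch acts as a non-blocking "has this event happened yet?" probe. A minimal standalone sketch of the idiom follows; the started/halted channels are hypothetical stand-ins for startChan/haltChan.

package main

import "fmt"

func main() {
	started := make(chan struct{}) // closed once start-up completes
	halted := make(chan struct{})  // closed once a halt is requested

	// ready reports whether we have started but not yet halted, without blocking.
	ready := func() bool {
		select {
		case <-started: // receives immediately iff started is closed
			select {
			case <-halted: // receives immediately iff halted is closed
				return false
			default:
				return true
			}
		default: // started is still open: start-up hasn't finished
			return false
		}
	}

	fmt.Println(ready()) // false: not started yet
	close(started)
	fmt.Println(ready()) // true: started and not halted
	close(halted)
	fmt.Println(ready()) // false: halted
}

Closing an already-closed channel panics, which is why Halt() in the patch guards close(chain.haltChan) with the same select/default check.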
+ func TestGetLastCutBlockNumber(t *testing.T) { testCases := []struct { name string @@ -380,24 +551,46 @@ func TestGetLastOffsetPersisted(t *testing.T) { } } -func TestListenForErrors(t *testing.T) { +func TestSendConnectMessage(t *testing.T) { + mockBroker := sarama.NewMockBroker(t, 0) + defer func() { mockBroker.Close() }() + mockChannel := newChannel("mockChannelFoo", defaultPartition) - errChan := make(chan *sarama.ConsumerError, 1) - exitChan1 := make(chan struct{}) - close(exitChan1) - assert.Nil(t, listenForErrors(errChan, exitChan1), "Expected listenForErrors call to return nil") + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) + metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) + mockBroker.Returns(metadataResponse) + + producer, _ := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) + defer func() { producer.Close() }() + + haltChan := make(chan struct{}) - exitChan2 := make(chan struct{}) - errChan <- &sarama.ConsumerError{ - Topic: mockChannel.topic(), - Partition: mockChannel.partition(), - Err: fmt.Errorf("foo"), - } - assert.NotNil(t, listenForErrors(errChan, exitChan2), "Expected listenForErrors call to return an error") + t.Run("Proper", func(t *testing.T) { + successResponse := new(sarama.ProduceResponse) + successResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError) + mockBroker.Returns(successResponse) + + assert.NoError(t, sendConnectMessage(mockConsenter.retryOptions(), haltChan, producer, mockChannel), "Expected the sendConnectMessage call to return without errors") + }) + + t.Run("WithError", func(t *testing.T) { + // Note that this test is affected by the following parameters: + // - Net.ReadTimeout + // - Consumer.Retry.Backoff + // - Metadata.Retry.Max + + // Have the broker return an ErrNotEnoughReplicas error + failureResponse := new(sarama.ProduceResponse) + failureResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotEnoughReplicas) + mockBroker.Returns(failureResponse) + + assert.Error(t, sendConnectMessage(mockConsenter.retryOptions(), haltChan, producer, mockChannel), "Expected the sendConnectMessage call to return an error") + }) } -func TestProcessLoopConnect(t *testing.T) { +func TestSendTimeToCut(t *testing.T) { mockBroker := sarama.NewMockBroker(t, 0) defer func() { mockBroker.Close() }() @@ -410,45 +603,41 @@ func TestProcessLoopConnect(t *testing.T) { producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + defer func() { producer.Close() }() - mockBrokerConfigCopy := *mockBrokerConfig - mockBrokerConfigCopy.ChannelBufferSize = 0 - - newestOffset := int64(5) - - mockParentConsumer := mocks.NewConsumer(t, &mockBrokerConfigCopy) - mpc := mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") - - mockSupport := &mockmultichain.ConsenterSupport{} + timeToCutBlockNumber := uint64(3) + var timer <-chan time.Time - lastCutBlockNumber := uint64(2) - haltedFlag := false - exitChan := make(chan struct{}) + t.Run("Proper", func(t *testing.T) { + 
successResponse := new(sarama.ProduceResponse) + successResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError) + mockBroker.Returns(successResponse) - var counts []uint64 - done := make(chan struct{}) + timer = time.After(longTimeout) - go func() { - counts, err = processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - done <- struct{}{} - }() + assert.NoError(t, sendTimeToCut(producer, mockChannel, timeToCutBlockNumber, &timer), "Expected the sendTimeToCut call to return without errors") + assert.Nil(t, timer, "Expected the sendTimeToCut call to nil the timer") + }) - // This is the wrappedMessage that the for loop will process - mpc.YieldMessage(newMockConsumerMessage(newConnectMessage())) + t.Run("WithError", func(t *testing.T) { + // Note that this test is affected by the following parameters: + // - Net.ReadTimeout + // - Consumer.Retry.Backoff + // - Metadata.Retry.Max + failureResponse := new(sarama.ProduceResponse) + failureResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotEnoughReplicas) + mockBroker.Returns(failureResponse) - logger.Debug("Closing exitChan to exit the infinite for loop") - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - <-done + timer = time.After(longTimeout) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessConnectPass], "Expected 1 CONNECT message processed") + assert.Error(t, sendTimeToCut(producer, mockChannel, timeToCutBlockNumber, &timer), "Expected the sendTimeToCut call to return an error") + assert.Nil(t, timer, "Expected the sendTimeToCut call to nil the timer") + }) } -func TestProcessLoopRegularError(t *testing.T) { +func TestProcessMessagesToBlocks(t *testing.T) { + subtestIndex := -1 // Used to calculate the right offset at each subtest + mockBroker := sarama.NewMockBroker(t, 0) defer func() { mockBroker.Close() }() @@ -459,219 +648,275 @@ func TestProcessLoopRegularError(t *testing.T) { metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) mockBroker.Returns(metadataResponse) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + producer, _ := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) mockBrokerConfigCopy := *mockBrokerConfig mockBrokerConfigCopy.ChannelBufferSize = 0 - newestOffset := int64(5) + newestOffset := int64(0) mockParentConsumer := mocks.NewConsumer(t, &mockBrokerConfigCopy) mpc := mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") - lastCutBlockNumber := uint64(3) + t.Run("ReceiveConnect", func(t *testing.T) { + subtestIndex++ - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented 
during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - haltedFlag := false - exitChan := make(chan struct{}) + mockSupport := &mockmultichain.ConsenterSupport{ + ChainIDVal: mockChannel.topic(), + } - var counts []uint64 - done := make(chan struct{}) + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - go func() { - counts, err = processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - done <- struct{}{} - }() + channel: mockChannel, + support: mockSupport, - // This is the wrappedMessage that the for loop will process - mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(tamperBytes(utils.MarshalOrPanic(newMockEnvelope("fooMessage")))))) + errorChan: errorChan, + haltChan: haltChan, + } - logger.Debug("Closing exitChan to exit the infinite for loop") - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - <-done + var counts []uint64 + done := make(chan struct{}) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexPprocessRegularError], "Expected 1 damaged REGULAR message processed") -} + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() -func TestProcessLoopRegularQueueEnvelope(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newConnectMessage())) - mockChannel := newChannel("mockChannelFoo", defaultPartition) + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessConnectPass], "Expected 1 CONNECT message processed") + }) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + t.Run("ReceiveRegularWithError", func(t *testing.T) { + subtestIndex++ - newestOffset := int64(5) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). 
- YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + mockSupport := &mockmultichain.ConsenterSupport{ + ChainIDVal: mockChannel.topic(), + } - lastCutBlockNumber := uint64(3) + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - BatchTimeoutVal: longTimeout, - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + channel: mockChannel, + support: mockSupport, - exitChan := make(chan struct{}) + errorChan: errorChan, + haltChan: haltChan, + } - go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good - mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return - logger.Debugf("Mock blockcutter's Ordered call has returned") - logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - }() + var counts []uint64 + done := make(chan struct{}) - haltedFlag := false + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") -} + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(tamperBytes(utils.MarshalOrPanic(newMockEnvelope("fooMessage")))))) -func TestProcessLoopRegularCutBlock(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - mockChannel := newChannel("mockChannelFoo", defaultPartition) + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessRegularError], "Expected 1 damaged REGULAR message processed") + }) - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + t.Run("ReceiveRegularAndQueue", 
func(t *testing.T) { + subtestIndex++ - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - newestOffset := int64(5) + lastCutBlockNumber := uint64(3) - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). - YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + SharedConfigVal: &mockconfig.Orderer{ + BatchTimeoutVal: longTimeout, + }, + } + defer close(mockSupport.BlockCutterVal.Block) - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + 1 + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - BatchTimeoutVal: shortTimeout, - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, + + errorChan: errorChan, + haltChan: haltChan, + } - mockSupport.BlockCutterVal.CutNext = true + var counts []uint64 + done := make(chan struct{}) - exitChan := make(chan struct{}) + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() + + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) - go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return logger.Debugf("Mock blockcutter's Ordered call has returned") - <-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed - logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - }() - - haltedFlag := false - - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 
1 REGULAR message processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by one") -} -func TestProcessLoopRegularCutTwoBlocks(t *testing.T) { - if testing.Short() { - t.Skip("Skipping test in short mode") - } + logger.Debug("Closing haltChan to exit the infinite for-loop") + // We are guaranteed to hit the haltChan branch after hitting the REGULAR branch at least once + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") + }) - mockChannel := newChannel("mockChannelFoo", defaultPartition) + t.Run("ReceiveRegularAndCutBlock", func(t *testing.T) { + subtestIndex++ - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + lastCutBlockNumber := uint64(3) - newestOffset := int64(0) + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + SharedConfigVal: &mockconfig.Orderer{ + BatchTimeoutVal: longTimeout, + }, + } + defer close(mockSupport.BlockCutterVal.Block) - mockParentConsumer := mocks.NewConsumer(t, nil) - mpc := mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - lastCutBlockNumber := uint64(0) + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - BatchTimeoutVal: longTimeout, - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + errorChan: errorChan, + haltChan: haltChan, + } + + var counts []uint64 + done := make(chan struct{}) + + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() + + mockSupport.BlockCutterVal.CutNext = true + + // This is the wrappedMessage that the for-loop will process + 
mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) - exitChan := make(chan struct{}) - var block1, block2 *cb.Block + mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return + logger.Debugf("Mock blockcutter's Ordered call has returned") + <-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed + + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done + + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") + assert.Equal(t, lastCutBlockNumber+1, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by one") + }) + + t.Run("ReceiveTwoRegularAndCutTwoBlocks", func(t *testing.T) { + subtestIndex++ + + if testing.Short() { + t.Skip("Skipping test in short mode") + } + + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) + + lastCutBlockNumber := uint64(3) - go func() { - // Push the wrappedMessage that the for-loop will process + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + SharedConfigVal: &mockconfig.Orderer{ + BatchTimeoutVal: longTimeout, + }, + } + defer close(mockSupport.BlockCutterVal.Block) + + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, + + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, + + errorChan: errorChan, + haltChan: haltChan, + } + + var counts []uint64 + done := make(chan struct{}) + + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() + + var block1, block2 *cb.Block + + // This is the first wrappedMessage that the for-loop will process mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return logger.Debugf("Mock blockcutter's Ordered call has returned") mockSupport.BlockCutterVal.IsolatedTx = true + // This is the first wrappedMessage that the for-loop will process mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) mockSupport.BlockCutterVal.Block <- struct{}{} logger.Debugf("Mock blockcutter's Ordered call has returned for the second time") @@ -688,535 +933,533 @@ func TestProcessLoopRegularCutTwoBlocks(t *testing.T) { logger.Fatalf("Did not receive a block from the blockcutter as expected") } - logger.Debug("Closing exitChan to exit the infinite for-loop") - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - }() - - haltedFlag := false - lastCutBlockNumberEnd := lastCutBlockNumber + 2 + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, 
mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) + expectedOffset := newestOffset + int64(subtestIndex) // TODO Hacky, revise eventually - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(2), counts[indexRecvPass], "Expected 2 messages received and unmarshaled") - assert.Equal(t, uint64(2), counts[indexProcessRegularPass], "Expected 2 REGULAR messages processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by two") - // assert.NotEqual(t, block1, block2, "Expect these two blocks to not be equal, got\n%+v\nand\n%+v", block1, block2) - // assert.NotEqual(t, block1.GetMetadata(), block2.GetMetadata(), "Expect these two metadata values to not be equal") - assert.Equal(t, newestOffset+1, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", newestOffset+1) - assert.Equal(t, newestOffset+2, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", newestOffset+2) -} + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(2), counts[indexRecvPass], "Expected 2 messages received and unmarshaled") + assert.Equal(t, uint64(2), counts[indexProcessRegularPass], "Expected 2 REGULAR messages processed") + assert.Equal(t, lastCutBlockNumber+2, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by two") + assert.Equal(t, expectedOffset+1, extractEncodedOffset(block1.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in first block to be %d", expectedOffset+1) + assert.Equal(t, expectedOffset+2, extractEncodedOffset(block2.GetMetadata().Metadata[cb.BlockMetadataIndex_ORDERER]), "Expected encoded offset in second block to be %d", expectedOffset+2) + }) -func TestProcessLoopRegularAndSendTimeToCutRegular(t *testing.T) { - if testing.Short() { - t.Skip("Skipping test in short mode") - } + t.Run("ReceiveRegularAndSendTimeToCut", func(t *testing.T) { + subtestIndex++ - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + t.Skip("Skipping test as it introduces a race condition") - mockChannel := newChannel("mockChannelFoo", defaultPartition) + // NB We haven't set a handler map for the mock broker so we need to set + // the ProduceResponse + successResponse := new(sarama.ProduceResponse) + successResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError) + mockBroker.Returns(successResponse) - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - successResponse := new(sarama.ProduceResponse) - successResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError) - mockBroker.Returns(successResponse) + lastCutBlockNumber := uint64(3) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // 
WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + SharedConfigVal: &mockconfig.Orderer{ + BatchTimeoutVal: extraShortTimeout, // ATTN + }, + } + defer close(mockSupport.BlockCutterVal.Block) - newestOffset := int64(5) + bareMinimumChain := &chainImpl{ + producer: producer, + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). - YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + errorChan: errorChan, + haltChan: haltChan, + } - batchTimeout, _ := time.ParseDuration("1ms") + var counts []uint64 + done := make(chan struct{}) - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - BatchTimeoutVal: batchTimeout, - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - exitChan := make(chan struct{}) + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) - go func() { // TODO Hacky, see comments below, revise approach - mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return (in `processRegular`) + mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return logger.Debugf("Mock blockcutter's Ordered call has returned") - time.Sleep(hitBranch) // This introduces a race: we're basically sleeping so as to let select hit the TIMER branch first before the EXITCHAN one - logger.Debug("Closing exitChan to exit the infinite for loop") - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - }() - - haltedFlag := false - - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") - assert.Equal(t, uint64(1), counts[indexSendTimeToCutPass], "Expected 1 TIMER event processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") -} -func TestProcessLoopRegularAndSendTimeToCutError(t *testing.T) { - if testing.Short() { - 
t.Skip("Skipping test in short mode.") - } + // Sleep so that the timer branch is activated before the exitChan one. + // TODO This is a race condition, will fix in follow-up changeset + time.Sleep(hitBranch) - mockChannel := newChannel("mockChannelFoo", defaultPartition) + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") + assert.Equal(t, uint64(1), counts[indexSendTimeToCutPass], "Expected 1 TIMER event processed") + assert.Equal(t, lastCutBlockNumber, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") + }) - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + t.Run("ReceiveRegularAndSendTimeToCutError", func(t *testing.T) { + // Note that this test is affected by the following parameters: + // - Net.ReadTimeout + // - Consumer.Retry.Backoff + // - Metadata.Retry.Max - // Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max + subtestIndex++ - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + t.Skip("Skipping test as it introduces a race condition") - failureResponse := new(sarama.ProduceResponse) - failureResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotEnoughReplicas) - mockBroker.Returns(failureResponse) + // Exact same test as ReceiveRegularAndSendTimeToCut. + // Only difference is that the producer's attempt to send a TTC will + // fail with an ErrNotEnoughReplicas error. + failureResponse := new(sarama.ProduceResponse) + failureResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotEnoughReplicas) + mockBroker.Returns(failureResponse) - newestOffset := int64(5) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). 
- YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + lastCutBlockNumber := uint64(3) - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + SharedConfigVal: &mockconfig.Orderer{ + BatchTimeoutVal: extraShortTimeout, // ATTN + }, + } + defer close(mockSupport.BlockCutterVal.Block) - batchTimeout, _ := time.ParseDuration("1ms") + bareMinimumChain := &chainImpl{ + producer: producer, + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - BatchTimeoutVal: batchTimeout, - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - exitChan := make(chan struct{}) + errorChan: errorChan, + haltChan: haltChan, + } - go func() { // TODO Hacky, see comments below, revise approach - mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return (in `processRegular`) + var counts []uint64 + done := make(chan struct{}) + + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() + + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(utils.MarshalOrPanic(newMockEnvelope("fooMessage"))))) + + mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return logger.Debugf("Mock blockcutter's Ordered call has returned") - time.Sleep(hitBranch) // This introduces a race: we're basically sleeping so as to let select hit the TIMER branch first before the EXITCHAN one - logger.Debug("Closing exitChan to exit the infinite for loop") - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - }() - - haltedFlag := false - - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") - assert.Equal(t, uint64(1), counts[indexSendTimeToCutError], "Expected 1 faulty TIMER event processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") -} -func TestProcessLoopTimeToCutFromReceivedMessageRegular(t *testing.T) { - mockChannel := newChannel("mockChannelFoo", defaultPartition) + // Sleep so 
that the timer branch is activated before the haltChan one. + // TODO This is a race condition, will fix in follow-up changeset + time.Sleep(hitBranch) - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessRegularPass], "Expected 1 REGULAR message processed") + assert.Equal(t, uint64(1), counts[indexSendTimeToCutError], "Expected 1 faulty TIMER event processed") + assert.Equal(t, lastCutBlockNumber, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") + }) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + t.Run("ReceiveTimeToCutProper", func(t *testing.T) { + subtestIndex++ - newestOffset := int64(5) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + 1 + lastCutBlockNumber := uint64(3) - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). 
- YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber + 1))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + } + defer close(mockSupport.BlockCutterVal.Block) - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) - - // We need the mock blockcutter to deliver a non-empty batch - go func() { - mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call below return - }() - mockSupport.BlockCutterVal.Ordered(newMockEnvelope("fooMessage")) - - exitChan := make(chan struct{}) - - go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good - <-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed - logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - }() - - haltedFlag := false - - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessTimeToCutPass], "Expected 1 TIMETOCUT message processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by one") -} + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, -func TestProcessLoopTimeToCutFromReceivedMessageZeroBatch(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - mockChannel := newChannel("mockChannelFoo", defaultPartition) + errorChan: errorChan, + haltChan: haltChan, + } - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + // We need the mock blockcutter to deliver a non-empty batch + go func() { + mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call below return + logger.Debugf("Mock blockcutter's Ordered call has returned") + }() + // We are "planting" a message directly to the mock blockcutter + 
mockSupport.BlockCutterVal.Ordered(newMockEnvelope("fooMessage")) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + var counts []uint64 + done := make(chan struct{}) - newestOffset := int64(5) + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber + 1))) - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). - YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber + 1))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + <-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - haltedFlag := false - exitChan := make(chan struct{}) + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessTimeToCutPass], "Expected 1 TIMETOCUT message processed") + assert.Equal(t, lastCutBlockNumber+1, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to be bumped up by one") + }) - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.Error(t, err, "Expected the processMessagesToBlock call to return an error") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessTimeToCutError], "Expected 1 faulty TIMETOCUT message processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") -} + t.Run("ReceiveTimeToCutZeroBatch", func(t *testing.T) { + subtestIndex++ -func TestProcessLoopTimeToCutFromReceivedMessageLargerThanExpected(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - mockChannel := newChannel("mockChannelFoo", defaultPartition) + lastCutBlockNumber := uint64(3) - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, 
sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + } + defer close(mockSupport.BlockCutterVal.Block) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - newestOffset := int64(5) + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + errorChan: errorChan, + haltChan: haltChan, + } - mockParentConsumer := mocks.NewConsumer(t, nil) - mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset). - YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber + 2))) // This is the wrappedMessage that the for loop will process - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + var counts []uint64 + done := make(chan struct{}) - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - haltedFlag := false - exitChan := make(chan struct{}) + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber + 1))) - counts, err := processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - assert.Error(t, err, "Expected the processMessagesToBlock call to return an error") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessTimeToCutError], "Expected 1 faulty TIMETOCUT message processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") -} + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done -func TestProcessLoopTimeToCutFromReceivedMessageStale(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + assert.Error(t, err, "Expected the processMessagesToBlocks call to return an error") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessTimeToCutError], "Expected 1 faulty TIMETOCUT message processed") + assert.Equal(t, lastCutBlockNumber, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") + }) - mockChannel := 
newChannel("mockChannelFoo", defaultPartition) + t.Run("ReceiveTimeToCutLargerThanExpected", func(t *testing.T) { + subtestIndex++ - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") + lastCutBlockNumber := uint64(3) - mockBrokerConfigCopy := *mockBrokerConfig - mockBrokerConfigCopy.ChannelBufferSize = 0 + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + } + defer close(mockSupport.BlockCutterVal.Block) - newestOffset := int64(5) + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - mockParentConsumer := mocks.NewConsumer(t, &mockBrokerConfigCopy) - mpc := mockParentConsumer.ExpectConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - mockChannelConsumer, err := mockParentConsumer.ConsumePartition(mockChannel.topic(), mockChannel.partition(), newestOffset) - assert.NoError(t, err, "Expected no error when setting up the mock partition consumer") + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - lastCutBlockNumber := uint64(3) - lastCutBlockNumberEnd := lastCutBlockNumber + errorChan: errorChan, + haltChan: haltChan, + } - mockSupport := &mockmultichain.ConsenterSupport{ - Blocks: make(chan *cb.Block), // WriteBlock will post here - BlockCutterVal: mockblockcutter.NewReceiver(), - ChainIDVal: mockChannel.topic(), - HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call - SharedConfigVal: &mockconfig.Orderer{ - KafkaBrokersVal: []string{mockBroker.Addr()}, - }, - } - defer close(mockSupport.BlockCutterVal.Block) + var counts []uint64 + done := make(chan struct{}) - haltedFlag := false - exitChan := make(chan struct{}) + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - var counts []uint64 - done := make(chan struct{}) + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber + 2))) - go func() { - counts, err = processMessagesToBlock(mockSupport, producer, mockParentConsumer, mockChannelConsumer, mockChannel, &lastCutBlockNumber, &haltedFlag, &exitChan) - done <- struct{}{} - }() + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done - // This is the wrappedMessage that the for-loop will process - mpc.YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber))) + assert.Error(t, err, "Expected the processMessagesToBlocks call to return an error") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessTimeToCutError], "Expected 1 faulty TIMETOCUT message processed") + assert.Equal(t, 
lastCutBlockNumber, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") + }) - logger.Debug("Closing exitChan to exit the infinite for loop") - close(exitChan) // Identical to chain.Halt() - logger.Debug("exitChan closed") - <-done + t.Run("ReceiveTimeToCutStale", func(t *testing.T) { + subtestIndex++ - assert.NoError(t, err, "Expected the processMessagesToBlock call to return without errors") - assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") - assert.Equal(t, uint64(1), counts[indexProcessTimeToCutPass], "Expected 1 TIMETOCUT message processed") - assert.Equal(t, lastCutBlockNumberEnd, lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") -} + errorChan := make(chan struct{}) + close(errorChan) + haltChan := make(chan struct{}) -func TestSendConnectMessage(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + lastCutBlockNumber := uint64(3) - mockChannel := newChannel("mockChannelFoo", defaultPartition) + mockSupport := &mockmultichain.ConsenterSupport{ + Blocks: make(chan *cb.Block), // WriteBlock will post here + BlockCutterVal: mockblockcutter.NewReceiver(), + ChainIDVal: mockChannel.topic(), + HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call + } + defer close(mockSupport.BlockCutterVal.Block) - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + bareMinimumChain := &chainImpl{ + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") - defer func() { producer.Close() }() + channel: mockChannel, + support: mockSupport, + lastCutBlockNumber: lastCutBlockNumber, - exitChan := make(chan struct{}) + errorChan: errorChan, + haltChan: haltChan, + } - t.Run("Proper", func(t *testing.T) { - successResponse := new(sarama.ProduceResponse) - successResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError) - mockBroker.Returns(successResponse) + var counts []uint64 + done := make(chan struct{}) - assert.NoError(t, sendConnectMessage(mockConsenter.retryOptions(), exitChan, producer, mockChannel), "Expected the sendConnectMessage call to return without errors") - }) + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - t.Run("WithError", func(t *testing.T) { - // Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max - failureResponse := new(sarama.ProduceResponse) - failureResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotEnoughReplicas) - mockBroker.Returns(failureResponse) + // This is the wrappedMessage that the for-loop will process + mpc.YieldMessage(newMockConsumerMessage(newTimeToCutMessage(lastCutBlockNumber))) - assert.Error(t, sendConnectMessage(mockConsenter.retryOptions(), exitChan, producer, mockChannel), "Expected the sendConnectMessage call to return an error") + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done + + assert.NoError(t, err, "Expected the 
processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvPass], "Expected 1 message received and unmarshaled") + assert.Equal(t, uint64(1), counts[indexProcessTimeToCutPass], "Expected 1 TIMETOCUT message processed") + assert.Equal(t, lastCutBlockNumber, bareMinimumChain.lastCutBlockNumber, "Expected lastCutBlockNumber to stay the same") }) -} -func TestSendTimeToCut(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + t.Run("ReceiveKafkaErrorAndCloseErrorChan", func(t *testing.T) { + subtestIndex++ - mockChannel := newChannel("mockChannelFoo", defaultPartition) + // If we set up the mock broker so that it returns a response, if the + // test finishes before the sendConnectMessage goroutine has received + // this response, we will get a failure ("not all expectations were + // satisfied") from the mock broker. So we sabotage the producer. + failedProducer, _ := sarama.NewSyncProducer([]string{}, mockBrokerConfig) - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + // We need to have the sendConnectMessage goroutine die instantaneously, + // otherwise we'll get a nil pointer dereference panic. We are + // exploiting the admittedly hacky shortcut where a retriable process + // returns immediately when given the nil time.Duration value for its + // ticker. + zeroRetryConsenter := &consenterImpl{} - producer, err := sarama.NewSyncProducer([]string{mockBroker.Addr()}, mockBrokerConfig) - assert.NoError(t, err, "Expected no error when setting up the sarama SyncProducer") - defer func() { producer.Close() }() + // Let's assume an open errorChan, i.e. 
a healthy link between the + // consumer and the Kafka partition corresponding to the channel + errorChan := make(chan struct{}) - timeToCutBlockNumber := uint64(3) - var timer <-chan time.Time + haltChan := make(chan struct{}) - t.Run("Proper", func(t *testing.T) { - successResponse := new(sarama.ProduceResponse) - successResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNoError) - mockBroker.Returns(successResponse) + mockSupport := &mockmultichain.ConsenterSupport{ + ChainIDVal: mockChannel.topic(), + } - timer = time.After(longTimeout) + bareMinimumChain := &chainImpl{ + consenter: zeroRetryConsenter, // For sendConnectMessage + producer: failedProducer, // For sendConnectMessage + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - assert.NoError(t, sendTimeToCut(producer, mockChannel, timeToCutBlockNumber, &timer), "Expected the sendTimeToCut call to return without errors") - assert.Nil(t, timer, "Expected the sendTimeToCut call to nil the timer") - }) + channel: mockChannel, + support: mockSupport, - t.Run("WithError", func(t *testing.T) { - // Affected by Net.ReadTimeout, Consumer.Retry.Backoff, and Metadata.Retry.Max - failureResponse := new(sarama.ProduceResponse) - failureResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotEnoughReplicas) - mockBroker.Returns(failureResponse) + errorChan: errorChan, + haltChan: haltChan, + } - timer = time.After(longTimeout) + var counts []uint64 + done := make(chan struct{}) - assert.Error(t, sendTimeToCut(producer, mockChannel, timeToCutBlockNumber, &timer), "Expected the sendTimeToCut call to return an error") - assert.Nil(t, timer, "Expected the sendTimeToCut call to nil the timer") + go func() { + counts, err = bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() + + // This is what the for-loop will process + mpc.YieldError(fmt.Errorf("fooError")) + + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done + + assert.NoError(t, err, "Expected the processMessagesToBlocks call to return without errors") + assert.Equal(t, uint64(1), counts[indexRecvError], "Expected 1 Kafka error received") + + select { + case <-bareMinimumChain.errorChan: + logger.Debug("errorChan is closed as it should be") + default: + t.Fatal("errorChan should have been closed") + } }) -} -func TestSetupConsumerForChannel(t *testing.T) { - mockBroker := sarama.NewMockBroker(t, 0) - defer func() { mockBroker.Close() }() + t.Run("ReceiveKafkaErrorAndThenReceiveRegularMessage", func(t *testing.T) { + subtestIndex++ - mockChannel := newChannel("channelFoo", defaultPartition) + t.Skip("Skipping test as it introduces a race condition") - oldestOffset := int64(0) - newestOffset := int64(5) + // If we set up the mock broker so that it returns a response, if the + // test finishes before the sendConnectMessage goroutine has received + // this response, we will get a failure ("not all expectations were + // satisfied") from the mock broker. So we sabotage the producer. + failedProducer, _ := sarama.NewSyncProducer([]string{}, mockBrokerConfig) - startFrom := int64(3) - message := sarama.StringEncoder("messageFoo") + // We need to have the sendConnectMessage goroutine die instantaneously, + // otherwise we'll get a nil pointer dereference panic. 
We are + // exploiting the admittedly hacky shortcut where a retriable process + // returns immediately when given the nil time.Duration value for its + // ticker. + zeroRetryConsenter := &consenterImpl{} - mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(t). - SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). - SetLeader(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID()), - "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetOldest, oldestOffset). - SetOffset(mockChannel.topic(), mockChannel.partition(), sarama.OffsetNewest, newestOffset), - "FetchRequest": sarama.NewMockFetchResponse(t, 1). - SetMessage(mockChannel.topic(), mockChannel.partition(), startFrom, message), - }) + // If the errorChan is closed already, the kafkaErr branch shouldn't + // touch it + errorChan := make(chan struct{}) + close(errorChan) - exitChan := make(chan struct{}) + haltChan := make(chan struct{}) - t.Run("ProperParent", func(t *testing.T) { - parentConsumer, err := setupParentConsumerForChannel(mockConsenter.retryOptions(), exitChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) - assert.NoError(t, err, "Expected the setupParentConsumerForChannel call to return without errors") - assert.NoError(t, parentConsumer.Close(), "Expected to close the parentConsumer without errors") - }) + mockSupport := &mockmultichain.ConsenterSupport{ + ChainIDVal: mockChannel.topic(), + } - t.Run("ProperChannel", func(t *testing.T) { - parentConsumer, _ := setupParentConsumerForChannel(mockConsenter.retryOptions(), exitChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) - defer func() { parentConsumer.Close() }() - channelConsumer, err := setupChannelConsumerForChannel(mockConsenter.retryOptions(), exitChan, parentConsumer, mockChannel, newestOffset) - assert.NoError(t, err, "Expected the setupChannelConsumerForChannel call to return without errors") - assert.NoError(t, channelConsumer.Close(), "Expected to close the channelConsumer without errors") - }) + bareMinimumChain := &chainImpl{ + consenter: zeroRetryConsenter, // For sendConnectMessage + producer: failedProducer, // For sendConnectMessage + parentConsumer: mockParentConsumer, + channelConsumer: mockChannelConsumer, - t.Run("WithParentConsumerError", func(t *testing.T) { - // Provide an empty brokers list - _, err := setupParentConsumerForChannel(mockConsenter.retryOptions(), exitChan, []string{}, mockBrokerConfig, mockChannel) - assert.Error(t, err, "Expected the setupParentConsumerForChannel call to return an error") - }) + channel: mockChannel, + support: mockSupport, - t.Run("WithChannelConsumerError", func(t *testing.T) { - // Provide an out-of-range offset - parentConsumer, _ := setupParentConsumerForChannel(mockConsenter.retryOptions(), exitChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) - _, err := setupChannelConsumerForChannel(mockConsenter.retryOptions(), exitChan, parentConsumer, mockChannel, newestOffset+1) - defer func() { parentConsumer.Close() }() - assert.Error(t, err, "Expected the setupChannelConsumerForChannel call to return an error") - }) -} + errorChan: errorChan, + haltChan: haltChan, + } -func TestSetupProducerForChannel(t *testing.T) { - if testing.Short() { - t.Skip("Skipping test in short mode") - } + var counts []uint64 + done := make(chan struct{}) - mockBroker := sarama.NewMockBroker(t, 0) - defer mockBroker.Close() + go func() { + counts, err 
= bareMinimumChain.processMessagesToBlocks() + done <- struct{}{} + }() - mockChannel := newChannel("channelFoo", defaultPartition) + // This is what the for-loop will process + mpc.YieldError(fmt.Errorf("foo")) - exitChan := make(chan struct{}) + // We tested this in ReceiveKafkaErrorAndCloseErrorChan, so this check + // is redundant in that regard. We use it however to ensure the + // kafkaErrBranch has been activated before proceeding with pushing the + // regular message. + select { + case <-bareMinimumChain.errorChan: + logger.Debug("errorChan is closed as it should be") + case <-time.After(shortTimeout): + t.Fatal("errorChan should have been closed by now") + } - t.Run("Proper", func(t *testing.T) { - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) - metadataResponse.AddTopicPartition(mockChannel.topic(), mockChannel.partition(), mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) - mockBroker.Returns(metadataResponse) + // This is the wrappedMessage that the for-loop will process. We use + // a broken regular message here on purpose since this is the shortest + // path and it allows us to test what we want. + mpc.YieldMessage(newMockConsumerMessage(newRegularMessage(tamperBytes(utils.MarshalOrPanic(newMockEnvelope("fooMessage")))))) - producer, err := setupProducerForChannel(mockConsenter.retryOptions(), exitChan, []string{mockBroker.Addr()}, mockBrokerConfig, mockChannel) - assert.NoError(t, err, "Expected the setupProducerForChannel call to return without errors") - assert.NoError(t, producer.Close(), "Expected to close the producer without errors") - }) + // Sleep so that the Messages/errorChan branch is activated. + // TODO Hacky approach, will need to revise eventually + time.Sleep(hitBranch) - t.Run("WithError", func(t *testing.T) { - _, err := setupProducerForChannel(mockConsenter.retryOptions(), exitChan, []string{}, mockBrokerConfig, mockChannel) - assert.Error(t, err, "Expected the setupProducerForChannel call to return an error") + // Check that the errorChan was recreated + select { + case <-bareMinimumChain.errorChan: + t.Fatal("errorChan should have been open") + default: + logger.Debug("errorChan is open as it should be") + } + + logger.Debug("Closing haltChan to exit the infinite for-loop") + close(haltChan) // Identical to chain.Halt() + logger.Debug("haltChan closed") + <-done }) } diff --git a/orderer/kafka/retry.go b/orderer/kafka/retry.go index ca11d27b180..5bfaa5d324d 100644 --- a/orderer/kafka/retry.go +++ b/orderer/kafka/retry.go @@ -44,6 +44,14 @@ func (rp *retryProcess) retry() error { } func (rp *retryProcess) try(interval, total time.Duration) error { + // Configuration validation will not allow non-positive ticker values + // (which would result in panic). The path below is for those test cases + // when we cannot avoid the creation of a retriable process but we wish + // to terminate it right away. + if rp.shortPollingInterval == 0*time.Second { + return fmt.Errorf("illegal value") + } + var err = fmt.Errorf("process has not been executed yet") tickInterval := time.NewTicker(interval)