Skip to content

Commit

Permalink
[FAB-5527] Failures in orderer/consensus/kafka
Browse files Browse the repository at this point in the history
- cancel retry on Halt()
- Halt() now waits for Start() to complete
- cleanup channel resources after EnqueueError test

Change-Id: Icec73a6eec9cc2fd1e9feb0aad49e82ce23ec589
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Jul 31, 2017
1 parent d788450 commit 61f9368
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
36 changes: 25 additions & 11 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,24 @@ func (chain *chainImpl) Start() {
// consensus.Chain interface.
func (chain *chainImpl) Halt() {
select {
case <-chain.haltChan:
// This construct is useful because it allows Halt() to be called
// multiple times (by a single thread) w/o panicking. Recal that a
// receive from a closed channel returns (the zero value) immediately.
logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID())
case <-chain.startChan:
// chain finished starting, so we can halt it
select {
case <-chain.haltChan:
// This construct is useful because it allows Halt() to be called
// multiple times (by a single thread) w/o panicking. Recal that a
// receive from a closed channel returns (the zero value) immediately.
logger.Warningf("[channel: %s] Halting of chain requested again", chain.support.ChainID())
default:
logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID())
close(chain.haltChan)
chain.closeKafkaObjects() // Also close the producer and the consumer
logger.Debugf("[channel: %s] Closed the haltChan", chain.support.ChainID())
}
default:
logger.Criticalf("[channel: %s] Halting of chain requested", chain.support.ChainID())
close(chain.haltChan)
chain.closeKafkaObjects() // Also close the producer and the consumer
logger.Debugf("[channel: %s] Closed the haltChan", chain.support.ChainID())
logger.Warningf("[channel: %s] Waiting for chain to finish starting before halting", chain.support.ChainID())
<-chain.startChan
chain.Halt()
}
}

Expand Down Expand Up @@ -476,8 +484,14 @@ func sendConnectMessage(retryOptions localconfig.Retry, exitChan chan struct{},

retryMsg := "Attempting to post the CONNECT message..."
postConnect := newRetryProcess(retryOptions, exitChan, channel, retryMsg, func() error {
_, _, err := producer.SendMessage(message)
return err
select {
case <-exitChan:
logger.Debugf("[channel: %s] Consenter for channel exiting, aborting retry", channel)
return nil
default:
_, _, err := producer.SendMessage(message)
return err
}
})

return postConnect.retry()
Expand Down
1 change: 1 addition & 0 deletions orderer/consensus/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func TestChain(t *testing.T) {
case <-time.After(shortTimeout):
t.Fatal("startChan should have been closed by now")
}
defer chain.Halt()

// Now make it so that the next ProduceRequest is met with an error
mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{
Expand Down

0 comments on commit 61f9368

Please sign in to comment.