diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go
index ff638d549d0..6ddeda89ccd 100644
--- a/orderer/consensus/etcdraft/chain.go
+++ b/orderer/consensus/etcdraft/chain.go
@@ -111,8 +111,8 @@ type Chain struct {
 	startC chan struct{}         // Closes when the node is started
 	snapC  chan *raftpb.Snapshot // Signal to catch up with snapshot
-	configChangeApplyC     chan struct{} // Notifies that a Raft configuration change has been applied
-	configChangeInProgress uint32        // Flag to indicate node waiting for Raft config change to be applied
+	configChangeAppliedC   chan struct{} // Notifies that a Raft configuration change has been applied
+	configChangeInProgress bool          // Flag to indicate node waiting for Raft config change to be applied
 
 	raftMetadataLock sync.RWMutex
 
 	clock clock.Clock // Tests can inject a fake clock
@@ -171,29 +171,29 @@ func NewChain(
 	}
 
 	return &Chain{
-		configurator:       conf,
-		rpc:                rpc,
-		channelID:          support.ChainID(),
-		raftID:             opts.RaftID,
-		submitC:            make(chan *orderer.SubmitRequest),
-		commitC:            make(chan block),
-		haltC:              make(chan struct{}),
-		doneC:              make(chan struct{}),
-		resignC:            make(chan struct{}),
-		startC:             make(chan struct{}),
-		syncC:              make(chan struct{}),
-		snapC:              make(chan *raftpb.Snapshot),
-		configChangeApplyC: make(chan struct{}),
-		observeC:           observeC,
-		support:            support,
-		fresh:              fresh,
-		appliedIndex:       appliedi,
-		lastSnapBlockNum:   snapBlkNum,
-		puller:             puller,
-		clock:              opts.Clock,
-		logger:             lg,
-		storage:            storage,
-		opts:               opts,
+		configurator:         conf,
+		rpc:                  rpc,
+		channelID:            support.ChainID(),
+		raftID:               opts.RaftID,
+		submitC:              make(chan *orderer.SubmitRequest),
+		commitC:              make(chan block),
+		haltC:                make(chan struct{}),
+		doneC:                make(chan struct{}),
+		resignC:              make(chan struct{}),
+		startC:               make(chan struct{}),
+		syncC:                make(chan struct{}),
+		snapC:                make(chan *raftpb.Snapshot),
+		configChangeAppliedC: make(chan struct{}),
+		observeC:             observeC,
+		support:              support,
+		fresh:                fresh,
+		appliedIndex:         appliedi,
+		lastSnapBlockNum:     snapBlkNum,
+		puller:               puller,
+		clock:                opts.Clock,
+		logger:               lg,
+		storage:              storage,
+		opts:                 opts,
 	}, nil
 }
 
@@ -632,6 +632,13 @@ func (c *Chain) serveRaft() {
 			c.resignC <- struct{}{}
 		}
 
+		// becoming leader while a configuration change is in progress
+		if newLead == c.raftID && c.configChangeInProgress {
+			// need to read the most recent config updates to the replica set
+			// and finish the reconfiguration
+			c.handleReconfigurationFailover()
+		}
+
 		// notify external observer
 		select {
 		case c.observeC <- newLead:
@@ -688,7 +695,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
 			if isConfigMembershipUpdate {
 				// set the config-change-in-progress flag only if this is a
 				// config block that has updates for the raft replica set
-				atomic.StoreUint32(&c.configChangeInProgress, uint32(1))
+				c.configChangeInProgress = true
 			}
 
 			c.commitC <- block{b, ents[i].Index}
@@ -705,12 +712,11 @@
 
 			c.confState = *c.node.ApplyConfChange(cc)
 
-			// assert that configuration changes result of the processing
-			// of configuration block of type B
-			isConfChangeInProgress := atomic.LoadUint32(&c.configChangeInProgress)
-			if isConfChangeInProgress == 1 {
+			if c.configChangeInProgress {
 				// signal that config changes have been applied
-				c.configChangeApplyC <- struct{}{}
+				c.configChangeAppliedC <- struct{}{}
+				// reset the flag
+				c.configChangeInProgress = false
 			}
 		}
 
@@ -836,52 +842,39 @@ func (c *Chain) updateMembership(metadata *etcdraft.RaftMetadata, change *raftpb.ConfChange) error {
 	lead := atomic.LoadUint64(&c.leader)
 	// leader to propose configuration change
 	if lead == c.raftID {
+		// ProposeConfChange returns an error only if the node is being stopped.
 		if err := c.node.ProposeConfChange(context.TODO(), *change); err != nil {
-			return errors.Errorf("failed to propose configuration update to Raft node: %s", err)
+			c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
+			return nil
 		}
 	}
 
 	var err error
 
-	select {
-	case <-c.configChangeApplyC:
-		// update metadata once we have block committed
-		c.raftMetadataLock.Lock()
-		c.opts.RaftMetadata = metadata
-		c.raftMetadataLock.Unlock()
-
-		// new we need to reconfigure the communication layer with new updates
-		err = c.configureComm()
-	case <-c.resignC:
-		// leadership has changed, new leader will have to take care
-		// of reading last config block re-propose config update
-		c.logger.Debug("Raft cluster leader has changed, new leader should re-propose config change based on last config block")
-	case <-c.doneC:
-		c.logger.Debug("shutting down node, aborting config change update")
+	for {
+		select {
+		case <-c.configChangeAppliedC: // the Raft configuration change has been applied to the cluster
+			// update metadata once the block has been committed
+			c.raftMetadataLock.Lock()
+			c.opts.RaftMetadata = metadata
+			c.raftMetadataLock.Unlock()
+
+			// now we need to reconfigure the communication layer with the new updates
+			return c.configureComm()
+		case <-c.resignC:
+			c.logger.Debug("Raft cluster leader has changed, new leader should re-propose Raft config change based on last config block")
+		case <-c.doneC:
+			c.logger.Debug("Shutting down node, aborting config change update")
+			return err
+		}
 	}
-
-	// set flag back
-	atomic.StoreUint32(&c.configChangeInProgress, uint32(0))
-	return err
 }
 
 // writeConfigBlock writes a configuration block into the ledger; in
 // addition it extracts updates to the raft replica set and, if there
 // are changes, updates the cluster membership as well
 func (c *Chain) writeConfigBlock(b block) error {
-	metadata, err := ConsensusMetadataFromConfigBlock(b.b)
-	if err != nil {
-		c.logger.Panicf("error reading consensus metadata, because of %s", err)
-	}
-
-	c.raftMetadataLock.RLock()
-	raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
-	// proto.Clone doesn't copy an empty map, hence need to initialize it after
-	// cloning
-	if raftMetadata.Consenters == nil {
-		raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
-	}
-	c.raftMetadataLock.RUnlock()
+	metadata, raftMetadata := c.newRaftMetadata(b.b)
 
 	var changes *MembershipChanges
 	if metadata != nil {
@@ -901,3 +894,48 @@
 	}
 	return nil
 }
+
+// handleReconfigurationFailover reads the last configuration block and
+// proposes a new raft configuration
+func (c *Chain) handleReconfigurationFailover() {
+	b := c.support.Block(c.support.Height() - 1)
+	if b == nil {
+		c.logger.Panic("nil block, failed to read last written block")
+	}
+	if !utils.IsConfigBlock(b) {
+		// a node (leader or follower) only leaves updateMembership, in the context
+		// of the serveRequest goroutine, *iff* a configuration entry has appeared
+		// and was successfully applied.
+		// While it is blocked in updateMembership it cannot commit any other
+		// block, therefore the last block is guaranteed to be a configuration block.
+		c.logger.Panic("while handling reconfiguration failover, the last block is expected to be a configuration block")
+	}
+
+	metadata, raftMetadata := c.newRaftMetadata(b)
+
+	var changes *MembershipChanges
+	if metadata != nil {
+		changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
+	}
+
+	confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
+	if err := c.node.ProposeConfChange(context.TODO(), *confChange); err != nil {
+		c.logger.Warnf("failed to propose configuration update to Raft node: %s", err)
+	}
+}
+
+// newRaftMetadata extracts raft metadata from the configuration block
+func (c *Chain) newRaftMetadata(block *common.Block) (*etcdraft.Metadata, *etcdraft.RaftMetadata) {
+	metadata, err := ConsensusMetadataFromConfigBlock(block)
+	if err != nil {
+		c.logger.Panicf("error reading consensus metadata: %s", err)
+	}
+	c.raftMetadataLock.RLock()
+	raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
+	// proto.Clone doesn't copy an empty map, hence we need to initialize it
+	// after cloning
+	if raftMetadata.Consenters == nil {
+		raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
+	}
+	c.raftMetadataLock.RUnlock()
+	return metadata, raftMetadata
+}
diff --git a/orderer/consensus/etcdraft/chain_test.go b/orderer/consensus/etcdraft/chain_test.go
index bc2bba39170..47ec74c12ce 100644
--- a/orderer/consensus/etcdraft/chain_test.go
+++ b/orderer/consensus/etcdraft/chain_test.go
@@ -19,6 +19,7 @@ import (
 	"code.cloudfoundry.org/clock/fakeclock"
 	"github.com/coreos/etcd/raft"
 	"github.com/golang/protobuf/proto"
+	"github.com/hyperledger/fabric/bccsp/factory"
 	"github.com/hyperledger/fabric/common/crypto/tlsgen"
 	"github.com/hyperledger/fabric/common/flogging"
 	mockconfig "github.com/hyperledger/fabric/common/mocks/config"
@@ -44,6 +45,10 @@ const (
 	HEARTBEAT_TICK = 1
 )
 
+func init() {
+	factory.InitFactories(nil)
+}
+
 // for some test cases we chmod file/dir to test failures caused by exotic permissions.
 // however this does not work if tests are running as root, i.e. in a container.
 func skipIfRoot() {
@@ -1537,6 +1542,93 @@ var _ = Describe("Chain", func() {
 			Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(3))
 		})
 	})
+
+	It("stop leader and continue reconfiguration failing over to new leader", func() {
+		// Scenario: start a replica set of 3 Raft nodes and elect node c1 as the leader.
+		// Configure the chain support mock to disconnect c1 right after it writes the
+		// configuration block into the ledger, to simulate a failover.
+		// Next, bootstrap a new node c4 to join the cluster and create a config
+		// transaction, submitting it to the leader. Once the leader writes the
+		// configuration block it fails, and leadership is transferred to c2.
+		// The test asserts that the new node c4 joins the cluster and that c2 handles
+		// the failover of the reconfiguration. Later we connect c1 back and make sure
+		// it is capable of catching up with the new configuration and successfully
+		// rejoins the replica set.
+
+		c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
+			Consenters: map[uint64]*raftprotos.Consenter{},
+		})
+		c4.init()
+
+		By("adding new node to the network")
+		Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
+		Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))
+
+		configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
+		c1.cutter.CutNext = true
+		configBlock := &common.Block{
+			Header: &common.BlockHeader{},
+			Data:   &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}}
+
+		c1.support.CreateNextBlockReturns(configBlock)
+
+		c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) {
+			// disconnect the leader after the block has been committed
+			network.disconnect(1)
+			// elect a new leader
+			network.elect(2)
+		}
+
+		// mock the Block method to return the most recent configuration block
+		c2.support.BlockReturns(configBlock)
+
+		By("sending config transaction")
+		err := c1.Configure(configEnv, 0)
+		Expect(err).ToNot(HaveOccurred())
+
+		// every node has written the config block to the OSN ledger
+		network.exec(
+			func(c *chain) {
+				Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
+			})
+
+		network.addChain(c4)
+		c4.Start()
+		// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
+		// to the leader's node list right away. An immediate tick does not trigger a heartbeat
+		// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
+		// the cluster successfully.
+		Eventually(func() <-chan uint64 {
+			c2.clock.Increment(interval)
+			return c4.observe
+		}, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
+
+		Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
+		Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
+
+		By("submitting new transaction to follower")
+		c2.cutter.CutNext = true
+		c2.support.CreateNextBlockReturns(normalBlock)
+		err = c4.Order(env, 0)
+		Expect(err).ToNot(HaveOccurred())
+
+		c2.clock.Increment(interval)
+
+		// the remaining nodes, including the newly added one, are alive and hence
+		// should write 2 blocks
+		Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+		Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+		Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+
+		// node 1 has been stopped, hence it should not write any block
+		Consistently(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
+
+		network.connect(1)
+
+		c2.clock.Increment(interval)
+		// check that the former leader didn't get stuck, actually received the resign
+		// signal, and once reconnected is capable of communicating with the rest of
+		// the replica set
+		Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
+		Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+	})
 })
})
@@ -2226,7 +2318,9 @@ func (n *network) elect(id uint64) (tick int) {
 	// tick so it could take effect.
 	t := 1000 * time.Millisecond
 
+	n.connLock.RLock()
 	c := n.chains[id]
+	n.connLock.RUnlock()
 
 	var elected bool
 	for !elected {
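Note: below is a minimal, self-contained Go sketch of the failover handshake this change implements; it is not Fabric code. A flag records that a membership-changing config block was committed, a channel signals once the Raft ConfChange has been applied, and a newly elected leader that still sees the flag set re-proposes the change based on the last config block. All names (reconfigNode, commitConfigBlock, becomeLeader) are hypothetical stand-ins for the real etcd/raft plumbing, and the channel is buffered only so this sequential demo does not block; the real code uses an unbuffered channel with a goroutine waiting in updateMembership.

package main

import "fmt"

// reconfigNode models just the two pieces of state used by the handshake.
type reconfigNode struct {
	configChangeInProgress bool          // set when a membership-changing config block commits
	configChangeAppliedC   chan struct{} // signaled once the ConfChange has been applied
}

// commitConfigBlock mirrors apply() committing a config block with replica-set updates.
func (n *reconfigNode) commitConfigBlock() {
	n.configChangeInProgress = true
}

// applyConfChange mirrors apply() handling an EntryConfChange entry:
// signal the waiter and reset the flag.
func (n *reconfigNode) applyConfChange() {
	if n.configChangeInProgress {
		n.configChangeAppliedC <- struct{}{}
		n.configChangeInProgress = false
	}
}

// becomeLeader mirrors serveRaft(): a new leader that still sees the flag set
// knows the old leader failed mid-reconfiguration and must re-propose.
func (n *reconfigNode) becomeLeader(repropose func()) {
	if n.configChangeInProgress {
		repropose()
	}
}

func main() {
	n := &reconfigNode{configChangeAppliedC: make(chan struct{}, 1)}

	// Happy path: block committed, ConfChange applied, updateMembership unblocks.
	n.commitConfigBlock()
	n.applyConfChange()
	<-n.configChangeAppliedC
	fmt.Println("config change applied; reconfigure the communication layer")

	// Failover path: the leader wrote the config block but died before the
	// ConfChange was applied; the new leader re-proposes it.
	n.commitConfigBlock()
	n.becomeLeader(func() { fmt.Println("new leader re-proposes ConfChange from last config block") })
}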