From f98f7c4e1ce5aec63684e836abd15213e6f250d1 Mon Sep 17 00:00:00 2001 From: Artem Barger Date: Sun, 4 Nov 2018 16:11:22 +0200 Subject: [PATCH] [FAB-12576] failover while handling tx type B Raft cluster reconfigiration consists of two parts, first leader has to consent on configuration block, next leader has to extract new cluster configuration and propose raft configuration changes. However leader might fail between first and second parts, therefore newly selected leader should be able to detect there is unfinished reconfiguration and to finish reconfiguration. This commit adds logic to manage leadership failover, where new leader checks whenever last committed block is configuration block, whenever there are pending configuration changes and complete reconfiguration by proposing raft clust configuration changes. Change-Id: I05dc1f60c9ab692521887b50f726d96ea47878dc Signed-off-by: Artem Barger --- orderer/consensus/etcdraft/chain.go | 166 ++++++++++++++--------- orderer/consensus/etcdraft/chain_test.go | 94 +++++++++++++ 2 files changed, 196 insertions(+), 64 deletions(-) diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index ff638d549d0..6ddeda89ccd 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -111,8 +111,8 @@ type Chain struct { startC chan struct{} // Closes when the node is started snapC chan *raftpb.Snapshot // Signal to catch up with snapshot - configChangeApplyC chan struct{} // Notifies that a Raft configuration change has been applied - configChangeInProgress uint32 // Flag to indicate node waiting for Raft config change to be applied + configChangeAppliedC chan struct{} // Notifies that a Raft configuration change has been applied + configChangeInProgress bool // Flag to indicate node waiting for Raft config change to be applied raftMetadataLock sync.RWMutex clock clock.Clock // Tests can inject a fake clock @@ -171,29 +171,29 @@ func NewChain( } return &Chain{ - configurator: conf, - rpc: rpc, - channelID: support.ChainID(), - raftID: opts.RaftID, - submitC: make(chan *orderer.SubmitRequest), - commitC: make(chan block), - haltC: make(chan struct{}), - doneC: make(chan struct{}), - resignC: make(chan struct{}), - startC: make(chan struct{}), - syncC: make(chan struct{}), - snapC: make(chan *raftpb.Snapshot), - configChangeApplyC: make(chan struct{}), - observeC: observeC, - support: support, - fresh: fresh, - appliedIndex: appliedi, - lastSnapBlockNum: snapBlkNum, - puller: puller, - clock: opts.Clock, - logger: lg, - storage: storage, - opts: opts, + configurator: conf, + rpc: rpc, + channelID: support.ChainID(), + raftID: opts.RaftID, + submitC: make(chan *orderer.SubmitRequest), + commitC: make(chan block), + haltC: make(chan struct{}), + doneC: make(chan struct{}), + resignC: make(chan struct{}), + startC: make(chan struct{}), + syncC: make(chan struct{}), + snapC: make(chan *raftpb.Snapshot), + configChangeAppliedC: make(chan struct{}), + observeC: observeC, + support: support, + fresh: fresh, + appliedIndex: appliedi, + lastSnapBlockNum: snapBlkNum, + puller: puller, + clock: opts.Clock, + logger: lg, + storage: storage, + opts: opts, }, nil } @@ -632,6 +632,13 @@ func (c *Chain) serveRaft() { c.resignC <- struct{}{} } + // becoming a leader and configuration change is in progress + if newLead == c.raftID && c.configChangeInProgress { + // need to read recent config updates of replica set + // and finish reconfiguration + c.handleReconfigurationFailover() + } + // notify external observer select { case c.observeC <- newLead: @@ -688,7 +695,7 @@ func (c *Chain) apply(ents []raftpb.Entry) { if isConfigMembershipUpdate { // set flag config change is progress only if config block // and has updates for raft replica set - atomic.StoreUint32(&c.configChangeInProgress, uint32(1)) + c.configChangeInProgress = true } c.commitC <- block{b, ents[i].Index} @@ -705,12 +712,11 @@ func (c *Chain) apply(ents []raftpb.Entry) { c.confState = *c.node.ApplyConfChange(cc) - // assert that configuration changes result of the processing - // of configuration block of type B - isConfChangeInProgress := atomic.LoadUint32(&c.configChangeInProgress) - if isConfChangeInProgress == 1 { + if c.configChangeInProgress { // signal that config changes has been applied - c.configChangeApplyC <- struct{}{} + c.configChangeAppliedC <- struct{}{} + // set flag back + c.configChangeInProgress = false } } @@ -836,52 +842,39 @@ func (c *Chain) updateMembership(metadata *etcdraft.RaftMetadata, change *raftpb lead := atomic.LoadUint64(&c.leader) // leader to propose configuration change if lead == c.raftID { + // ProposeConfChange returns error only if node being stopped. if err := c.node.ProposeConfChange(context.TODO(), *change); err != nil { - return errors.Errorf("failed to propose configuration update to Raft node: %s", err) + c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err) + return nil } } var err error - select { - case <-c.configChangeApplyC: - // update metadata once we have block committed - c.raftMetadataLock.Lock() - c.opts.RaftMetadata = metadata - c.raftMetadataLock.Unlock() - - // new we need to reconfigure the communication layer with new updates - err = c.configureComm() - case <-c.resignC: - // leadership has changed, new leader will have to take care - // of reading last config block re-propose config update - c.logger.Debug("Raft cluster leader has changed, new leader should re-propose config change based on last config block") - case <-c.doneC: - c.logger.Debug("shutting down node, aborting config change update") + for { + select { + case <-c.configChangeAppliedC: // Raft configuration changes of the raft cluster has been applied + // update metadata once we have block committed + c.raftMetadataLock.Lock() + c.opts.RaftMetadata = metadata + c.raftMetadataLock.Unlock() + + // now we need to reconfigure the communication layer with new updates + return c.configureComm() + case <-c.resignC: + c.logger.Debug("Raft cluster leader has changed, new leader should re-propose Raft config change based on last config block") + case <-c.doneC: + c.logger.Debug("Shutting down node, aborting config change update") + return err + } } - - // set flag back - atomic.StoreUint32(&c.configChangeInProgress, uint32(0)) - return err } // writeConfigBlock writes configuration blocks into the ledger in // addition extracts updates about raft replica set and if there // are changes updates cluster membership as well func (c *Chain) writeConfigBlock(b block) error { - metadata, err := ConsensusMetadataFromConfigBlock(b.b) - if err != nil { - c.logger.Panicf("error reading consensus metadata, because of %s", err) - } - - c.raftMetadataLock.RLock() - raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata) - // proto.Clone doesn't copy an empty map, hence need to initialize it after - // cloning - if raftMetadata.Consenters == nil { - raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{} - } - c.raftMetadataLock.RUnlock() + metadata, raftMetadata := c.newRaftMetadata(b.b) var changes *MembershipChanges if metadata != nil { @@ -901,3 +894,48 @@ func (c *Chain) writeConfigBlock(b block) error { } return nil } + +// handleReconfigurationFailover read last configuration block and proposes +// new raft configuration +func (c *Chain) handleReconfigurationFailover() { + b := c.support.Block(c.support.Height() - 1) + if b == nil { + c.logger.Panic("nil block, failed to read last written block") + } + if !utils.IsConfigBlock(b) { + // a node (leader or follower) leaving updateMembership in context of serverReq go routine, + // *iff* configuration entry has appeared and successfully applied. + // while it's blocked in updateMembership, it cannot commit any other block, + // therefore we guarantee the last block is config block + c.logger.Panic("while handling reconfiguration failover last expected block should be configuration") + } + + metadata, raftMetadata := c.newRaftMetadata(b) + + var changes *MembershipChanges + if metadata != nil { + changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters) + } + + confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata) + if err := c.node.ProposeConfChange(context.TODO(), *confChange); err != nil { + c.logger.Warnf("failed to propose configuration update to Raft node: %s", err) + } +} + +// newRaftMetadata extract raft metadata from the configuration block +func (c *Chain) newRaftMetadata(block *common.Block) (*etcdraft.Metadata, *etcdraft.RaftMetadata) { + metadata, err := ConsensusMetadataFromConfigBlock(block) + if err != nil { + c.logger.Panicf("error reading consensus metadata: %s", err) + } + c.raftMetadataLock.RLock() + raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata) + // proto.Clone doesn't copy an empty map, hence need to initialize it after + // cloning + if raftMetadata.Consenters == nil { + raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{} + } + c.raftMetadataLock.RUnlock() + return metadata, raftMetadata +} diff --git a/orderer/consensus/etcdraft/chain_test.go b/orderer/consensus/etcdraft/chain_test.go index bc2bba39170..47ec74c12ce 100644 --- a/orderer/consensus/etcdraft/chain_test.go +++ b/orderer/consensus/etcdraft/chain_test.go @@ -19,6 +19,7 @@ import ( "code.cloudfoundry.org/clock/fakeclock" "github.com/coreos/etcd/raft" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/common/crypto/tlsgen" "github.com/hyperledger/fabric/common/flogging" mockconfig "github.com/hyperledger/fabric/common/mocks/config" @@ -44,6 +45,10 @@ const ( HEARTBEAT_TICK = 1 ) +func init() { + factory.InitFactories(nil) +} + // for some test cases we chmod file/dir to test failures caused by exotic permissions. // however this does not work if tests are running as root, i.e. in a container. func skipIfRoot() { @@ -1537,6 +1542,93 @@ var _ = Describe("Chain", func() { Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(3)) }) }) + + It("stop leader and continue reconfiguration failing over to new leader", func() { + // Scenario: Starting replica set of 3 Raft nodes, electing node c1 to be a leader + // configure chain support mock to disconnect c1 right after it writes configuration block + // into the ledger, this to simulate failover. + // Next boostraping a new node c4 to join a cluster and creating config transaction, submitting + // it to the leader. Once leader writes configuration block it fails and leadership transferred to + // c2. + // Test asserts that new node c4, will join the cluster and c2 will handle failover of + // re-configuration. Later we connecting c1 back and making sure it capable of catching up with + // new configuration and successfully rejoins replica set. + + c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{ + Consenters: map[uint64]*raftprotos.Consenter{}, + }) + c4.init() + + By("adding new node to the network") + Expect(c4.support.WriteBlockCallCount()).Should(Equal(0)) + Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0)) + + configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue())) + c1.cutter.CutNext = true + configBlock := &common.Block{ + Header: &common.BlockHeader{}, + Data: &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}} + + c1.support.CreateNextBlockReturns(configBlock) + + c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) { + // disconnect leader after block being committed + network.disconnect(1) + // electing new leader + network.elect(2) + } + + // mock Block method to return recent configuration block + c2.support.BlockReturns(configBlock) + + By("sending config transaction") + err := c1.Configure(configEnv, 0) + Expect(err).ToNot(HaveOccurred()) + + // every node has written config block to the OSN ledger + network.exec( + func(c *chain) { + Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) + }) + + network.addChain(c4) + c4.Start() + // ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added + // to leader's node list right away. An immediate tick does not trigger a heartbeat + // being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins + // the cluster successfully. + Eventually(func() <-chan uint64 { + c2.clock.Increment(interval) + return c4.observe + }, LongEventualTimeout).Should(Receive(Equal(uint64(2)))) + + Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) + Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1)) + + By("submitting new transaction to follower") + c2.cutter.CutNext = true + c2.support.CreateNextBlockReturns(normalBlock) + err = c4.Order(env, 0) + Expect(err).ToNot(HaveOccurred()) + + c2.clock.Increment(interval) + + // rest nodes are alive include a newly added, hence should write 2 blocks + Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) + Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) + Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) + + // node 1 has been stopped should not write any block + Consistently(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1)) + + network.connect(1) + + c2.clock.Increment(interval) + // check that former leader didn't get stuck and actually got resign signal, + // and once connected capable of communicating with rest of the replicas set + Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2)))) + Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2)) + }) }) }) @@ -2226,7 +2318,9 @@ func (n *network) elect(id uint64) (tick int) { // tick so it could take effect. t := 1000 * time.Millisecond + n.connLock.RLock() c := n.chains[id] + n.connLock.RUnlock() var elected bool for !elected {