[FAB-12576] failover while handling tx type B
Raft cluster reconfiguration consists of two parts: first, the leader has to
get consent on the configuration block; next, the leader has to extract the new
cluster configuration and propose the corresponding raft configuration changes.
However, the leader might fail between the first and second parts, therefore a
newly elected leader should be able to detect that there is an unfinished
reconfiguration and finish it.

This commit adds logic to manage leadership failover, where the new leader
checks whether the last committed block is a configuration block and whether
there are pending configuration changes, and completes the reconfiguration by
proposing the raft cluster configuration changes.

Change-Id: I05dc1f60c9ab692521887b50f726d96ea47878dc
Signed-off-by: Artem Barger <bartem@il.ibm.com>
C0rWin authored and yacovm committed Nov 30, 2018
1 parent 9039499 commit f98f7c4
Showing 2 changed files with 196 additions and 64 deletions.
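Conceptually, the change boils down to a hook on leader election: a node that has just become leader checks for a pending membership change and, if one exists, re-derives the Raft ConfChange from the last (configuration) block and re-proposes it. The sketch below is a simplified, self-contained illustration of that flow; all types and names are hypothetical and it is not the actual Fabric code from the diff.

    package main

    import "fmt"

    // block and chainSketch are hypothetical stand-ins for the real chain state.
    type block struct{ isConfig bool }

    type chainSketch struct {
        selfID                 uint64
        configChangeInProgress bool
        lastBlock              block
    }

    // onLeaderChange is called whenever raft reports a new leader. If this node
    // just became leader while a config change is still pending, it finishes the
    // reconfiguration using the last committed (configuration) block.
    func (c *chainSketch) onLeaderChange(newLeader uint64) {
        if newLeader != c.selfID || !c.configChangeInProgress {
            return
        }
        // While a config change is pending no other block can be committed,
        // so the last block is guaranteed to be a configuration block.
        if !c.lastBlock.isConfig {
            panic("pending reconfiguration implies the last block is a config block")
        }
        fmt.Println("re-proposing raft configuration change derived from the last config block")
    }

    func main() {
        c := &chainSketch{selfID: 2, configChangeInProgress: true, lastBlock: block{isConfig: true}}
        c.onLeaderChange(2) // node 2 has just won the election
    }
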
166 changes: 102 additions & 64 deletions orderer/consensus/etcdraft/chain.go
@@ -111,8 +111,8 @@ type Chain struct {
startC chan struct{} // Closes when the node is started
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot

configChangeApplyC chan struct{} // Notifies that a Raft configuration change has been applied
configChangeInProgress uint32 // Flag to indicate node waiting for Raft config change to be applied
configChangeAppliedC chan struct{} // Notifies that a Raft configuration change has been applied
configChangeInProgress bool // Flag to indicate node waiting for Raft config change to be applied
raftMetadataLock sync.RWMutex

clock clock.Clock // Tests can inject a fake clock
@@ -171,29 +171,29 @@ func NewChain(
}

return &Chain{
configurator: conf,
rpc: rpc,
channelID: support.ChainID(),
raftID: opts.RaftID,
submitC: make(chan *orderer.SubmitRequest),
commitC: make(chan block),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
resignC: make(chan struct{}),
startC: make(chan struct{}),
syncC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
configChangeApplyC: make(chan struct{}),
observeC: observeC,
support: support,
fresh: fresh,
appliedIndex: appliedi,
lastSnapBlockNum: snapBlkNum,
puller: puller,
clock: opts.Clock,
logger: lg,
storage: storage,
opts: opts,
configurator: conf,
rpc: rpc,
channelID: support.ChainID(),
raftID: opts.RaftID,
submitC: make(chan *orderer.SubmitRequest),
commitC: make(chan block),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
resignC: make(chan struct{}),
startC: make(chan struct{}),
syncC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
configChangeAppliedC: make(chan struct{}),
observeC: observeC,
support: support,
fresh: fresh,
appliedIndex: appliedi,
lastSnapBlockNum: snapBlkNum,
puller: puller,
clock: opts.Clock,
logger: lg,
storage: storage,
opts: opts,
}, nil
}

@@ -632,6 +632,13 @@ func (c *Chain) serveRaft() {
c.resignC <- struct{}{}
}

// becoming the leader while a configuration change is in progress
if newLead == c.raftID && c.configChangeInProgress {
// need to read recent config updates of replica set
// and finish reconfiguration
c.handleReconfigurationFailover()
}

// notify external observer
select {
case c.observeC <- newLead:
@@ -688,7 +695,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
if isConfigMembershipUpdate {
// set the config-change-in-progress flag only if this is a config block
// with updates for the raft replica set
atomic.StoreUint32(&c.configChangeInProgress, uint32(1))
c.configChangeInProgress = true
}

c.commitC <- block{b, ents[i].Index}
@@ -705,12 +712,11 @@ func (c *Chain) apply(ents []raftpb.Entry) {

c.confState = *c.node.ApplyConfChange(cc)

// assert that configuration changes result of the processing
// of configuration block of type B
isConfChangeInProgress := atomic.LoadUint32(&c.configChangeInProgress)
if isConfChangeInProgress == 1 {
if c.configChangeInProgress {
// signal that config changes have been applied
c.configChangeApplyC <- struct{}{}
c.configChangeAppliedC <- struct{}{}
// set flag back
c.configChangeInProgress = false
}
}

@@ -836,52 +842,39 @@ func (c *Chain) updateMembership(metadata *etcdraft.RaftMetadata, change *raftpb
lead := atomic.LoadUint64(&c.leader)
// leader to propose configuration change
if lead == c.raftID {
// ProposeConfChange returns an error only if the node is being stopped.
if err := c.node.ProposeConfChange(context.TODO(), *change); err != nil {
return errors.Errorf("failed to propose configuration update to Raft node: %s", err)
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
return nil
}
}

var err error

select {
case <-c.configChangeApplyC:
// update metadata once we have block committed
c.raftMetadataLock.Lock()
c.opts.RaftMetadata = metadata
c.raftMetadataLock.Unlock()

// new we need to reconfigure the communication layer with new updates
err = c.configureComm()
case <-c.resignC:
// leadership has changed, new leader will have to take care
// of reading last config block re-propose config update
c.logger.Debug("Raft cluster leader has changed, new leader should re-propose config change based on last config block")
case <-c.doneC:
c.logger.Debug("shutting down node, aborting config change update")
for {
select {
case <-c.configChangeAppliedC: // Raft configuration changes have been applied to the raft cluster
// update metadata once we have block committed
c.raftMetadataLock.Lock()
c.opts.RaftMetadata = metadata
c.raftMetadataLock.Unlock()

// now we need to reconfigure the communication layer with new updates
return c.configureComm()
case <-c.resignC:
c.logger.Debug("Raft cluster leader has changed, new leader should re-propose Raft config change based on last config block")
case <-c.doneC:
c.logger.Debug("Shutting down node, aborting config change update")
return err
}
}

// set flag back
atomic.StoreUint32(&c.configChangeInProgress, uint32(0))
return err
}

// writeConfigBlock writes configuration blocks into the ledger; in addition
// it extracts updates about the raft replica set and, if there are
// changes, updates the cluster membership as well
func (c *Chain) writeConfigBlock(b block) error {
metadata, err := ConsensusMetadataFromConfigBlock(b.b)
if err != nil {
c.logger.Panicf("error reading consensus metadata, because of %s", err)
}

c.raftMetadataLock.RLock()
raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
// proto.Clone doesn't copy an empty map, hence need to initialize it after
// cloning
if raftMetadata.Consenters == nil {
raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
}
c.raftMetadataLock.RUnlock()
metadata, raftMetadata := c.newRaftMetadata(b.b)

var changes *MembershipChanges
if metadata != nil {
@@ -901,3 +894,48 @@ func (c *Chain) writeConfigBlock(b block) error {
}
return nil
}

// handleReconfigurationFailover reads the last configuration block and proposes
// a new raft configuration
func (c *Chain) handleReconfigurationFailover() {
b := c.support.Block(c.support.Height() - 1)
if b == nil {
c.logger.Panic("nil block, failed to read last written block")
}
if !utils.IsConfigBlock(b) {
// a node (leader or follower) only leaves updateMembership, in the context of the serverReq go routine,
// if a configuration entry has appeared and been successfully applied.
// While it is blocked in updateMembership, it cannot commit any other block,
// therefore we guarantee that the last block is a config block
c.logger.Panic("while handling reconfiguration failover last expected block should be configuration")
}

metadata, raftMetadata := c.newRaftMetadata(b)

var changes *MembershipChanges
if metadata != nil {
changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
}

confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
if err := c.node.ProposeConfChange(context.TODO(), *confChange); err != nil {
c.logger.Warnf("failed to propose configuration update to Raft node: %s", err)
}
}

// newRaftMetadata extracts raft metadata from the configuration block
func (c *Chain) newRaftMetadata(block *common.Block) (*etcdraft.Metadata, *etcdraft.RaftMetadata) {
metadata, err := ConsensusMetadataFromConfigBlock(block)
if err != nil {
c.logger.Panicf("error reading consensus metadata: %s", err)
}
c.raftMetadataLock.RLock()
raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
// proto.Clone doesn't copy an empty map, hence need to initialize it after
// cloning
if raftMetadata.Consenters == nil {
raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
}
c.raftMetadataLock.RUnlock()
return metadata, raftMetadata
}
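
The chain.go changes hinge on a small channel handshake: apply() marks a membership-updating config block as in progress and signals on configChangeAppliedC once the Raft ConfChange has been applied, while updateMembership() blocks in a select loop until that signal arrives, leadership is lost, or the node shuts down. Below is a minimal, self-contained sketch of that pattern; the names are hypothetical and it is an illustration, not the Fabric implementation.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // coordinator models the three channels the handshake relies on.
    type coordinator struct {
        confChangeAppliedC chan struct{} // signalled when the ConfChange has been applied
        resignC            chan struct{} // signalled when this node loses leadership
        doneC              chan struct{} // signalled when the node shuts down
    }

    // waitForConfChange blocks until the configuration change is applied, and
    // returns early only on shutdown. A leadership change is merely logged and
    // the wait continues, since the new leader re-proposes from the last config
    // block if needed.
    func (c *coordinator) waitForConfChange() error {
        for {
            select {
            case <-c.confChangeAppliedC:
                fmt.Println("conf change applied; reconfiguring communication layer")
                return nil
            case <-c.resignC:
                fmt.Println("lost leadership; new leader re-proposes from the last config block")
            case <-c.doneC:
                return errors.New("node shutting down, aborting config change update")
            }
        }
    }

    func main() {
        c := &coordinator{
            confChangeAppliedC: make(chan struct{}),
            resignC:            make(chan struct{}),
            doneC:              make(chan struct{}),
        }
        go func() {
            time.Sleep(10 * time.Millisecond)
            c.confChangeAppliedC <- struct{}{} // simulate raft applying the ConfChange
        }()
        if err := c.waitForConfChange(); err != nil {
            fmt.Println(err)
        }
    }
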
94 changes: 94 additions & 0 deletions orderer/consensus/etcdraft/chain_test.go
@@ -19,6 +19,7 @@ import (
"code.cloudfoundry.org/clock/fakeclock"
"github.com/coreos/etcd/raft"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
@@ -44,6 +45,10 @@ const (
HEARTBEAT_TICK = 1
)

func init() {
factory.InitFactories(nil)
}

// for some test cases we chmod file/dir to test failures caused by exotic permissions.
// however this does not work if tests are running as root, i.e. in a container.
func skipIfRoot() {
@@ -1537,6 +1542,93 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(3))
})
})

It("stop leader and continue reconfiguration failing over to new leader", func() {
// Scenario: start a replica set of 3 Raft nodes and elect node c1 as the leader.
// Configure the chain support mock to disconnect c1 right after it writes the configuration block
// into the ledger, to simulate failover.
// Next, bootstrap a new node c4 to join the cluster, create a config transaction and submit
// it to the leader. Once the leader writes the configuration block it fails and leadership is
// transferred to c2.
// The test asserts that the new node c4 joins the cluster and c2 handles the failover of the
// re-configuration. Later we connect c1 back and make sure it is capable of catching up with the
// new configuration and successfully rejoins the replica set.

c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
})
c4.init()

By("adding new node to the network")
Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))

configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
configBlock := &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}}

c1.support.CreateNextBlockReturns(configBlock)

c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) {
// disconnect leader after block being committed
network.disconnect(1)
// electing new leader
network.elect(2)
}

// mock Block method to return recent configuration block
c2.support.BlockReturns(configBlock)

By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())

// every node has written config block to the OSN ledger
network.exec(
func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})

network.addChain(c4)
c4.Start()
// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan uint64 {
c2.clock.Increment(interval)
return c4.observe
}, LongEventualTimeout).Should(Receive(Equal(uint64(2))))

Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))

By("submitting new transaction to follower")
c2.cutter.CutNext = true
c2.support.CreateNextBlockReturns(normalBlock)
err = c4.Order(env, 0)
Expect(err).ToNot(HaveOccurred())

c2.clock.Increment(interval)

// the remaining nodes are alive, including the newly added one, hence they should write 2 blocks
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

// node 1 has been stopped and should not write any block
Consistently(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

network.connect(1)

c2.clock.Increment(interval)
// check that the former leader didn't get stuck and actually got the resign signal,
// and once connected is capable of communicating with the rest of the replica set
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
})
})

@@ -2226,7 +2318,9 @@ func (n *network) elect(id uint64) (tick int) {
// tick so it could take effect.
t := 1000 * time.Millisecond

n.connLock.RLock()
c := n.chains[id]
n.connLock.RUnlock()

var elected bool
for !elected {