Skip to content

Commit

Permalink
FAB-18244 single node catches up with snapshot (#1964)
Browse files Browse the repository at this point in the history
Support.WriteBlock commits block to ledger asynchronously and can have
up to one block in-flight. And there's possibility a node crashes before
such block is persisted successfully. Normally when node restarts, Raft
loads entries from WAL and attempts to re-apply them. However, when a
snapshot is taken at this block, only entries after (if any) the
snapshot are loaded, and we end up hanging here forever waiting for
missing blocks to be pulled from nowhere in single node situation.

A straightforward solution would be to peek into ledger tip first, and
decide whether to load some "old" entries from WAL, instead of blindly
load data after latest snapshot. Although it's trickier than it sounds:

- today, we don't strictly respect the contract between Raft and state
machine, where applied data should not be lossy and it's safe to prune
data in WAL after snapshots. For example, in extreme case, if we lose
the entire ledger, we should not expect it to be recoverable from WAL

- etcd/raft persistence library does not provide friendly interfaces
to control what data to load in fine-grained manner. For example,
snap.Load() simply looks for latest snapshot available, and loads
entries after that. If we'd like to, for example, load older data prior
to that snapshot, we'll need to come up with our own utilities

This commit aims to provide a quick fix for bug described in FAB-18244,
leveraging the fact that we can have only one async block in-flight, and
leave the "correct" solution to future work.

Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger authored Oct 5, 2020
1 parent f4a612c commit e264d1b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
51 changes: 30 additions & 21 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
if c.lastBlock.Header.Number >= b.Header.Number {
c.logger.Warnf("Snapshot is at block [%d], local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
return nil
} else if b.Header.Number == c.lastBlock.Header.Number+1 {
c.logger.Infof("The only missing block [%d] is encapsulated in snapshot, committing it to shortcut catchup process", b.Header.Number)
c.commitBlock(b)
c.lastBlock = b
return nil
}

puller, err := c.createPuller()
Expand All @@ -917,27 +922,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
if block == nil {
return errors.Errorf("failed to fetch block [%d] from cluster", next)
}
if protoutil.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)

configMembership := c.detectConfChange(block)

if configMembership != nil && configMembership.Changed() {
c.logger.Infof("Config block [%d] changes consenter set, communication should be reconfigured", block.Header.Number)

c.raftMetadataLock.Lock()
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
c.raftMetadataLock.Unlock()

if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}
} else {
c.support.WriteBlock(block, nil)
}

c.commitBlock(block)
c.lastBlock = block
next++
}
Expand All @@ -946,6 +931,30 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}

func (c *Chain) commitBlock(block *common.Block) {
if !protoutil.IsConfigBlock(block) {
c.support.WriteBlock(block, nil)
return
}

c.support.WriteConfigBlock(block, nil)

configMembership := c.detectConfChange(block)

if configMembership != nil && configMembership.Changed() {
c.logger.Infof("Config block [%d] changes consenter set, communication should be reconfigured", block.Header.Number)

c.raftMetadataLock.Lock()
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
c.raftMetadataLock.Unlock()

if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}
}

func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
// If config is targeting THIS channel, inspect consenter set and
// propose raft ConfChange if it adds/removes node.
Expand Down
24 changes: 24 additions & 0 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,9 @@ var _ = Describe("Chain", func() {
return nil
}

// This is a false assumption - single node shouldn't be able to pull block from anywhere.
// However, this test is mainly to assert that chain should attempt catchup upon start,
// so we could live with it.
return ledger[i]
}

Expand All @@ -965,6 +968,27 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})

It("commits block from snapshot if it's missing from ledger", func() {
// Scenario:
// Single node exists right after a snapshot is taken, while the block
// in it hasn't been successfully persisted into ledger (there can be one
// async block write in-flight). Then the node is restarted, and catches
// up using the block in snapshot.

Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))

chain.Halt()

c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters, cryptoProvider, nil, nil)
c.init()
c.Start()
defer c.Halt()

Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})

It("restores snapshot w/o extra entries", func() {
// Scenario:
// after a snapshot is taken, no more entries are appended.
Expand Down

0 comments on commit e264d1b

Please sign in to comment.