Skip to content

Commit

Permalink
blockchain: Fix inconsistent state bugs
Browse files Browse the repository at this point in the history
This commit fixes several bugs:

1: On block disconnects, spend journals were being removed before the
   cache was flushed.  This could leave the cache in an unrecoverable
   state, as it would no longer have the spend journal needed to roll
   back a given block.
2: On block disconnects, the stateLock was only unlocked after the error
   return, so an error from the commit would leave the lock held.
3: On utxo cache inconsistent state checks, last flushed hash wasn't
   being set correctly.
4: On utxo cache inconsistent state checks, cache flush wasn't being
   called, which leaves the cache full and leaves the node in a stuck
   state.
5: On utxo cache inconsistent state checks, consistent state would be
   updated during block connects without a cache flush, which would
   leave the utxo set in an inconsistent state if a sudden crash or user
   interrupt were to happen during the utxo set rebuilding.
  • Loading branch information
kcalvinalvin committed Apr 7, 2023
1 parent 47cb61a commit 0112042
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 62 deletions.
42 changes: 27 additions & 15 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,13 +757,6 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
return err
}

// Update the transaction spend journal by removing the record
// that contains all txos spent by the block.
err = dbRemoveSpendJournalEntry(dbTx, block.Hash())
if err != nil {
return err
}

// Allow the index manager to call each of the currently active
// optional indexes with the block being disconnected so they
// can update themselves accordingly.
Expand All @@ -783,14 +776,37 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
// Commit all modifications made to the view into the utxo state. This also
// prunes these changes from the view.
b.stateLock.Lock()
if err := b.utxoCache.Commit(view, nil); err != nil {
err = b.utxoCache.Commit(view, nil)
b.stateLock.Unlock()
if err != nil {
return err
}
b.stateLock.Unlock()

// This node's parent is now the end of the best chain.
b.bestChain.SetTip(node.parent)

// When we're disconnecting blocks, force a flush. This is so that we won't
// be stuck in a situation where we don't have the spend journals during
// InitConsistentState().
b.stateLock.Lock()
err = b.utxoCache.Flush(FlushRequired, state)
b.stateLock.Unlock()
if err != nil {
return err
}

// Update the transaction spend journal by removing the record that contains
// all txos spent by the block. This is intentionally done AFTER the utxo
// cache has been force flushed since the spend journal information will no
// longer be available for the cache to use for recovery purposes after
// being removed.
err = b.db.Update(func(dbTx database.Tx) error {
return dbRemoveSpendJournalEntry(dbTx, &node.hash)
})
if err != nil {
return err
}

// Update the state for the best block. Notice how this replaces the
// entire struct instead of updating the existing one. This effectively
// allows the old version to act as a snapshot which callers can use
Expand All @@ -807,11 +823,7 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
b.sendNotification(NTBlockDisconnected, block)
b.chainLock.Lock()

// Since we just changed the UTXO cache, we make sure it didn't exceed its
// maximum size.
b.stateLock.Lock()
defer b.stateLock.Unlock()
return b.utxoCache.Flush(FlushIfNeeded, state)
return err
}

// countSpentOutputs returns the number of utxos the passed block spends.
Expand Down Expand Up @@ -1850,7 +1862,7 @@ func New(config *Config) (*BlockChain, error) {
// Make sure the utxo state is caught up if it was left in an inconsistent
// state.
bestNode := b.bestChain.Tip()
if err := b.utxoCache.InitConsistentState(bestNode, config.Interrupt); err != nil {
if err := b.InitConsistentState(bestNode, config.Interrupt); err != nil {
return nil, err
}

Expand Down
103 changes: 56 additions & 47 deletions blockchain/utxocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,8 @@ func (s *utxoCache) rollForwardBlock(block *btcutil.Block) error {
//
// It needs to be ensured that the chainView passed to this method does not
// get changed during the execution of this method.
func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{}) error {
func (b *BlockChain) InitConsistentState(tip *blockNode, interrupt <-chan struct{}) error {
s := b.utxoCache
// Load the consistency status from the database.
var statusCode byte
var statusHash *chainhash.Hash
Expand All @@ -902,14 +903,9 @@ func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{
return err
}

log.Tracef("UTXO cache consistency status from disk: [%d] hash %v",
log.Debugf("UTXO cache consistency status from disk: [%d] hash %v",
statusCode, statusHash)

// We can set this variable now already because it will always be valid
// unless an error is returned, in which case the state is entirely invalid.
// Doing it here prevents forgetting it later.
s.lastFlushHash = tip.hash

// If no status was found, the database is old and didn't have a cached utxo
// state yet. In that case, we set the status to the best state and write
// this to the database.
Expand All @@ -920,48 +916,45 @@ func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{
return dbPutUtxoStateConsistency(dbTx, ucsConsistent, &tip.hash)
})

// Set the last flush hash as it's the default value of 0s.
s.lastFlushHash = tip.hash

return err
}

// If state is consistent, we are done.
if statusCode == ucsConsistent && *statusHash == tip.hash {
log.Debugf("UTXO state consistent (%d:%v)", tip.height, tip.hash)

// The last flush hash is set to the default value of all 0s. Set
// it to the tip since we checked it's consistent.
s.lastFlushHash = tip.hash

return nil
}

log.Info("Reconstructing UTXO state after unclean shutdown. This may take " +
"a long time...")

lastFlushNode := b.index.LookupNode(statusHash)
fork := b.bestChain.FindFork(lastFlushNode)

// Even though this should always be true, make sure the fetched hash is in
// the best chain.
var statusNode *blockNode
var statusNodeNext *blockNode // the first one higher than the statusNode
attachNodes := list.New()
for node := tip; node.height >= 0; node = node.parent {
if node.hash == *statusHash {
statusNode = node
break
}
attachNodes.PushFront(node)
statusNodeNext = node
}

if statusNode == nil {
if fork == nil {
return AssertError(fmt.Sprintf("last utxo consistency status contains "+
"hash that is not in best chain: %v", statusHash))
}

// If data was in the middle of a flush, we have to roll back all
// blocks from the last best block all the way back to the last
// consistent block.
log.Debugf("Rolling back %d blocks to rebuild the UTXO state...",
tip.height-statusNode.height)
// We only roll back blocks if the node was disconnecting blocks when it suddenly
// shut down.
log.Infof("Rolling back %d blocks to rebuild the UTXO state...",
lastFlushNode.height-fork.height)

// Roll back blocks in batches.
rollbackBatch := func(dbTx database.Tx, node *blockNode) (*blockNode, error) {
nbBatchBlocks := 0
for ; node.height > statusNode.height; node = node.parent {
for node != nil && node != fork {
block, err := dbFetchBlockByNode(dbTx, node)
if err != nil {
return nil, err
Expand All @@ -981,14 +974,16 @@ func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{
if nbBatchBlocks >= utxoBatchSizeBlocks {
break
}
node = node.parent
}

return node, nil
}

for node := tip; node.height > statusNode.height; {
node := lastFlushNode
for node != nil && node != fork {
log.Tracef("Rolling back %d more blocks...",
node.height-statusNode.height)
node.height-fork.height)
err := s.db.Update(func(dbTx database.Tx) error {
var err error
node, err = rollbackBatch(dbTx, node)
Expand All @@ -999,29 +994,36 @@ func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{
return err
}

// Flush the utxo cache if needed.
threshold := (utxoFlushPeriodicThreshold * s.maxTotalMemoryUsage) / 100
if s.totalMemoryUsage() >= threshold {
err = s.flush(&BestState{Hash: node.hash})
if err != nil {
return err
}
}

if interruptRequested(interrupt) {
log.Warn("UTXO state reconstruction interrupted")

return errInterruptRequested
}
}

// Now we can update the status already to avoid redoing this work when
// interrupted.
err = s.db.Update(func(dbTx database.Tx) error {
return dbPutUtxoStateConsistency(dbTx, ucsConsistent, statusHash)
})
if err != nil {
return err
}

log.Debugf("Replaying %d blocks to rebuild UTXO state...",
tip.height-statusNodeNext.height+1)
log.Infof("Replaying %d blocks to rebuild UTXO state...",
tip.height-node.height+1)

// Then we replay the blocks from the last consistent state up to the best
// state. Iterate forward from the consistent node to the tip of the best
// chain. After every batch, we can also update the consistency state to
// avoid redoing the work when interrupted.
attachNodes := list.New()
for n := tip; n.height >= 0; n = n.parent {
if n == fork {
break
}
attachNodes.PushFront(n)
}
rollforwardBatch := func(dbTx database.Tx, node *blockNode) (*blockNode, error) {
nbBatchBlocks := 0
for e := attachNodes.Front(); e != nil; e = e.Next() {
Expand All @@ -1043,13 +1045,11 @@ func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{
}
}

// We can update this after each batch to avoid having to redo the work
// when interrupted.
return node, dbPutUtxoStateConsistency(dbTx, ucsConsistent, &node.hash)
return node, nil
}

for node := statusNodeNext; node.height <= tip.height; {
log.Tracef("Replaying %d more blocks...", tip.height-node.height+1)
for node != nil && node != tip {
log.Debugf("Replaying %d more blocks...", tip.height-node.height+1)
err := s.db.Update(func(dbTx database.Tx) error {
var err error
node, err = rollforwardBatch(dbTx, node)
Expand All @@ -1060,17 +1060,26 @@ func (s *utxoCache) InitConsistentState(tip *blockNode, interrupt <-chan struct{
return err
}

// Flush the utxo cache if needed.
threshold := (utxoFlushPeriodicThreshold * s.maxTotalMemoryUsage) / 100
if s.totalMemoryUsage() >= threshold {
err = s.flush(&BestState{Hash: node.hash})
if err != nil {
return err
}
}

if interruptRequested(interrupt) {
log.Warn("UTXO state reconstruction interrupted")

return errInterruptRequested
}
if node.height == tip.height {
break
}
}

log.Debug("UTXO state reconstruction done")

// Set the last flush hash as it's the default value of 0s.
s.lastFlushHash = tip.hash

return nil
}

0 comments on commit 0112042

Please sign in to comment.