Work with the new dcrd reorg ntfn sequence

This modifies reorg handling to work with dcrd RPC v5.0.0.


- Modify collectionQueue to verify that new blocks are in sequence (see 
  the sketch after this list)
- Bump required dcrd JSON RPC version to 5.0.0
- Create rpcutils.CommonAncestor function to provide full reorg details 
  to each reorg handler and avoid redundant RPCs.
- Rewrite each ChainMonitor's reorg handling. Each of dcrsqlite, dcrpg, 
  and stakedb uses a switchToSideChain function. With blockdata, 
  ReorgHandler simply stores the new chain tip's block data in the 
  reorgDataSavers, which presently amount to just the explorer UI 
  update. There is no longer a need for BlockConnectedHandler in dcrpg 
  and dcrsqlite, so it is removed. Their goroutines are consequently no 
  longer started, and the synchronous collectionQueue now includes only 
  sdbChainMonitor.BlockConnectedSync and
  wsChainMonitor.BlockConnectedSync (the synchronous collection and
  storage functions for the stakedb and blockdata packages).
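
Here is a minimal sketch of the kind of in-sequence check referred to in 
the first item above. The collectionQueue fields and the processBlock 
method are illustrative assumptions, not the actual dcrdata code:

```go
// Package collectqueue is a sketch only: it verifies that a newly notified
// block extends the current tip before running the synchronous handlers.
package collectqueue

import (
	"fmt"

	"github.com/decred/dcrd/chaincfg/chainhash"
)

// collectionQueue is a simplified stand-in for dcrdata's queue of
// synchronous block-connected handlers (hypothetical fields).
type collectionQueue struct {
	tipHash   chainhash.Hash
	tipHeight int64
}

// processBlock checks that the block given by (hash, prevHash, height) builds
// directly on the current tip. If not, the block is out of sequence (e.g. a
// reorg is underway) and the synchronous handlers should not be run for it.
func (q *collectionQueue) processBlock(hash, prevHash *chainhash.Hash, height int64) error {
	if height != q.tipHeight+1 || !prevHash.IsEqual(&q.tipHash) {
		return fmt.Errorf("block %v (height %d) does not extend tip %v (height %d)",
			hash, height, q.tipHash, q.tipHeight)
	}
	// In dcrdata, the synchronous handlers (sdbChainMonitor.BlockConnectedSync
	// and wsChainMonitor.BlockConnectedSync) would run here, in order.
	q.tipHash, q.tipHeight = *hash, height
	return nil
}
```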

In short, the roles of each data saver's ReorgHandler are:

blockdata: Collect the data for just the new chain tip block and store 
it in the special reorgDataSavers. This is presently used to update the 
explorer UI.

stakedb: Disconnect the old chain blocks from the StakeDatabase, then 
connect the new chain blocks.

dcrsqlite: Collect and store data for the new chain blocks. The data for 
the old chain is not presently removed as it is not critical.

dcrpg: This is the most complex one. All data related to old chain 
blocks is updated to reflect the blocks' new side chain status. The 
blocks in the new chain (formerly a side chain) are stored in the DB, 
updating any existing records, which may be from the disconnected 
blocks.
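
The switchToSideChain pattern shared by stakedb, dcrsqlite, and dcrpg 
can be sketched roughly as below. The ChainDB interface and its method 
names are assumptions for illustration only, and the individual packages 
deviate from this shape (dcrsqlite, for instance, leaves old chain data 
in place):

```go
// Package reorgsketch outlines the general switchToSideChain flow: walk the
// old chain back to the common ancestor, then attach each new chain block.
package reorgsketch

import (
	"fmt"

	"github.com/decred/dcrd/chaincfg/chainhash"
)

// ChainDB is an illustrative stand-in for a package's chain-aware store
// (the StakeDatabase, the dcrsqlite DB, or the dcrpg ChainDB).
type ChainDB interface {
	TipHash() chainhash.Hash
	DisconnectTip() error                    // undo/demote the current tip block
	ConnectBlock(hash *chainhash.Hash) error // fetch and store the given block
}

// switchToSideChain reorganizes db from the old chain tip to the new one,
// given the common ancestor and the new chain's block hashes (oldest first).
func switchToSideChain(db ChainDB, commonAncestor *chainhash.Hash,
	newChain []chainhash.Hash) error {
	// Disconnect old main chain blocks until the common ancestor is the tip.
	for {
		tip := db.TipHash()
		if tip.IsEqual(commonAncestor) {
			break
		}
		if err := db.DisconnectTip(); err != nil {
			return fmt.Errorf("failed to disconnect %v: %v", tip, err)
		}
	}
	// Connect the new chain blocks, oldest first.
	for i := range newChain {
		if err := db.ConnectBlock(&newChain[i]); err != nil {
			return fmt.Errorf("failed to connect %v: %v", newChain[i], err)
		}
	}
	return nil
}
```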
The entire process is much simpler overall. One remaining source of 
complexity is the new reorg queue in the notification package, and the 
new, expanded txhelpers.ReorgData type that adds the entire lists of 
blocks in the old and new chains as well as the common ancestor. The 
reorg queue is processed by notification.ReorgSignaler, which uses the 
new rpcutils.CommonAncestor to get the common ancestor and the full 
lists of blocks in the old and new chains.
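
For reference, the expanded txhelpers.ReorgData is roughly of the 
following shape. The head, height, and WG fields appear in the diff 
below; the names of the added chain-list and ancestor fields here are 
assumptions for illustration:

```go
package txhelperssketch

import (
	"sync"

	"github.com/decred/dcrd/chaincfg/chainhash"
)

// ReorgData describes a chain reorganization in full: both chain tips, the
// complete block lists on each side, and their common ancestor.
type ReorgData struct {
	OldChainHead   chainhash.Hash
	OldChainHeight int32
	NewChainHead   chainhash.Hash
	NewChainHeight int32
	CommonAncestor chainhash.Hash   // assumed field name
	OldChain       []chainhash.Hash // assumed: old chain blocks, ancestor excluded
	NewChain       []chainhash.Hash // assumed: new chain blocks, ancestor excluded
	WG             *sync.WaitGroup  // lets the signaler wait for all handlers
}
```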

rpcutils.CommonAncestor finds the common ancestor and the full lists of 
blocks in the old and new chains by iteratively interrogating dcrd via 
RPC for each block's previous block hash until the same hash is 
encountered for both chains at the same height. This approach was 
chosen over two alternatives:

- Determining the common ancestor in each package's ReorgHandler using 
  whatever data resources it has. This is redundant and more error 
  prone.
- Walking back from the old chain tip until dcrd indicates the chain 
  has gone from side (-1 confirmations) to main (non-negative 
  confirmations). This is not robust, as there may be data races if a 
  subsequent reorganization takes place during the handling of an 
  earlier reorg.
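
A minimal sketch of the chosen walk-back approach follows. It relies 
only on dcrd's GetBlockHeader RPC (Decred block headers carry their 
height); the real rpcutils.CommonAncestor may differ in signature and 
return values:

```go
// Package ancestorsketch shows the iterative previous-block walk used to
// locate the common ancestor of two chain tips.
package ancestorsketch

import (
	"fmt"

	"github.com/decred/dcrd/chaincfg/chainhash"
	"github.com/decred/dcrd/wire"
)

// blockHeaderFetcher is the minimal RPC capability required; dcrd's
// rpcclient.Client provides a GetBlockHeader method with this signature.
type blockHeaderFetcher interface {
	GetBlockHeader(hash *chainhash.Hash) (*wire.BlockHeader, error)
}

// commonAncestor walks both chains back one previous-block hash at a time
// until the same hash is reached at the same height. It returns the common
// ancestor and the hashes of the old and new chain blocks above it, ordered
// oldest to newest.
func commonAncestor(c blockHeaderFetcher, oldTip, newTip chainhash.Hash) (
	*chainhash.Hash, []chainhash.Hash, []chainhash.Hash, error) {
	var oldChain, newChain []chainhash.Hash
	hashA, hashB := oldTip, newTip
	for !hashA.IsEqual(&hashB) {
		headerA, err := c.GetBlockHeader(&hashA)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("GetBlockHeader(%v): %v", hashA, err)
		}
		headerB, err := c.GetBlockHeader(&hashB)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("GetBlockHeader(%v): %v", hashB, err)
		}
		// Step back whichever chain is taller; step both when heights match.
		if headerA.Height >= headerB.Height {
			oldChain = append([]chainhash.Hash{hashA}, oldChain...)
			hashA = headerA.PrevBlock
		}
		if headerB.Height >= headerA.Height {
			newChain = append([]chainhash.Hash{hashB}, newChain...)
			hashB = headerB.PrevBlock
		}
	}
	ancestor := hashA
	return &ancestor, oldChain, newChain, nil
}
```

In this design, notification.ReorgSignaler would use such a routine with 
the old and new tips from the reorg notification to populate the 
expanded ReorgData before placing it on the reorg queue.
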
chappjc authored Nov 21, 2018
1 parent a2eb475 commit 6c9148f
Showing 11 changed files with 800 additions and 662 deletions.
219 changes: 101 additions & 118 deletions blockdata/chainmonitor.go
@@ -6,23 +6,16 @@ package blockdata

import (
"context"
"fmt"
"reflect"
"sync"

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil"
"github.com/decred/dcrd/wire"
"github.com/decred/dcrdata/v3/txhelpers"
)

// ReorgData contains the information from a reorganization notification
type ReorgData struct {
OldChainHead chainhash.Hash
OldChainHeight int32
NewChainHead chainhash.Hash
NewChainHeight int32
WG *sync.WaitGroup
}

// for getblock, ticketfeeinfo, estimatestakediff, etc.
type chainMonitor struct {
ctx context.Context
@@ -33,23 +26,18 @@ type chainMonitor struct {
watchaddrs map[string]txhelpers.TxAction
blockChan chan *chainhash.Hash
recvTxBlockChan chan *txhelpers.BlockWatchedTx
reorgChan chan *ReorgData
reorgChan chan *txhelpers.ReorgData
ConnectingLock chan struct{}
DoneConnecting chan struct{}
syncConnect sync.Mutex

// reorg handling
reorgLock sync.Mutex
reorgData *ReorgData
sideChain []chainhash.Hash
reorganizing bool
reorgLock sync.Mutex
}

// NewChainMonitor creates a new chainMonitor.
func NewChainMonitor(ctx context.Context, collector *Collector, savers []BlockDataSaver,
reorgSavers []BlockDataSaver, wg *sync.WaitGroup, addrs map[string]txhelpers.TxAction,
blockChan chan *chainhash.Hash, recvTxBlockChan chan *txhelpers.BlockWatchedTx,
reorgChan chan *ReorgData) *chainMonitor {
reorgChan chan *txhelpers.ReorgData) *chainMonitor {
return &chainMonitor{
ctx: ctx,
collector: collector,
@@ -78,6 +66,61 @@ func (p *chainMonitor) BlockConnectedSync(hash *chainhash.Hash) {
<-p.DoneConnecting
}

func (p *chainMonitor) collect(hash *chainhash.Hash) (*wire.MsgBlock, *BlockData, error) {
// getblock RPC
msgBlock, err := p.collector.dcrdChainSvr.GetBlock(hash)
if err != nil {
return nil, nil, fmt.Errorf("failed to get block %v: %v", hash, err)
}
block := dcrutil.NewBlock(msgBlock)
height := block.Height()
log.Infof("Block height %v connected. Collecting data...", height)

if len(p.watchaddrs) > 0 {
// txsForOutpoints := blockConsumesOutpointWithAddresses(block, p.watchaddrs,
// p.collector.dcrdChainSvr)
// if len(txsForOutpoints) > 0 {
// p.spendTxBlockChan <- &BlockWatchedTx{height, txsForOutpoints}
// }

txsForAddrs := txhelpers.BlockReceivesToAddresses(block,
p.watchaddrs, p.collector.netParams)
if len(txsForAddrs) > 0 {
p.recvTxBlockChan <- &txhelpers.BlockWatchedTx{
BlockHeight: height,
TxsForAddress: txsForAddrs}
}
}

// Get node's best block height to see if the block for which we are
// collecting data is the best block.
chainHeight, err := p.collector.dcrdChainSvr.GetBlockCount()
if err != nil {
return nil, nil, fmt.Errorf("failed to get chain height: %v", err)
}

// If the new block height is not equal to the chain height, then we are
// behind on data collection, so specify the hash of the notified block,
// skipping stake diff estimates and other web UI data that is only
// relevant for the best block.
var blockData *BlockData
if chainHeight != height {
log.Debugf("Collecting data for block %v (%d), behind tip %d.",
hash, height, chainHeight)
blockData, _, err = p.collector.CollectHash(hash)
if err != nil {
return nil, nil, fmt.Errorf("blockdata.CollectHash(hash) failed: %v", err.Error())
}
} else {
blockData, _, err = p.collector.Collect()
if err != nil {
return nil, nil, fmt.Errorf("blockdata.Collect() failed: %v", err.Error())
}
}

return msgBlock, blockData, nil
}

// BlockConnectedHandler handles block connected notifications, which trigger
// data collection and storage.
func (p *chainMonitor) BlockConnectedHandler() {
@@ -87,11 +130,14 @@ out:
keepon:
select {
case hash, ok := <-p.blockChan:
release := func() {}
// Do not handle reorg and block connects simultaneously.
p.reorgLock.Lock()
release := func() { p.reorgLock.Unlock() }

select {
case <-p.ConnectingLock:
// send on unbuffered channel
release = func() { p.DoneConnecting <- struct{}{} }
release = func() { p.reorgLock.Unlock(); p.DoneConnecting <- struct{}{} }
default:
}

@@ -101,95 +147,18 @@ out:
break out
}

// If reorganizing, the block will first go to a side chain
p.reorgLock.Lock()
reorg, reorgData := p.reorganizing, p.reorgData
p.reorgLock.Unlock()

if reorg {
p.sideChain = append(p.sideChain, *hash)
log.Infof("Adding block %v to blockdata sidechain", *hash)

// Just append to side chain until the new main chain tip block is reached
if !reorgData.NewChainHead.IsEqual(hash) {
release()
break keepon
}

// Reorg is complete, do nothing lol
p.sideChain = nil
p.reorgLock.Lock()
p.reorganizing = false
p.reorgLock.Unlock()
log.Infof("Reorganization to block %v (height %d) complete in blockdata",
p.reorgData.NewChainHead, p.reorgData.NewChainHeight)
// dcrsqlite's chainmonitor handles the reorg, but we keep going
// to update the web UI with the new best block.
}

msgBlock, _ := p.collector.dcrdChainSvr.GetBlock(hash)
block := dcrutil.NewBlock(msgBlock)
height := block.Height()
log.Infof("Block height %v connected. Collecting data...", height)

if len(p.watchaddrs) > 0 {
// txsForOutpoints := blockConsumesOutpointWithAddresses(block, p.watchaddrs,
// p.collector.dcrdChainSvr)
// if len(txsForOutpoints) > 0 {
// p.spendTxBlockChan <- &BlockWatchedTx{height, txsForOutpoints}
// }

txsForAddrs := txhelpers.BlockReceivesToAddresses(block,
p.watchaddrs, p.collector.netParams)
if len(txsForAddrs) > 0 {
p.recvTxBlockChan <- &txhelpers.BlockWatchedTx{
BlockHeight: height,
TxsForAddress: txsForAddrs}
}
}

var blockData *BlockData
chainHeight, err := p.collector.dcrdChainSvr.GetBlockCount()
// Collect block data.
msgBlock, blockData, err := p.collect(hash)
if err != nil {
log.Errorf("Unable to get chain height: %v", err)
log.Errorf("Failed to collect data for block %v: %v", hash, err)
release()
break keepon
}

// If new block height not equal to chain height, then we are behind
// on data collection, so specify the hash of the notified, skipping
// stake diff estimates and other stuff for web ui that is only
// relevant for the best block.
if chainHeight != height {
log.Infof("Behind on our collection...")
blockData, _, err = p.collector.CollectHash(hash)
if err != nil {
log.Errorf("blockdata.CollectHash(hash) failed: %v", err.Error())
release()
break keepon
}
} else {
blockData, _, err = p.collector.Collect()
if err != nil {
log.Errorf("blockdata.Collect() failed: %v", err.Error())
release()
break keepon
}
}

// Store block data with each saver
savers := p.dataSavers
if reorg {
// This check should be redundant with check above.
if reorgData.NewChainHead.IsEqual(hash) {
savers = p.reorgDataSavers
} else {
savers = nil
}
}
for _, s := range savers {
// Store block data with each saver.
for _, s := range p.dataSavers {
if s != nil {
// save data to wherever the saver wants to put it
// Save data to wherever the saver wants to put it.
if err = s.Store(blockData, msgBlock); err != nil {
log.Errorf("(%v).Store failed: %v", reflect.TypeOf(s), err)
}
@@ -206,9 +175,9 @@ out:

}

// ReorgHandler receives notification of a chain reorganization. A
// reorganization is handled in blockdata by setting the reorganizing flag so
// that block data is not collected as the new chain is connected.
// ReorgHandler receives notification of a chain reorganization. A reorg is
// handled in blockdata by simply collecting data for the new best block, and
// storing it in the *reorgDataSavers*.
func (p *chainMonitor) ReorgHandler() {
defer p.wg.Done()
out:
@@ -220,27 +189,41 @@ out:
log.Warnf("Reorg channel closed.")
break out
}
if reorgData == nil {
log.Warnf("nil reorg data received!")
break keepon
}

newHeight, oldHeight := reorgData.NewChainHeight, reorgData.OldChainHeight
newHash, oldHash := reorgData.NewChainHead, reorgData.OldChainHead
newHeight := reorgData.NewChainHeight
newHash := reorgData.NewChainHead

// Do not handle reorg and block connects simultaneously.
p.reorgLock.Lock()
if p.reorganizing {

log.Infof("Reorganize signaled to blockdata. "+
"Collecting data for NEW head block %v at height %d.",
newHash, newHeight)

// Collect data for the new best block.
msgBlock, blockData, err := p.collect(&newHash)
if err != nil {
log.Errorf("ReorgHandler: Failed to collect data for block %v: %v", newHash, err)
p.reorgLock.Unlock()
log.Errorf("Reorg notified for chain tip %v (height %v), but already "+
"processing a reorg to block %v", newHash, newHeight,
p.reorgData.NewChainHead)
reorgData.WG.Done()
break keepon
}

p.reorganizing = true
p.reorgData = reorgData
p.reorgLock.Unlock()
// Store block data with each REORG saver.
for _, s := range p.reorgDataSavers {
if s != nil {
// Save data to wherever the saver wants to put it.
if err := s.Store(blockData, msgBlock); err != nil {
log.Errorf("(%v).Store failed: %v", reflect.TypeOf(s), err)
}
}
}

log.Infof("Reorganize started in blockdata. NEW head block %v at height %d.",
newHash, newHeight)
log.Infof("Reorganize started in blockdata. OLD head block %v at height %d.",
oldHash, oldHeight)
p.reorgLock.Unlock()

reorgData.WG.Done()
