Skip to content

Commit

Permalink
Merge pull request #4465 from harmony-one/feature/fastsync
Browse files Browse the repository at this point in the history
Initial Version of Fast Sync
  • Loading branch information
adsorptionenthalpy authored Dec 12, 2023
2 parents 02e2fee + 419aad1 commit c82599b
Show file tree
Hide file tree
Showing 33 changed files with 5,267 additions and 129 deletions.
5 changes: 5 additions & 0 deletions api/service/stagedstreamsync/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)

Expand All @@ -20,6 +21,10 @@ type syncProtocol interface {
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)
GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error)
GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error)
GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...syncproto.Option) (codes [][]byte, stid sttypes.StreamID, err error)
GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...syncproto.Option) (nodes [][]byte, stid sttypes.StreamID, err error)

RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)
Expand Down
56 changes: 38 additions & 18 deletions api/service/stagedstreamsync/block_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package stagedstreamsync

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
Expand All @@ -11,6 +13,7 @@ import (
type BlockDownloadDetails struct {
loopID int
streamID sttypes.StreamID
rootHash common.Hash
}

// blockDownloadManager is the helper structure for get blocks request management
Expand All @@ -19,11 +22,11 @@ type blockDownloadManager struct {
tx kv.RwTx

targetBN uint64
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]BlockDownloadDetails // details about how this block was downloaded
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]*BlockDownloadDetails // details about how this block was downloaded

logger zerolog.Logger
lock sync.Mutex
Expand All @@ -38,26 +41,26 @@ func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logg
processing: make(map[uint64]struct{}),
retries: newPrioritizedNumbers(),
rq: newResultQueue(),
bdd: make(map[uint64]BlockDownloadDetails),
bdd: make(map[uint64]*BlockDownloadDetails),
logger: logger,
}
}

// GetNextBatch get the next block numbers batch
func (gbm *blockDownloadManager) GetNextBatch() []uint64 {
func (gbm *blockDownloadManager) GetNextBatch(curHeight uint64) []uint64 {
gbm.lock.Lock()
defer gbm.lock.Unlock()

cap := BlocksPerRequest

bns := gbm.getBatchFromRetries(cap)
bns := gbm.getBatchFromRetries(cap, curHeight)
if len(bns) > 0 {
cap -= len(bns)
gbm.addBatchToRequesting(bns)
}

if gbm.availableForMoreTasks() {
addBNs := gbm.getBatchFromUnprocessed(cap)
addBNs := gbm.getBatchFromUnprocessed(cap, curHeight)
gbm.addBatchToRequesting(addBNs)
bns = append(bns, addBNs...)
}
Expand Down Expand Up @@ -88,7 +91,7 @@ func (gbm *blockDownloadManager) HandleRequestResult(bns []uint64, blockBytes []
gbm.retries.push(bn)
} else {
gbm.processing[bn] = struct{}{}
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
Expand All @@ -107,7 +110,7 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
defer gbm.lock.Unlock()

for _, bn := range bns {
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
Expand All @@ -116,25 +119,43 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
}

// GetDownloadDetails returns the download details for a block
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID) {
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID, err error) {
gbm.lock.Lock()
defer gbm.lock.Unlock()

return gbm.bdd[blockNumber].loopID, gbm.bdd[blockNumber].streamID
if dm, exist := gbm.bdd[blockNumber]; exist {
return dm.loopID, dm.streamID, nil
}
return 0, sttypes.StreamID(fmt.Sprint(0)), fmt.Errorf("there is no download details for the block number: %d", blockNumber)
}

// SetRootHash sets the root hash for a specific block
func (gbm *blockDownloadManager) SetRootHash(blockNumber uint64, root common.Hash) {
gbm.lock.Lock()
defer gbm.lock.Unlock()

gbm.bdd[blockNumber].rootHash = root
}

// GetRootHash returns the root hash for a specific block
func (gbm *blockDownloadManager) GetRootHash(blockNumber uint64) common.Hash {
gbm.lock.Lock()
defer gbm.lock.Unlock()

return gbm.bdd[blockNumber].rootHash
}

// getBatchFromRetries get the block number batch to be requested from retries.
func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromRetries(cap int, fromBlockNumber uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
for cnt := 0; cnt < cap; cnt++ {
bn := gbm.retries.pop()
if bn == 0 {
break // no more retries
}
if bn <= curHeight {
if bn <= fromBlockNumber {
continue
}
requestBNs = append(requestBNs, bn)
Expand All @@ -143,10 +164,9 @@ func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
}

// getBatchFromUnprocessed returns a batch of block numbers to be requested from unprocessed.
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int, curHeight uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
bn := curHeight + 1
// TODO: this algorithm can be potentially optimized.
Expand Down
29 changes: 29 additions & 0 deletions api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,35 @@ const (
// no more request will be assigned to workers to wait for InsertChain to finish.
SoftQueueCap int = 100

// number of get nodes by hashes for each request
StatesPerRequest int = 100

// maximum number of blocks for get receipts request
ReceiptsPerRequest int = 10

// DefaultConcurrency is the default settings for concurrency
DefaultConcurrency int = 4

// MaxTriesToFetchNodeData is the maximum number of tries to fetch node data
MaxTriesToFetchNodeData int = 5

// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
// to restart automatically when stuck in `getBlockHashes`
ShortRangeTimeout time.Duration = 1 * time.Minute

// pivot block distance ranges
MinPivotDistanceToHead uint64 = 1024
MaxPivotDistanceToHead uint64 = 2048
)

// SyncMode represents the synchronization mode of the downloader.
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32

const (
FullSync SyncMode = iota // Synchronize the entire blockchain history from full blocks
FastSync // Download all blocks and states
SnapSync // Download the chain and the state via compact snapshots
)

type (
Expand All @@ -35,6 +61,9 @@ type (
// TODO: remove this when stream sync is fully up.
ServerOnly bool

// Synchronization mode of the downloader
SyncMode SyncMode

// parameters
Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests
Expand Down
120 changes: 95 additions & 25 deletions api/service/stagedstreamsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,101 @@ type ForwardOrder []SyncStageID
type RevertOrder []SyncStageID
type CleanUpOrder []SyncStageID

var DefaultForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
// Stages below don't use Internet
States,
LastMile,
Finish,
var (
StagesForwardOrder ForwardOrder
StagesRevertOrder RevertOrder
StagesCleanUpOrder CleanUpOrder
)

func initStagesOrder(syncMode SyncMode) {
switch syncMode {
case FullSync:
initFullSyncStagesOrder()
case FastSync:
initFastSyncStagesOrder()
default:
panic("not supported sync mode")
}
}

var DefaultRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
func initFullSyncStagesOrder() {
StagesForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
States,
LastMile,
Finish,
}

StagesRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}

StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
}

var DefaultCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
func initFastSyncStagesOrder() {
StagesForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
Receipts,
StateSync,
States,
LastMile,
Finish,
}

StagesRevertOrder = RevertOrder{
Finish,
LastMile,
States,
StateSync,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}

StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
StateSync,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
}

func DefaultStages(ctx context.Context,
headsCfg StageHeadsCfg,
seCfg StageEpochCfg,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
stateSyncCfg StageStateSyncCfg,
statesCfg StageStatesCfg,
receiptsCfg StageReceiptsCfg,
lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg,
) []*Stage {
Expand All @@ -54,6 +112,8 @@ func DefaultStages(ctx context.Context,
handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageStateSync := NewStageStateSync(stateSyncCfg)
handlerStageReceipts := NewStageReceipts(receiptsCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)

Expand Down Expand Up @@ -83,6 +143,16 @@ func DefaultStages(ctx context.Context,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{
ID: StateSync,
Description: "Retrieve States",
Handler: handlerStageStateSync,
},
{
ID: Receipts,
Description: "Retrieve Receipts",
Handler: handlerStageReceipts,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Expand Down
21 changes: 21 additions & 0 deletions api/service/stagedstreamsync/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,27 @@ func checkGetBlockByHashesResult(blocks []*types.Block, hashes []common.Hash) er
return nil
}

func getBlockByMaxVote(blocks []*types.Block) (*types.Block, error) {
hashesVote := make(map[common.Hash]int)
maxVote := int(-1)
maxVotedBlockIndex := int(0)

for i, block := range blocks {
if block == nil {
continue
}
hashesVote[block.Header().Hash()]++
if hashesVote[block.Header().Hash()] > maxVote {
maxVote = hashesVote[block.Header().Hash()]
maxVotedBlockIndex = i
}
}
if maxVote < 0 {
return nil, ErrInvalidBlockBytes
}
return blocks[maxVotedBlockIndex], nil
}

func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.StreamID]struct{}) (common.Hash, map[sttypes.StreamID]struct{}) {
var (
voteM = make(map[common.Hash]int)
Expand Down
Loading

0 comments on commit c82599b

Please sign in to comment.