Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Version of Fast Sync #4465

Merged
merged 56 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ca05f3f
add statesync as a new state to staged sync
GheisMohammadi Jun 25, 2023
9ec0272
add initial state download manager to stream sync
GheisMohammadi Jun 26, 2023
2064cfd
add protocol to stage statesync
GheisMohammadi Jun 26, 2023
702eb5e
add task management logic to state download manager in stream sync
GheisMohammadi Jun 26, 2023
4629fda
fix statesync config
GheisMohammadi Jun 26, 2023
9e1249a
refactor state download manager
GheisMohammadi Jun 28, 2023
841073d
refactor stage state sync
GheisMohammadi Jun 28, 2023
975857f
fix state download manager tasks issue
GheisMohammadi Jun 29, 2023
0da96b9
add receipt download manager
GheisMohammadi Jun 29, 2023
12d930f
fix receipt download manager result queue
GheisMohammadi Jun 29, 2023
6f3aa67
refactor stage receipts and change the stages sorting
GheisMohammadi Jun 29, 2023
e11b6ef
goimports staged stream sync
GheisMohammadi Jun 29, 2023
9103468
add block insertion without execution to blockchain implementation
GheisMohammadi Jul 3, 2023
cfc94bb
fix tests for new block insertion
GheisMohammadi Jul 3, 2023
9954a90
refactor staged stream sync to process fast sync and new block insertion
GheisMohammadi Jul 3, 2023
3522127
refactor stage receipts
GheisMohammadi Jul 3, 2023
591f223
fix block insertion in main.go
GheisMohammadi Jul 3, 2023
7006e15
goimports staged sync files
GheisMohammadi Jul 3, 2023
30de7c2
refactor stages list initialization based on the sync mode
GheisMohammadi Jul 3, 2023
f10dd1e
add SyncMode to configs
GheisMohammadi Jul 3, 2023
498bcc0
fix state download manager failure message
GheisMohammadi Jul 4, 2023
1f26944
split verifyAndInsertBlock function to be able to reuse verification …
GheisMohammadi Jul 5, 2023
7c3807a
refactor stage bodies to extract receip hashes in this stage rather t…
GheisMohammadi Jul 5, 2023
d4c8577
goimports
GheisMohammadi Jul 5, 2023
8f81810
add InsertReceiptChain to blockchain interface
GheisMohammadi Jul 5, 2023
57a77ab
refactor get receipts stage to use insertReceiptsChain
GheisMohammadi Jul 6, 2023
bcf1b77
remove using currentCycle, cleanup stage receipts
GheisMohammadi Jul 6, 2023
6f9a1ec
goimports
GheisMohammadi Jul 6, 2023
12235f5
fix stages forward order for staged stream sync
GheisMohammadi Jul 10, 2023
f6b8951
add SyncMode to flags
GheisMohammadi Jul 13, 2023
cd7ccbe
fix stages and replace with forward stages
GheisMohammadi Aug 16, 2023
772d865
fix block validation in stage bodies
GheisMohammadi Sep 6, 2023
c1d352b
add pivot to chain accessor, add CurrentFastBlock to blockchain_impl,…
GheisMohammadi Sep 26, 2023
8d66bdf
add getBlockByMaxVote to sync helper
GheisMohammadi Sep 27, 2023
917a301
add tests for node data request
GheisMohammadi Oct 3, 2023
7c21eef
fix stream tests
GheisMohammadi Oct 3, 2023
d534fea
add Validator method to blockchain to fix the interface
GheisMohammadi Oct 3, 2023
e96855b
fix shard chain test
GheisMohammadi Oct 3, 2023
ebd689f
remove blockExecution option from insertChain
GheisMohammadi Oct 3, 2023
36d2abd
remove extra blockExecutions
GheisMohammadi Oct 4, 2023
9629d9c
remove blockExecution option from staged stream sync
GheisMohammadi Oct 4, 2023
e4dcda6
refactor staged stream sync, fix the state sync functions
GheisMohammadi Oct 4, 2023
6348128
improve stage handling for create new instance of staged stream sync
GheisMohammadi Oct 4, 2023
c808f2b
fix pivot block issue for write on chain
GheisMohammadi Oct 5, 2023
bdd7f14
improve stream sync current cycle and pivot checks, fix edge case iss…
GheisMohammadi Oct 6, 2023
135c7da
fix WriteHeadBlock, fix GetDownloadDetails index, improve fetching cu…
GheisMohammadi Oct 24, 2023
3fcfad4
fix rebase conflicts
GheisMohammadi Oct 26, 2023
9992825
add state sync
GheisMohammadi Nov 14, 2023
c340c70
fix GetNextBatch to complete sync after there is no more pending stat…
GheisMohammadi Nov 16, 2023
3374100
fix state sync file name spell error
GheisMohammadi Nov 16, 2023
e141f79
add ProofSet and ProofList to staged stream sync
GheisMohammadi Dec 7, 2023
390bdb6
add client new functions to stream sync adapter, update GetAccountRan…
GheisMohammadi Dec 7, 2023
0901e92
add state sync full, complete full state sync stage
GheisMohammadi Dec 7, 2023
f3ce9f3
return back deleted codes, fix rebase issues, goimports
GheisMohammadi Dec 11, 2023
191c55b
fix full state sync requests cap, add error handling to stage state s…
GheisMohammadi Dec 12, 2023
419aad1
remove state debug logs
GheisMohammadi Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/service/stagedstreamsync/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync/message"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)

Expand All @@ -20,6 +21,10 @@ type syncProtocol interface {
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)
GetAccountRange(ctx context.Context, root common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (accounts []*message.AccountData, proof [][]byte, stid sttypes.StreamID, err error)
GetStorageRanges(ctx context.Context, root common.Hash, accounts []common.Hash, origin common.Hash, limit common.Hash, bytes uint64, opts ...syncproto.Option) (slots [][]*message.StorageData, proof [][]byte, stid sttypes.StreamID, err error)
GetByteCodes(ctx context.Context, hs []common.Hash, bytes uint64, opts ...syncproto.Option) (codes [][]byte, stid sttypes.StreamID, err error)
GetTrieNodes(ctx context.Context, root common.Hash, paths []*message.TrieNodePathSet, bytes uint64, opts ...syncproto.Option) (nodes [][]byte, stid sttypes.StreamID, err error)

RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)
Expand Down
56 changes: 38 additions & 18 deletions api/service/stagedstreamsync/block_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package stagedstreamsync

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/rs/zerolog"
Expand All @@ -11,6 +13,7 @@ import (
type BlockDownloadDetails struct {
loopID int
streamID sttypes.StreamID
rootHash common.Hash
}

// blockDownloadManager is the helper structure for get blocks request management
Expand All @@ -19,11 +22,11 @@ type blockDownloadManager struct {
tx kv.RwTx

targetBN uint64
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]BlockDownloadDetails // details about how this block was downloaded
requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
processing map[uint64]struct{} // block numbers received requests but not inserted
retries *prioritizedNumbers // requests where error happens
rq *resultQueue // result queue wait to be inserted into blockchain
bdd map[uint64]*BlockDownloadDetails // details about how this block was downloaded

logger zerolog.Logger
lock sync.Mutex
Expand All @@ -38,26 +41,26 @@ func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logg
processing: make(map[uint64]struct{}),
retries: newPrioritizedNumbers(),
rq: newResultQueue(),
bdd: make(map[uint64]BlockDownloadDetails),
bdd: make(map[uint64]*BlockDownloadDetails),
logger: logger,
}
}

// GetNextBatch get the next block numbers batch
func (gbm *blockDownloadManager) GetNextBatch() []uint64 {
func (gbm *blockDownloadManager) GetNextBatch(curHeight uint64) []uint64 {
gbm.lock.Lock()
defer gbm.lock.Unlock()

cap := BlocksPerRequest

bns := gbm.getBatchFromRetries(cap)
bns := gbm.getBatchFromRetries(cap, curHeight)
if len(bns) > 0 {
cap -= len(bns)
gbm.addBatchToRequesting(bns)
}

if gbm.availableForMoreTasks() {
addBNs := gbm.getBatchFromUnprocessed(cap)
addBNs := gbm.getBatchFromUnprocessed(cap, curHeight)
gbm.addBatchToRequesting(addBNs)
bns = append(bns, addBNs...)
}
Expand Down Expand Up @@ -88,7 +91,7 @@ func (gbm *blockDownloadManager) HandleRequestResult(bns []uint64, blockBytes []
gbm.retries.push(bn)
} else {
gbm.processing[bn] = struct{}{}
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
Expand All @@ -107,7 +110,7 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
defer gbm.lock.Unlock()

for _, bn := range bns {
gbm.bdd[bn] = BlockDownloadDetails{
gbm.bdd[bn] = &BlockDownloadDetails{
loopID: loopID,
streamID: streamID,
}
Expand All @@ -116,25 +119,43 @@ func (gbm *blockDownloadManager) SetDownloadDetails(bns []uint64, loopID int, st
}

// GetDownloadDetails returns the download details for a block
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID) {
func (gbm *blockDownloadManager) GetDownloadDetails(blockNumber uint64) (loopID int, streamID sttypes.StreamID, err error) {
gbm.lock.Lock()
defer gbm.lock.Unlock()

return gbm.bdd[blockNumber].loopID, gbm.bdd[blockNumber].streamID
if dm, exist := gbm.bdd[blockNumber]; exist {
return dm.loopID, dm.streamID, nil
}
return 0, sttypes.StreamID(fmt.Sprint(0)), fmt.Errorf("there is no download details for the block number: %d", blockNumber)
}

// SetRootHash sets the root hash for a specific block
func (gbm *blockDownloadManager) SetRootHash(blockNumber uint64, root common.Hash) {
gbm.lock.Lock()
defer gbm.lock.Unlock()

gbm.bdd[blockNumber].rootHash = root
}

// GetRootHash returns the root hash for a specific block
func (gbm *blockDownloadManager) GetRootHash(blockNumber uint64) common.Hash {
gbm.lock.Lock()
defer gbm.lock.Unlock()

return gbm.bdd[blockNumber].rootHash
}

// getBatchFromRetries get the block number batch to be requested from retries.
func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromRetries(cap int, fromBlockNumber uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
for cnt := 0; cnt < cap; cnt++ {
bn := gbm.retries.pop()
if bn == 0 {
break // no more retries
}
if bn <= curHeight {
if bn <= fromBlockNumber {
continue
}
requestBNs = append(requestBNs, bn)
Expand All @@ -143,10 +164,9 @@ func (gbm *blockDownloadManager) getBatchFromRetries(cap int) []uint64 {
}

// getBatchFromUnprocessed returns a batch of block numbers to be requested from unprocessed.
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int) []uint64 {
func (gbm *blockDownloadManager) getBatchFromUnprocessed(cap int, curHeight uint64) []uint64 {
var (
requestBNs []uint64
curHeight = gbm.chain.CurrentBlock().NumberU64()
)
bn := curHeight + 1
// TODO: this algorithm can be potentially optimized.
Expand Down
29 changes: 29 additions & 0 deletions api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,35 @@ const (
// no more request will be assigned to workers to wait for InsertChain to finish.
SoftQueueCap int = 100

// number of get nodes by hashes for each request
StatesPerRequest int = 100

// maximum number of blocks for get receipts request
ReceiptsPerRequest int = 10

// DefaultConcurrency is the default settings for concurrency
DefaultConcurrency int = 4

// MaxTriesToFetchNodeData is the maximum number of tries to fetch node data
MaxTriesToFetchNodeData int = 5

// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
// to restart automatically when stuck in `getBlockHashes`
ShortRangeTimeout time.Duration = 1 * time.Minute

// pivot block distance ranges
MinPivotDistanceToHead uint64 = 1024
MaxPivotDistanceToHead uint64 = 2048
)

// SyncMode represents the synchronization mode of the downloader.
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32

const (
FullSync SyncMode = iota // Synchronize the entire blockchain history from full blocks
FastSync // Download all blocks and states
SnapSync // Download the chain and the state via compact snapshots
)

type (
Expand All @@ -35,6 +61,9 @@ type (
// TODO: remove this when stream sync is fully up.
ServerOnly bool

// Synchronization mode of the downloader
SyncMode SyncMode

// parameters
Network nodeconfig.NetworkType
Concurrency int // Number of concurrent sync requests
Expand Down
120 changes: 95 additions & 25 deletions api/service/stagedstreamsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,101 @@ type ForwardOrder []SyncStageID
type RevertOrder []SyncStageID
type CleanUpOrder []SyncStageID

var DefaultForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
// Stages below don't use Internet
States,
LastMile,
Finish,
var (
StagesForwardOrder ForwardOrder
StagesRevertOrder RevertOrder
StagesCleanUpOrder CleanUpOrder
)

func initStagesOrder(syncMode SyncMode) {
switch syncMode {
case FullSync:
initFullSyncStagesOrder()
case FastSync:
initFastSyncStagesOrder()
default:
panic("not supported sync mode")
}
}

var DefaultRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
func initFullSyncStagesOrder() {
StagesForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
States,
LastMile,
Finish,
}

StagesRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}

StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
}

var DefaultCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
func initFastSyncStagesOrder() {
StagesForwardOrder = ForwardOrder{
Heads,
SyncEpoch,
ShortRange,
BlockBodies,
Receipts,
StateSync,
States,
LastMile,
Finish,
}

StagesRevertOrder = RevertOrder{
Finish,
LastMile,
States,
StateSync,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}

StagesCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
StateSync,
Receipts,
BlockBodies,
ShortRange,
SyncEpoch,
Heads,
}
}

func DefaultStages(ctx context.Context,
headsCfg StageHeadsCfg,
seCfg StageEpochCfg,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
stateSyncCfg StageStateSyncCfg,
statesCfg StageStatesCfg,
receiptsCfg StageReceiptsCfg,
lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg,
) []*Stage {
Expand All @@ -54,6 +112,8 @@ func DefaultStages(ctx context.Context,
handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageStateSync := NewStageStateSync(stateSyncCfg)
handlerStageReceipts := NewStageReceipts(receiptsCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)

Expand Down Expand Up @@ -83,6 +143,16 @@ func DefaultStages(ctx context.Context,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{
ID: StateSync,
Description: "Retrieve States",
Handler: handlerStageStateSync,
},
{
ID: Receipts,
Description: "Retrieve Receipts",
Handler: handlerStageReceipts,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Expand Down
21 changes: 21 additions & 0 deletions api/service/stagedstreamsync/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,27 @@ func checkGetBlockByHashesResult(blocks []*types.Block, hashes []common.Hash) er
return nil
}

func getBlockByMaxVote(blocks []*types.Block) (*types.Block, error) {
hashesVote := make(map[common.Hash]int)
maxVote := int(-1)
maxVotedBlockIndex := int(0)

for i, block := range blocks {
if block == nil {
continue
}
hashesVote[block.Header().Hash()]++
if hashesVote[block.Header().Hash()] > maxVote {
maxVote = hashesVote[block.Header().Hash()]
maxVotedBlockIndex = i
}
}
if maxVote < 0 {
return nil, ErrInvalidBlockBytes
}
return blocks[maxVotedBlockIndex], nil
}

func countHashMaxVote(m map[sttypes.StreamID]common.Hash, whitelist map[sttypes.StreamID]struct{}) (common.Hash, map[sttypes.StreamID]struct{}) {
var (
voteM = make(map[common.Hash]int)
Expand Down
Loading