Skip to content

Commit

Permalink
e3: reconDB - to use 64kb pageSize (#6282)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Dec 12, 2022
1 parent 1c3c486 commit 9d20045
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,43 @@ func reconstituteStep(last bool,
chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, engine consensus.Engine,
batchSize datasize.ByteSize, s *StageState, blockNum uint64, total uint64,
) error {
var err error
var startOk, endOk bool
startTxNum, endTxNum := as.TxNumRange()
var startBlockNum, endBlockNum uint64 // First block which is not covered by the history snapshot files
if err := chainDb.View(ctx, func(tx kv.Tx) error {
startOk, startBlockNum, err = rawdb.TxNums.FindBlockNum(tx, startTxNum)
if err != nil {
return err
}
if startBlockNum > 0 {
startBlockNum--
startTxNum, err = rawdb.TxNums.Min(tx, startBlockNum)
if err != nil {
return err
}
}
endOk, endBlockNum, err = rawdb.TxNums.FindBlockNum(tx, endTxNum)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
if !startOk {
return fmt.Errorf("step startTxNum not found in snapshot blocks: %d", startTxNum)
}
if !endOk {
return fmt.Errorf("step endTxNum not found in snapshot blocks: %d", endTxNum)
}
if last {
endBlockNum = blockNum
}

fmt.Printf("startTxNum = %d, endTxNum = %d, startBlockNum = %d, endBlockNum = %d\n", startTxNum, endTxNum, startBlockNum, endBlockNum)
var maxTxNum uint64 = startTxNum

workCh := make(chan *exec22.TxTask, workerCount*4)
rs := state.NewReconState(workCh)
scanWorker := exec3.NewScanWorker(txNum, as)
Expand Down Expand Up @@ -770,7 +807,6 @@ func reconstituteStep(last bool,
}
}
}()
var err error
for i := 0; i < workerCount; i++ {
if roTxs[i], err = db.BeginRo(ctx); err != nil {
return err
Expand All @@ -791,40 +827,7 @@ func reconstituteStep(last bool,
reconWorkers[i].SetChainTx(chainTxs[i])
}
wg.Add(workerCount)
var startOk, endOk bool
startTxNum, endTxNum := as.TxNumRange()
var startBlockNum, endBlockNum uint64 // First block which is not covered by the history snapshot files
if err := chainDb.View(ctx, func(tx kv.Tx) error {
startOk, startBlockNum, err = rawdb.TxNums.FindBlockNum(tx, startTxNum)
if err != nil {
return err
}
if startBlockNum > 0 {
startBlockNum--
startTxNum, err = rawdb.TxNums.Min(tx, startBlockNum)
if err != nil {
return err
}
}
endOk, endBlockNum, err = rawdb.TxNums.FindBlockNum(tx, endTxNum)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
if !startOk {
return fmt.Errorf("step startTxNum not found in snapshot blocks: %d", startTxNum)
}
if !endOk {
return fmt.Errorf("step endTxNum not found in snapshot blocks: %d", endTxNum)
}
if last {
endBlockNum = blockNum
}
fmt.Printf("startTxNum = %d, endTxNum = %d, startBlockNum = %d, endBlockNum = %d\n", startTxNum, endTxNum, startBlockNum, endBlockNum)
var maxTxNum uint64 = startTxNum

rollbackCount := uint64(0)
prevCount := rs.DoneCount()
for i := 0; i < workerCount; i++ {
Expand Down Expand Up @@ -973,7 +976,6 @@ func reconstituteStep(last bool,
}
inputTxNum++
}
b, txs = nil, nil //nolint

core.BlockExecutionTimer.UpdateDuration(t)
}
Expand All @@ -991,11 +993,11 @@ func reconstituteStep(last bool,
}); err != nil {
return err
}
plainStateCollector := etl.NewCollector("recon plainState", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize/2))
plainStateCollector := etl.NewCollector("recon plainState", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer plainStateCollector.Close()
codeCollector := etl.NewCollector("recon code", dirs.Tmp, etl.NewOldestEntryBuffer(etl.BufferOptimalSize/2))
codeCollector := etl.NewCollector("recon code", dirs.Tmp, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer codeCollector.Close()
plainContractCollector := etl.NewCollector("recon plainContract", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize/2))
plainContractCollector := etl.NewCollector("recon plainContract", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer plainContractCollector.Close()
var transposedKey []byte
if err = db.View(ctx, func(roTx kv.Tx) error {
Expand Down Expand Up @@ -1213,7 +1215,7 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
return mdbx.UtterlyNoSync | mdbx.NoMetaSync | mdbx.NoMemInit | mdbx.LifoReclaim | mdbx.WriteMap
}).
WriteMergeThreshold(2 * 8192).
PageSize(uint64(4 * datasize.KB)).
PageSize(uint64(64 * datasize.KB)).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.ReconTablesCfg }).
Open()
if err != nil {
Expand All @@ -1235,11 +1237,11 @@ func ReconstituteState(ctx context.Context, s *StageState, dirs datadir.Dirs, wo
}
}
db.Close()
plainStateCollector := etl.NewCollector("recon plainState", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize/2))
plainStateCollector := etl.NewCollector("recon plainState", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer plainStateCollector.Close()
codeCollector := etl.NewCollector("recon code", dirs.Tmp, etl.NewOldestEntryBuffer(etl.BufferOptimalSize/2))
codeCollector := etl.NewCollector("recon code", dirs.Tmp, etl.NewOldestEntryBuffer(etl.BufferOptimalSize))
defer codeCollector.Close()
plainContractCollector := etl.NewCollector("recon plainContract", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize/2))
plainContractCollector := etl.NewCollector("recon plainContract", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer plainContractCollector.Close()
fillWorker := exec3.NewFillWorker(txNum, aggSteps[len(aggSteps)-1])
t := time.Now()
Expand Down

0 comments on commit 9d20045

Please sign in to comment.