From 070723ab036d7965b6a16d9a251eb739f7ecc210 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 3 May 2024 15:32:43 +0700 Subject: [PATCH 1/2] save --- eth/ethconfig/config.go | 2 - eth/stagedsync/exec3.go | 10 +-- .../snapshotsync/freezeblocks/block_reader.go | 83 +++++++++++++++++-- .../freezeblocks/block_snapshots.go | 62 ++------------ .../freezeblocks/bor_snapshots.go | 2 +- 5 files changed, 90 insertions(+), 69 deletions(-) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index ae52182f576..7492038d4fd 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -45,8 +45,6 @@ import ( "github.com/ledgerwatch/erigon/rpc" ) -//const HistoryV3AggregationStep = 3_125_000 / 100 // use this to reduce step size for dev/debug - // BorDefaultMinerGasPrice defines the minimum gas price for bor validators to mine a transaction. var BorDefaultMinerGasPrice = big.NewInt(30 * params.GWei) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index b90124c1ceb..eee5c9581ae 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -512,7 +512,7 @@ func ExecV3(ctx context.Context, Loop: for blockNum = block; blockNum <= maxBlockNum; blockNum++ { inputBlockNum.Store(blockNum) - b, err = blockWithSenders(chainDb, applyTx, blockReader, blockNum) + b, err = blockWithSenders(ctx, chainDb, applyTx, blockReader, blockNum) if err != nil { return err } @@ -750,15 +750,15 @@ Loop: } return nil } -func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, blockNum uint64) (b *types.Block, err error) { +func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, blockNum uint64) (b *types.Block, err error) { if tx == nil { - tx, err = db.BeginRo(context.Background()) + tx, err = db.BeginRo(ctx) if err != nil { return nil, err } defer tx.Rollback() } - return blockReader.BlockByNumber(context.Background(), tx, blockNum) + return blockReader.BlockByNumber(ctx, tx, blockNum) } func processResultQueue(in *exec22.QueueWithRetry, rws *exec22.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *libstate.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) { @@ -1040,7 +1040,7 @@ func reconstituteStep(last bool, for bn := startBlockNum; bn <= endBlockNum; bn++ { t = time.Now() - b, err = blockWithSenders(chainDb, nil, blockReader, bn) + b, err = blockWithSenders(ctx, chainDb, nil, blockReader, bn) if err != nil { return err } diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index 6d292763874..8f2e0666acc 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -356,6 +356,22 @@ func (r *BlockReader) HeadersRange(ctx context.Context, walker func(header *type } func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) { + //TODO: investigate why code blolow causing getting error `Could not set forkchoice app=caplin stage=ForkChoice err="execution Client RPC failed to retrieve ForkChoiceUpdate response, err: unknown ancestor"` + //maxBlockNumInFiles := r.sn.BlocksAvailable() + //if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles { + // if tx == nil { + // return nil, nil + // } + // blockHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) + // if err != nil { + // return nil, err + // } + // if blockHash == (common.Hash{}) { + // return nil, nil + // } + // h = rawdb.ReadHeader(tx, blockHash, blockHeight) + // return h, nil + //} if tx != nil { blockHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) if err != nil { @@ -445,6 +461,15 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig } func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) { + //TODO: investigate why code blolow causing getting error `Could not set forkchoice app=caplin stage=ForkChoice err="execution Client RPC failed to retrieve ForkChoiceUpdate response, err: unknown ancestor"` + //maxBlockNumInFiles := r.sn.BlocksAvailable() + //if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles { + // if tx == nil { + // return nil, nil + // } + // h = rawdb.ReadHeader(tx, hash, blockHeight) + // return h, nil + //} if tx != nil { h = rawdb.ReadHeader(tx, hash, blockHeight) if h != nil { @@ -466,7 +491,20 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash } func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) { - if tx != nil { + var dbgPrefix string + dbgLogs := dbg.Enabled(ctx) + if dbgLogs { + dbgPrefix = fmt.Sprintf("[dbg] BlockReader(idxMax=%d,segMax=%d).BodyWithTransactions(hash=%x,blk=%d) -> ", r.sn.idxMax.Load(), r.sn.segmentsMax.Load(), hash, blockHeight) + } + + maxBlockNumInFiles := r.sn.BlocksAvailable() + if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles { + if tx == nil { + if dbgLogs { + log.Info(dbgPrefix + "RoTx is nil") + } + return nil, nil + } body, err = rawdb.ReadBodyWithTransactions(tx, hash, blockHeight) if err != nil { return nil, err @@ -474,6 +512,9 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha if body != nil { return body, nil } + if dbgLogs { + log.Info(dbgPrefix + "found in db=false") + } } view := r.sn.View() @@ -484,6 +525,9 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha var buf []byte seg, ok := view.BodiesSegment(blockHeight) if !ok { + if dbgLogs { + log.Info(dbgPrefix + "no bodies file for this block num") + } return nil, nil } body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf) @@ -491,10 +535,16 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha return nil, err } if body == nil { + if dbgLogs { + log.Info(dbgPrefix + "got nil body from file") + } return nil, nil } txnSeg, ok := view.TxsSegment(blockHeight) if !ok { + if dbgLogs { + log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.idxMax", r.sn.idxMax.Load(), "r.sn.segmetntsMax", r.sn.segmentsMax.Load()) + } return nil, nil } txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf) @@ -502,8 +552,14 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha return nil, err } if txs == nil { + if dbgLogs { + log.Info(dbgPrefix + "got nil txs from file") + } return nil, nil } + if dbgLogs { + log.Info(dbgPrefix+"got non-nil txs from file", "len(txs)", len(txs)) + } body.Transactions = txs body.SendersToTxs(senders) return body, nil @@ -524,6 +580,9 @@ func (r *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash common.Has func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) { maxBlockNumInFiles := r.sn.BlocksAvailable() if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles { + if tx == nil { + return nil, 0, nil + } body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight) return body, txAmount, nil } @@ -556,11 +615,17 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c var dbgPrefix string dbgLogs := dbg.Enabled(ctx) if dbgLogs { - dbgPrefix = fmt.Sprintf("[dbg] BlockReader.blockWithSenders(hash=%x,blk=%d,forceCanonical=%t) -> ", hash, blockHeight, forceCanonical) + dbgPrefix = fmt.Sprintf("[dbg] BlockReader(idxMax=%d,segMax=%d).blockWithSenders(hash=%x,blk=%d) -> ", r.sn.idxMax.Load(), r.sn.segmentsMax.Load(), hash, blockHeight) } maxBlockNumInFiles := r.sn.BlocksAvailable() - if tx != nil && (maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles) { + if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles { + if tx == nil { + if dbgLogs { + log.Info(dbgPrefix + "RoTx is nil") + } + return nil, nil, nil + } if forceCanonical { canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) if err != nil { @@ -648,7 +713,7 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c txnSeg, ok := view.TxsSegment(blockHeight) if !ok { if dbgLogs { - log.Info(dbgPrefix + "no transactions file for this block num") + log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", r.sn.indicesReady.Load()) } return } @@ -1215,7 +1280,10 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) ([]rlp.RawValue, error) { maxBlockNumInFiles := r.FrozenBorBlocks() - if tx != nil && (maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles) { + if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles { + if tx == nil { + return nil, nil + } c, err := tx.Cursor(kv.BorEventNums) if err != nil { return nil, err @@ -1470,7 +1538,10 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] var buf [8]byte binary.BigEndian.PutUint64(buf[:], spanId) maxBlockNumInFiles := r.FrozenBorBlocks() - if tx != nil && (maxBlockNumInFiles == 0 || endBlock > maxBlockNumInFiles) { + if maxBlockNumInFiles == 0 || endBlock > maxBlockNumInFiles { + if tx == nil { + return nil, nil + } v, err := tx.GetOne(kv.BorSpans, buf[:]) if err != nil { return nil, err diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index a0e97cc5d8e..880c0b9ef96 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -325,33 +325,6 @@ func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapcfg.Cfg) error { return nil } -func (s *RoSnapshots) blocksAvailable(in snaptype.Type, indexed bool) uint64 { - if s == nil { - return 0 - } - - var max uint64 - - if segtype, ok := s.segments.Get(in.Enum()); ok { - if indexed { - for _, seg := range segtype.segments { - if !seg.IsIndexed() { - break - } - - max = seg.to - 1 - } - } else { - if seglen := len(segtype.segments); seglen > 0 { - max = segtype.segments[seglen-1].to - } - } - - } - - return max -} - func (s *RoSnapshots) Types() []snaptype.Type { return s.types } func (s *RoSnapshots) HasType(in snaptype.Type) bool { for _, t := range s.types { @@ -414,47 +387,28 @@ func (s *RoSnapshots) EnableMadvNormal() *RoSnapshots { } func (s *RoSnapshots) idxAvailability() uint64 { - max := make([]uint64, 0, len(s.Types())) + _max := make([]uint64, len(s.Types())) i := 0 - seglen := 0 - - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - if l := len(value.segments); l > seglen { - seglen = l - } - return true - }) - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - if !s.HasType(segtype.Type()) { - return true - } - - if len(value.segments) < seglen { - return true - } - - max = append(max, 0) - for _, seg := range value.segments { if !seg.IsIndexed() { break } - max[i] = seg.to - 1 + _max[i] = seg.to - 1 } i++ return true }) - var min uint64 = math.MaxUint64 - for _, maxEl := range max { - min = cmp.Min(min, maxEl) + var _min uint64 = math.MaxUint64 + for _, maxEl := range _max { + _min = cmp.Min(_min, maxEl) } - return min + return _min } // OptimisticReopenWithDB - optimistically open snapshots (ignoring error), useful at App startup because: @@ -610,9 +564,7 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic } if f.To > 0 { - if f.To > segmentsMax { - segmentsMax = f.To - 1 - } + segmentsMax = f.To - 1 } else { segmentsMax = 0 } diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index 4c6fa8a8221..c0cf858d15b 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -36,7 +36,7 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, for _, snaptype := range blockReader.BorSnapshots().Types() { minSnapNum := minBlockNum - if available := blockReader.BorSnapshots().(*BorRoSnapshots).blocksAvailable(snaptype, false); available < minBlockNum { + if available := blockReader.BorSnapshots().SegmentsMax(); available < minBlockNum { minSnapNum = available } From e41de5b5e3841980f6b6e9b43e2f665b3c702333 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 3 May 2024 17:02:14 +0700 Subject: [PATCH 2/2] save --- turbo/snapshotsync/freezeblocks/block_reader.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index 8f2e0666acc..bc1b33033c6 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -1538,10 +1538,7 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] var buf [8]byte binary.BigEndian.PutUint64(buf[:], spanId) maxBlockNumInFiles := r.FrozenBorBlocks() - if maxBlockNumInFiles == 0 || endBlock > maxBlockNumInFiles { - if tx == nil { - return nil, nil - } + if tx != nil && (maxBlockNumInFiles == 0 || endBlock > maxBlockNumInFiles) { v, err := tx.GetOne(kv.BorSpans, buf[:]) if err != nil { return nil, err