Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nil block during execution #10193

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import (
"github.com/ledgerwatch/erigon/rpc"
)

//const HistoryV3AggregationStep = 3_125_000 / 100 // use this to reduce step size for dev/debug

// BorDefaultMinerGasPrice defines the minimum gas price for bor validators to mine a transaction.
var BorDefaultMinerGasPrice = big.NewInt(30 * params.GWei)

Expand Down
10 changes: 5 additions & 5 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func ExecV3(ctx context.Context,
Loop:
for blockNum = block; blockNum <= maxBlockNum; blockNum++ {
inputBlockNum.Store(blockNum)
b, err = blockWithSenders(chainDb, applyTx, blockReader, blockNum)
b, err = blockWithSenders(ctx, chainDb, applyTx, blockReader, blockNum)
if err != nil {
return err
}
Expand Down Expand Up @@ -750,15 +750,15 @@ Loop:
}
return nil
}
func blockWithSenders(db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, blockNum uint64) (b *types.Block, err error) {
func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader services.BlockReader, blockNum uint64) (b *types.Block, err error) {
if tx == nil {
tx, err = db.BeginRo(context.Background())
tx, err = db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
}
return blockReader.BlockByNumber(context.Background(), tx, blockNum)
return blockReader.BlockByNumber(ctx, tx, blockNum)
}

func processResultQueue(in *exec22.QueueWithRetry, rws *exec22.ResultsQueue, outputTxNumIn uint64, rs *state.StateV3, agg *libstate.Aggregator, applyTx kv.Tx, backPressure chan struct{}, applyWorker *exec3.Worker, canRetry, forceStopAtBlockEnd bool) (outputTxNum uint64, conflicts, triggers int, processedBlockNum uint64, stopedAtBlockEnd bool, err error) {
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func reconstituteStep(last bool,

for bn := startBlockNum; bn <= endBlockNum; bn++ {
t = time.Now()
b, err = blockWithSenders(chainDb, nil, blockReader, bn)
b, err = blockWithSenders(ctx, chainDb, nil, blockReader, bn)
if err != nil {
return err
}
Expand Down
78 changes: 73 additions & 5 deletions turbo/snapshotsync/freezeblocks/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,22 @@ func (r *BlockReader) HeadersRange(ctx context.Context, walker func(header *type
}

func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) {
//TODO: investigate why code blolow causing getting error `Could not set forkchoice app=caplin stage=ForkChoice err="execution Client RPC failed to retrieve ForkChoiceUpdate response, err: unknown ancestor"`
//maxBlockNumInFiles := r.sn.BlocksAvailable()
//if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
// if tx == nil {
// return nil, nil
// }
// blockHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
// if err != nil {
// return nil, err
// }
// if blockHash == (common.Hash{}) {
// return nil, nil
// }
// h = rawdb.ReadHeader(tx, blockHash, blockHeight)
// return h, nil
//}
if tx != nil {
blockHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
Expand Down Expand Up @@ -445,6 +461,15 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig
}

func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) {
//TODO: investigate why code blolow causing getting error `Could not set forkchoice app=caplin stage=ForkChoice err="execution Client RPC failed to retrieve ForkChoiceUpdate response, err: unknown ancestor"`
//maxBlockNumInFiles := r.sn.BlocksAvailable()
//if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
// if tx == nil {
// return nil, nil
// }
// h = rawdb.ReadHeader(tx, hash, blockHeight)
// return h, nil
//}
if tx != nil {
h = rawdb.ReadHeader(tx, hash, blockHeight)
if h != nil {
Expand All @@ -466,14 +491,30 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash
}

func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) {
if tx != nil {
var dbgPrefix string
dbgLogs := dbg.Enabled(ctx)
if dbgLogs {
dbgPrefix = fmt.Sprintf("[dbg] BlockReader(idxMax=%d,segMax=%d).BodyWithTransactions(hash=%x,blk=%d) -> ", r.sn.idxMax.Load(), r.sn.segmentsMax.Load(), hash, blockHeight)
}

maxBlockNumInFiles := r.sn.BlocksAvailable()
if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
if tx == nil {
if dbgLogs {
log.Info(dbgPrefix + "RoTx is nil")
}
return nil, nil
}
body, err = rawdb.ReadBodyWithTransactions(tx, hash, blockHeight)
if err != nil {
return nil, err
}
if body != nil {
return body, nil
}
if dbgLogs {
log.Info(dbgPrefix + "found in db=false")
}
}

view := r.sn.View()
Expand All @@ -484,26 +525,41 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha
var buf []byte
seg, ok := view.BodiesSegment(blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return nil, nil
}
body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
if err != nil {
return nil, err
}
if body == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
}
return nil, nil
}
txnSeg, ok := view.TxsSegment(blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.idxMax", r.sn.idxMax.Load(), "r.sn.segmetntsMax", r.sn.segmentsMax.Load())
}
return nil, nil
}
txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf)
if err != nil {
return nil, err
}
if txs == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil txs from file")
}
return nil, nil
}
if dbgLogs {
log.Info(dbgPrefix+"got non-nil txs from file", "len(txs)", len(txs))
}
body.Transactions = txs
body.SendersToTxs(senders)
return body, nil
Expand All @@ -524,6 +580,9 @@ func (r *BlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash common.Has
func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, txAmount uint32, err error) {
maxBlockNumInFiles := r.sn.BlocksAvailable()
if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
if tx == nil {
return nil, 0, nil
}
body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight)
return body, txAmount, nil
}
Expand Down Expand Up @@ -556,11 +615,17 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
var dbgPrefix string
dbgLogs := dbg.Enabled(ctx)
if dbgLogs {
dbgPrefix = fmt.Sprintf("[dbg] BlockReader.blockWithSenders(hash=%x,blk=%d,forceCanonical=%t) -> ", hash, blockHeight, forceCanonical)
dbgPrefix = fmt.Sprintf("[dbg] BlockReader(idxMax=%d,segMax=%d).blockWithSenders(hash=%x,blk=%d) -> ", r.sn.idxMax.Load(), r.sn.segmentsMax.Load(), hash, blockHeight)
}

maxBlockNumInFiles := r.sn.BlocksAvailable()
if tx != nil && (maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles) {
if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
if tx == nil {
if dbgLogs {
log.Info(dbgPrefix + "RoTx is nil")
}
return nil, nil, nil
}
if forceCanonical {
canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight)
if err != nil {
Expand Down Expand Up @@ -648,7 +713,7 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
txnSeg, ok := view.TxsSegment(blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no transactions file for this block num")
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", r.sn.indicesReady.Load())
}
return
}
Expand Down Expand Up @@ -1215,7 +1280,10 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common

func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) ([]rlp.RawValue, error) {
maxBlockNumInFiles := r.FrozenBorBlocks()
if tx != nil && (maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles) {
if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
if tx == nil {
return nil, nil
}
c, err := tx.Cursor(kv.BorEventNums)
if err != nil {
return nil, err
Expand Down
62 changes: 7 additions & 55 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,33 +325,6 @@ func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapcfg.Cfg) error {
return nil
}

func (s *RoSnapshots) blocksAvailable(in snaptype.Type, indexed bool) uint64 {
if s == nil {
return 0
}

var max uint64

if segtype, ok := s.segments.Get(in.Enum()); ok {
if indexed {
for _, seg := range segtype.segments {
if !seg.IsIndexed() {
break
}

max = seg.to - 1
}
} else {
if seglen := len(segtype.segments); seglen > 0 {
max = segtype.segments[seglen-1].to
}
}

}

return max
}

func (s *RoSnapshots) Types() []snaptype.Type { return s.types }
func (s *RoSnapshots) HasType(in snaptype.Type) bool {
for _, t := range s.types {
Expand Down Expand Up @@ -414,47 +387,28 @@ func (s *RoSnapshots) EnableMadvNormal() *RoSnapshots {
}

func (s *RoSnapshots) idxAvailability() uint64 {
max := make([]uint64, 0, len(s.Types()))
_max := make([]uint64, len(s.Types()))
i := 0

seglen := 0

s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool {
if l := len(value.segments); l > seglen {
seglen = l
}
return true
})

s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool {
if !s.HasType(segtype.Type()) {
return true
}

if len(value.segments) < seglen {
return true
}

max = append(max, 0)

for _, seg := range value.segments {
if !seg.IsIndexed() {
break
}

max[i] = seg.to - 1
_max[i] = seg.to - 1
}

i++
return true
})

var min uint64 = math.MaxUint64
for _, maxEl := range max {
min = cmp.Min(min, maxEl)
var _min uint64 = math.MaxUint64
for _, maxEl := range _max {
_min = cmp.Min(_min, maxEl)
}

return min
return _min
}

// OptimisticReopenWithDB - optimistically open snapshots (ignoring error), useful at App startup because:
Expand Down Expand Up @@ -610,9 +564,7 @@ func (s *RoSnapshots) rebuildSegments(fileNames []string, open bool, optimistic
}

if f.To > 0 {
if f.To > segmentsMax {
segmentsMax = f.To - 1
}
segmentsMax = f.To - 1
} else {
segmentsMax = 0
}
Expand Down
2 changes: 1 addition & 1 deletion turbo/snapshotsync/freezeblocks/bor_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64,
for _, snaptype := range blockReader.BorSnapshots().Types() {
minSnapNum := minBlockNum

if available := blockReader.BorSnapshots().(*BorRoSnapshots).blocksAvailable(snaptype, false); available < minBlockNum {
if available := blockReader.BorSnapshots().SegmentsMax(); available < minBlockNum {
minSnapNum = available
}

Expand Down
Loading