Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc bottleneck: block files mutex #440

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 75 additions & 66 deletions turbo/snapshotsync/freezeblocks/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"time"

borsnaptype "github.com/ledgerwatch/erigon/polygon/bor/snaptype"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/common/hexutility"
Expand Down Expand Up @@ -395,12 +396,11 @@ func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHei
}
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
Expand All @@ -419,9 +419,8 @@ func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash commo
return h, nil
}

view := r.sn.View()
defer view.Close()
segments := view.Headers()
segments, release := r.sn.ViewType(coresnaptype.Headers)
defer release()

buf := make([]byte, 128)
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -451,12 +450,11 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig
return h, nil
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

header, _, err := r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
Expand Down Expand Up @@ -486,12 +484,12 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash
}
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
return h, err
Expand Down Expand Up @@ -526,40 +524,46 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha
}
}

view := r.sn.View()
defer view.Close()

var baseTxnID uint64
var txsAmount uint32
var buf []byte
seg, ok := view.BodiesSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return nil, nil
}
body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
defer release()

var baseTxnID uint64
var txCount uint32
var buf []byte
body, baseTxnID, txCount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
if err != nil {
return nil, err
}
release()

if body == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
}
return nil, nil
}
txnSeg, ok := view.TxsSegment(blockHeight)

txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.idxMax", r.sn.idxMax.Load(), "r.sn.segmetntsMax", r.sn.segmentsMax.Load())
}
return nil, nil
}
txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf)
defer release()

txs, senders, err := r.txsFromSnapshot(baseTxnID, txCount, txnSeg, buf)
if err != nil {
return nil, err
}
release()

if txs == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil txs from file")
Expand Down Expand Up @@ -606,13 +610,13 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash common.Hash,
body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight)
return body, txAmount, nil
}
view := r.sn.View()
defer view.Close()

seg, ok := view.BodiesSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
return
}
defer release()

body, _, txAmount, _, err = r.bodyFromSnapshot(blockHeight, seg, nil)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -676,15 +680,14 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no header files for this block num")
}
return
}
defer release()

var buf []byte
h, buf, err := r.headerFromSnapshot(blockHeight, seg, buf)
Expand All @@ -697,21 +700,26 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
}
return
}
release()

var b *types.Body
var baseTxnId uint64
var txsAmount uint32
bodySeg, ok := view.BodiesSegment(blockHeight)
bodySeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return
}
defer release()

b, baseTxnId, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, bodySeg, buf)
if err != nil {
return nil, nil, err
}
release()

if b == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
Expand All @@ -730,18 +738,21 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return block, senders, nil
}

txnSeg, ok := view.TxsSegment(blockHeight)
txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", r.sn.indicesReady.Load())
}
return
}
defer release()
var txs []types.Transaction
txs, senders, err = r.txsFromSnapshot(baseTxnId, txsAmount, txnSeg, buf)
if err != nil {
return nil, nil, err
}
release()

block = types.NewBlockFromStorage(hash, h, txs, b.Uncles, b.Withdrawals)
if len(senders) != block.Transactions().Len() {
if dbgLogs {
Expand Down Expand Up @@ -997,18 +1008,18 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return rawdb.TxnByIdxInBlock(tx, canonicalHash, blockNum, txIdxInBlock)
}

view := r.sn.View()
defer view.Close()
seg, ok := view.BodiesSegment(blockNum)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockNum)
if !ok {
return
}
defer release()

var b *types.BodyForStorage
b, _, err = r.bodyForStorageFromSnapshot(blockNum, seg, nil)
if err != nil {
return nil, err
}
release()
if b == nil {
return
}
Expand All @@ -1018,10 +1029,12 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return nil, nil
}

txnSeg, ok := view.TxsSegment(blockNum)
txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockNum)
if !ok {
return
}
defer release()

// +1 because block has system-txn in the beginning of block
return r.txnByID(b.BaseTxId+1+uint64(txIdxInBlock), txnSeg, nil)
}
Expand All @@ -1036,10 +1049,9 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common.
return *n, true, nil
}

view := r.sn.View()
defer view.Close()

_, blockNum, ok, err := r.txnByHash(txnHash, view.Txs(), nil)
txns, release := r.sn.ViewType(coresnaptype.Transactions)
defer release()
_, blockNum, ok, err := r.txnByHash(txnHash, txns, nil)
if err != nil {
return 0, false, err
}
Expand All @@ -1048,13 +1060,11 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common.
}

func (r *BlockReader) FirstTxnNumNotInSnapshots() uint64 {
view := r.sn.View()
defer view.Close()

sn, ok := view.TxsSegment(r.sn.BlocksAvailable())
sn, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, r.sn.BlocksAvailable())
if !ok {
return 0
}
defer release()

lastTxnID := sn.Index(coresnaptype.Indexes.TxnHash).BaseDataID() + uint64(sn.Count())
return lastTxnID
Expand Down Expand Up @@ -1268,10 +1278,10 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common
}

borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()

segments := view.Events()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.from > blockHeight {
Expand Down Expand Up @@ -1349,9 +1359,10 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H
return result, nil
}
borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()
segments := view.Events()

segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

var buf []byte
result := []rlp.RawValue{}
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -1389,10 +1400,9 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H

// EventsByIdFromSnapshot returns the list of records limited by time, or the number of records along with a bool value to signify if the records were limited by time
func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, bool, error) {
view := r.borSn.View()
defer view.Close()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

segments := view.Events()
var buf []byte
var result []*heimdall.EventRecordWithTime
stateContract := bor.GenesisContractStateReceiverABI()
Expand Down Expand Up @@ -1469,9 +1479,9 @@ func (r *BlockReader) LastFrozenEventId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Events()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

if len(segments) == 0 {
return 0
}
Expand Down Expand Up @@ -1526,9 +1536,9 @@ func (r *BlockReader) LastFrozenSpanId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Spans()
segments, release := r.borSn.ViewType(borsnaptype.BorSpans)
defer release()

if len(segments) == 0 {
return 0
}
Expand Down Expand Up @@ -1570,9 +1580,9 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([]
}
return common.Copy(v), nil
}
view := r.borSn.View()
defer view.Close()
segments := view.Spans()
segments, release := r.borSn.ViewType(borsnaptype.BorSpans)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
idx := sn.Index()
Expand Down Expand Up @@ -1674,9 +1684,9 @@ func (r *BlockReader) Checkpoint(ctx context.Context, tx kv.Getter, checkpointId
return common.Copy(v), nil
}

view := r.borSn.View()
defer view.Close()
segments := view.Checkpoints()
segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
index := sn.Index()
Expand All @@ -1700,9 +1710,8 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Checkpoints()
segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints)
defer release()
if len(segments) == 0 {
return 0
}
Expand Down
Loading
Loading