Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bor waypoint storage #9793

Merged
merged 36 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
86b8736
added refactor to localize bor snapshot types
mh0lt Mar 18, 2024
8b0fcb8
refactored bor snapshots with additional types
mh0lt Mar 20, 2024
d965303
mistone and checkpoint processing initial testing
mh0lt Mar 22, 2024
5cf8950
Merge branch 'devel' into bor_waypoint_storage
mh0lt Mar 22, 2024
c7a4981
Merge branch 'devel' into bor_waypoint_storage
mh0lt Mar 22, 2024
3b81c53
fix erigon-lib dependencies
mh0lt Mar 25, 2024
7361dea
missing param
mh0lt Mar 25, 2024
d347575
Handle windows.ERROR_NO_MORE_FILES when listing directories (go 1.21 …
mh0lt Mar 27, 2024
25987b9
bor_hiemdall tested post download
mh0lt Mar 27, 2024
8c13ede
snapshot catch up processing
mh0lt Mar 27, 2024
7b191e3
performance and sync fixes
mh0lt Mar 28, 2024
8c42e00
drop windows dependency
mh0lt Mar 28, 2024
8b7fd68
sonnar fixes
mh0lt Mar 28, 2024
2f0e97c
Merge branch 'devel' into bor_waypoint_storage
mh0lt Mar 29, 2024
1a8fe42
added salt from freeze blocks
mh0lt Mar 29, 2024
f05f62b
post merge fixes - moved salt to snaptype
mh0lt Mar 29, 2024
081dc46
return ErrEventRecordNotFound when no record available
mh0lt Mar 29, 2024
d0de01c
add ErrNoChainHead exclusion for status reporting
mh0lt Mar 29, 2024
be4bd17
add unwind support for waypoints
mh0lt Mar 31, 2024
e7ed6c7
Merge branch 'devel' into bor_waypoint_storage
mh0lt Mar 31, 2024
1b1980c
fixed post merge (updated types)
mh0lt Mar 31, 2024
b7ea917
process fixes + waypoint pruning
mh0lt Apr 1, 2024
b8dd95d
fix mumbai endpoint for sync events
mh0lt Apr 1, 2024
c1dcb0e
added last checkpoint processing
mh0lt Apr 1, 2024
f50f043
fixed checkpoint prune + unwind
mh0lt Apr 9, 2024
a7e4123
Merge branch 'devel' into bor_waypoint_storage
mh0lt Apr 9, 2024
1c47a3a
post merge fixes
mh0lt Apr 9, 2024
20f3ae1
Merge branch 'devel' into bor_waypoint_storage
mh0lt Apr 22, 2024
3f9e211
post merge fixes
mh0lt Apr 22, 2024
b45a7e5
bor_types -> bortypes
mh0lt Apr 22, 2024
c3089d3
bor_snaptype, core_snaptype -> borsnaptype, coresnaptype
mh0lt Apr 22, 2024
c426c06
Merge branch 'devel' into bor_waypoint_storage
mh0lt Apr 24, 2024
97b404f
Merge branch 'devel' into bor_waypoint_storage
mh0lt Apr 27, 2024
c5afe5b
fix import conflict
mh0lt Apr 27, 2024
ca86cb8
fix lint
mh0lt Apr 27, 2024
b1a7bd1
fix test
mh0lt Apr 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,12 @@ func (c *DumpSnapshots) Run(ctx *Context) error {
return
})

salt := freezeblocks.GetIndicesSalt(dirs.Snap)
salt, err := snaptype.GetIndexSalt(dirs.Snap)

if err != nil {
return err
}

return freezeblocks.DumpBeaconBlocks(ctx, db, 0, to, salt, dirs, estimate.CompressSnapshot.Workers(), log.LvlInfo, log.Root())
}

Expand Down Expand Up @@ -977,7 +982,12 @@ func (c *DumpBlobsSnapshots) Run(ctx *Context) error {
})
from := ((beaconConfig.DenebForkEpoch * beaconConfig.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit

salt := freezeblocks.GetIndicesSalt(dirs.Snap)
salt, err := snaptype.GetIndexSalt(dirs.Snap)

if err != nil {
return err
}

return freezeblocks.DumpBlobsSidecar(ctx, blobStorage, db, from, to, salt, dirs, estimate.CompressSnapshot.Workers(), log.LvlInfo, log.Root())
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/devnet/devnetutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"

"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/log/v3"
Expand All @@ -23,7 +24,7 @@ var ErrInvalidEnodeString = errors.New("invalid enode string")
func ClearDevDB(dataDir string, logger log.Logger) error {
logger.Info("Deleting nodes' data folders")

files, err := os.ReadDir(dataDir)
files, err := dir.ReadDir(dataDir)

if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion cmd/devnet/services/polygon/proofgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"

"github.com/ledgerwatch/erigon/cl/merkle_tree"
bortypes "github.com/ledgerwatch/erigon/polygon/bor/types"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain/networkname"
Expand Down Expand Up @@ -264,7 +265,7 @@ type receiptProof struct {
}

func getReceiptProof(ctx context.Context, node requests.RequestGenerator, receipt *types.Receipt, block *requests.Block, receipts []*types.Receipt) (*receiptProof, error) {
stateSyncTxHash := types.ComputeBorTxHash(block.Number.Uint64(), block.Hash)
stateSyncTxHash := bortypes.ComputeBorTxHash(block.Number.Uint64(), block.Hash)
receiptsTrie := trie.New(trie.EmptyRoot)

if len(receipts) == 0 {
Expand Down
1 change: 1 addition & 0 deletions cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
pruneHBefore, pruneRBefore uint64
pruneTBefore, pruneCBefore uint64
experiments []string
unwindTypes []string
chain string // Which chain to use (mainnet, goerli, sepolia, etc.)
outputCsvFile string

Expand Down
11 changes: 6 additions & 5 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ var cmdStageBorHeimdall = &cobra.Command{
}
defer db.Close()

if err := stageBorHeimdall(db, cmd.Context(), logger); err != nil {
if err := stageBorHeimdall(db, cmd.Context(), unwindTypes, logger); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
Expand Down Expand Up @@ -711,6 +711,7 @@ func init() {
cmdSetPrune.Flags().Uint64Var(&pruneTBefore, "prune.t.before", 0, "")
cmdSetPrune.Flags().Uint64Var(&pruneCBefore, "prune.c.before", 0, "")
cmdSetPrune.Flags().StringSliceVar(&experiments, "experiments", nil, "Storage mode to override database")
cmdSetPrune.Flags().StringSliceVar(&unwindTypes, "unwind.types", nil, "Types to unwind for bor heimdall")
rootCmd.AddCommand(cmdSetPrune)
}

Expand Down Expand Up @@ -821,7 +822,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
})
}

func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error {
func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, logger log.Logger) error {
engine, _, sync, _, miningState := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)

Expand Down Expand Up @@ -852,7 +853,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error
}

unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, nil, nil, nil, nil, nil, nil)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, nil, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
return err
}
Expand Down Expand Up @@ -881,7 +882,7 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, logger log.Logger) error
recents = bor.Recents
signatures = bor.Signatures
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures)
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, nil, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {
Expand Down Expand Up @@ -1670,7 +1671,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
cfg.Sync,
stagedsync.MiningStages(ctx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, dirs.Tmp, blockReader),
stagedsync.StageBorHeimdallCfg(db, snapDb, miner, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures),
stagedsync.StageBorHeimdallCfg(db, snapDb, miner, *chainConfig, heimdallClient, blockReader, nil, nil, nil, recents, signatures, false, unwindTypes),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil, blockReader),
stagedsync.StageHashStateCfg(db, dirs, historyV3),
stagedsync.StageTrieCfg(db, false, true, false, dirs.Tmp, blockReader, nil, historyV3, agg),
Expand Down
4 changes: 4 additions & 0 deletions cmd/rpcdaemon/rpcservices/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
)

var _ services.FullBlockReader = &RemoteBackend{}

type RemoteBackend struct {
remoteEthBackend remote.ETHBACKENDClient
log log.Logger
Expand Down Expand Up @@ -92,6 +95,7 @@ func (back *RemoteBackend) BlockByHash(ctx context.Context, db kv.Tx, hash commo
func (back *RemoteBackend) TxsV3Enabled() bool { panic("not implemented") }
func (back *RemoteBackend) Snapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) BorSnapshots() services.BlockSnapshots { panic("not implemented") }
func (back *RemoteBackend) AllTypes() []snaptype.Type { panic("not implemented") }
func (back *RemoteBackend) FrozenBlocks() uint64 { return back.blockReader.FrozenBlocks() }
func (back *RemoteBackend) FrozenBorBlocks() uint64 { return back.blockReader.FrozenBorBlocks() }
func (back *RemoteBackend) FrozenFiles() (list []string) { return back.blockReader.FrozenFiles() }
Expand Down
19 changes: 2 additions & 17 deletions cmd/silkworm_api/snapshot_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func buildIndex(cliCtx *cli.Context, dataDir string, snapshotPaths []string, min
g.SetLimit(workers)

dirs := datadir.New(dataDir)
salt := freezeblocks.GetIndicesSalt(dirs.Snap)

chainDB := mdbx.NewMDBX(logger).Path(dirs.Chaindata).MustOpen()
defer chainDB.Close()
Expand All @@ -92,26 +91,12 @@ func buildIndex(cliCtx *cli.Context, dataDir string, snapshotPaths []string, min
}

switch segment.Type.Enum() {
case snaptype.Enums.Headers:
case snaptype.Enums.Headers, snaptype.Enums.Bodies, snaptype.Enums.Transactions:
g.Go(func() error {
jobProgress := &background.Progress{}
ps.Add(jobProgress)
defer ps.Delete(jobProgress)
return freezeblocks.HeadersIdx(ctx, segment, salt, dirs.Tmp, jobProgress, logLevel, logger)
})
case snaptype.Enums.Bodies:
g.Go(func() error {
jobProgress := &background.Progress{}
ps.Add(jobProgress)
defer ps.Delete(jobProgress)
return freezeblocks.BodiesIdx(ctx, segment, salt, dirs.Tmp, jobProgress, logLevel, logger)
})
case snaptype.Enums.Transactions:
g.Go(func() error {
jobProgress := &background.Progress{}
ps.Add(jobProgress)
defer ps.Delete(jobProgress)
return freezeblocks.TransactionsIdx(ctx, chainConfig, segment, salt, dirs.Tmp, jobProgress, logLevel, logger)
return segment.Type.BuildIndexes(ctx, segment, chainConfig, dirs.Tmp, jobProgress, logLevel, logger)
})
}
}
Expand Down
14 changes: 6 additions & 8 deletions cmd/snapshots/cmp/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ledgerwatch/erigon/cmd/snapshots/flags"
"github.com/ledgerwatch/erigon/cmd/snapshots/sync"
"github.com/ledgerwatch/erigon/cmd/utils"
coresnaptype "github.com/ledgerwatch/erigon/core/snaptype"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params"
Expand Down Expand Up @@ -614,8 +615,8 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent1.Body.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.BodiesIdx(ctx, info, salt, c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)

return coresnaptype.Bodies.BuildIndexes(ctx, info, c.chainConfig(), c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
})

g.Go(func() error {
Expand Down Expand Up @@ -653,8 +654,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent1.Transactions.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.TransactionsIdx(ctx, c.chainConfig(), info, salt, c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
return coresnaptype.Transactions.BuildIndexes(ctx, info, c.chainConfig(), c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
})

b2err := make(chan error, 1)
Expand Down Expand Up @@ -690,8 +690,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent2.Body.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.BodiesIdx(ctx, info, salt, c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
return coresnaptype.Bodies.BuildIndexes(ctx, info, c.chainConfig(), c.session1.LocalFsRoot(), nil, log.LvlDebug, logger)
})

g.Go(func() error {
Expand Down Expand Up @@ -732,8 +731,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
}()

logger.Info(fmt.Sprintf("Indexing %s", ent2.Transactions.Name()))
salt := freezeblocks.GetIndicesSalt(info.Dir())
return freezeblocks.TransactionsIdx(ctx, c.chainConfig(), info, salt, c.session2.LocalFsRoot(), nil, log.LvlDebug, logger)
return coresnaptype.Transactions.BuildIndexes(ctx, info, c.chainConfig(), c.session2.LocalFsRoot(), nil, log.LvlDebug, logger)
})

if err := g.Wait(); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ var (
Value: true,
}

WithHeimdallWaypoints = cli.BoolFlag{
Name: "bor.waypoints",
Usage: "Enabling bor waypont recording",
Value: false,
}

PolygonSyncFlag = cli.BoolFlag{
Name: "polygon.sync",
Usage: "Enabling syncing using the new polygon sync component",
Expand Down Expand Up @@ -1565,6 +1571,7 @@ func setBorConfig(ctx *cli.Context, cfg *ethconfig.Config) {
cfg.HeimdallURL = ctx.String(HeimdallURLFlag.Name)
cfg.WithoutHeimdall = ctx.Bool(WithoutHeimdallFlag.Name)
cfg.WithHeimdallMilestones = ctx.Bool(WithHeimdallMilestones.Name)
cfg.WithHeimdallWaypointRecording = ctx.Bool(WithHeimdallWaypoints.Name)
cfg.PolygonSync = ctx.Bool(PolygonSyncFlag.Name)
}

Expand Down
3 changes: 2 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/core/vm/evmtypes"
"github.com/ledgerwatch/erigon/eth/ethutils"
bortypes "github.com/ledgerwatch/erigon/polygon/bor/types"
"github.com/ledgerwatch/erigon/rlp"
)

Expand Down Expand Up @@ -192,7 +193,7 @@ func ExecuteBlockEphemerally(
stateSyncReceipt.Logs = blockLogs[len(logs):] // get state-sync logs from `state.Logs()`

// fill the state sync with the correct information
types.DeriveFieldsForBorReceipt(stateSyncReceipt, block.Hash(), block.NumberU64(), receipts)
bortypes.DeriveFieldsForBorReceipt(stateSyncReceipt, block.Hash(), block.NumberU64(), receipts)
stateSyncReceipt.Status = types.ReceiptStatusSuccessful
}
}
Expand Down
53 changes: 53 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
Expand All @@ -41,6 +42,7 @@ import (

"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/polygon/heimdall"
"github.com/ledgerwatch/erigon/rlp"
)

Expand Down Expand Up @@ -1150,6 +1152,57 @@ func PruneBorBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int, SpanIdAt
}
counter--
}

checkpointCursor, err := tx.RwCursor(kv.BorCheckpoints)
if err != nil {
return err
}

defer checkpointCursor.Close()
lastCheckpointToRemove, err := heimdall.CheckpointIdAt(tx, blockTo)

if err != nil {
return err
}

var checkpointIdBytes [8]byte
binary.BigEndian.PutUint64(checkpointIdBytes[:], uint64(lastCheckpointToRemove))
for k, _, err := checkpointCursor.Seek(checkpointIdBytes[:]); err == nil && k != nil; k, _, err = checkpointCursor.Prev() {
if err = checkpointCursor.DeleteCurrent(); err != nil {
return err
}
}

milestoneCursor, err := tx.RwCursor(kv.BorMilestones)

if err != nil {
return err
}

defer milestoneCursor.Close()

var lastMilestoneToRemove heimdall.MilestoneId

for blockCount := 1; err != nil && blockCount < blocksDeleteLimit; blockCount++ {
lastMilestoneToRemove, err = heimdall.MilestoneIdAt(tx, blockTo-uint64(blockCount))

if !errors.Is(err, heimdall.ErrMilestoneNotFound) {
return err
} else {
if blockCount == blocksDeleteLimit-1 {
return nil
}
}
}

var milestoneIdBytes [8]byte
binary.BigEndian.PutUint64(milestoneIdBytes[:], uint64(lastMilestoneToRemove))
for k, _, err := milestoneCursor.Seek(milestoneIdBytes[:]); err == nil && k != nil; k, _, err = milestoneCursor.Prev() {
if err = milestoneCursor.DeleteCurrent(); err != nil {
return err
}
}

return nil
}

Expand Down
7 changes: 4 additions & 3 deletions core/rawdb/bor_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
bortypes "github.com/ledgerwatch/erigon/polygon/bor/types"
"github.com/ledgerwatch/erigon/rlp"
)

var (
// bor receipt key
borReceiptKey = types.BorReceiptKey
borReceiptKey = bortypes.BorReceiptKey
)

// HasBorReceipts verifies the existence of all block receipt belonging to a block.
Expand Down Expand Up @@ -78,7 +79,7 @@ func ReadBorReceipt(db kv.Tx, blockHash libcommon.Hash, blockNumber uint64, rece
}
}

types.DeriveFieldsForBorReceipt(borReceipt, blockHash, blockNumber, receipts)
bortypes.DeriveFieldsForBorReceipt(borReceipt, blockHash, blockNumber, receipts)

return borReceipt, nil
}
Expand Down Expand Up @@ -126,7 +127,7 @@ func ReadBorTransactionForBlock(db kv.Tx, blockNum uint64) types.Transaction {
if !HasBorReceipts(db, blockNum) {
return nil
}
return types.NewBorTransaction()
return bortypes.NewBorTransaction()
}

// TruncateBorReceipts removes all bor receipt for given block number or newer
Expand Down
Loading
Loading