Skip to content

Commit

Permalink
E2 snapshot uploading (#9056)
Browse files Browse the repository at this point in the history
This change introduces additional processes to manage snapshot uploading
for E2 snapshots:

## erigon snapshots upload

The `snapshots uploader` command starts a version of erigon customized
for uploading snapshot files to
a remote location.  

It breaks the stage execution process after the senders stage and then
uses the snapshot stage to send
uploaded headers, bodies and (in the case of polygon) bor spans and
events to snapshot files. Because
this process avoids execution, it runs significantly faster than a
standard erigon configuration.

The uploader uses rclone to send seedable snapshot files (100K or 500K blocks) to a
remote storage location specified
in the rclone config file.

The **uploader** is configured to minimize disk usage by doing the
following:

* It removes snapshots once they are loaded
* It aggressively prunes the database once entities are transferred to
snapshots

In addition to this, it has the following performance-related features:

* maximizes the workers allocated to snapshot processing to improve
throughput
* Can be started from scratch by downloading the latest snapshots from
the remote location to seed processing

## snapshots command

This is a stand-alone command for managing remote snapshots. It has the
following sub-commands:

* **cmp** - compare snapshots
* **copy** - copy snapshots
* **verify** - verify snapshots
* **manifest** - manage the manifest file in the root of remote snapshot
locations
* **torrent** - manage snapshot torrent files
  • Loading branch information
mh0lt authored Dec 27, 2023
1 parent df0699a commit 79ed8ca
Show file tree
Hide file tree
Showing 86 changed files with 6,593 additions and 673 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,5 @@ node_modules
/config.toml
/config.yaml
/config.yml

vendor
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ COMMANDS += sentinel
COMMANDS += caplin
COMMANDS += caplin-regression
COMMANDS += tooling
COMMANDS += snapshots




Expand Down
6 changes: 3 additions & 3 deletions cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (a *Antiquary) Loop() error {
if to-from < snaptype.Erigon2MergeLimit {
continue
}
if err := a.antiquate(from, to); err != nil {
if err := a.antiquate(a.sn.Version(), from, to); err != nil {
return err
}
case <-a.ctx.Done():
Expand All @@ -218,12 +218,12 @@ func (a *Antiquary) Loop() error {
}

// Antiquate will antiquate a specific block range (aka. retire snapshots), this should be ran in the background.
func (a *Antiquary) antiquate(from, to uint64) error {
func (a *Antiquary) antiquate(version uint8, from, to uint64) error {
if a.downloader == nil {
return nil // Just skip if we don't have a downloader
}
log.Info("[Antiquary]: Antiquating", "from", from, "to", to)
if err := freezeblocks.DumpBeaconBlocks(a.ctx, a.mainDB, a.beaconDB, from, to, snaptype.Erigon2MergeLimit, a.dirs.Tmp, a.dirs.Snap, 1, log.LvlDebug, a.logger); err != nil {
if err := freezeblocks.DumpBeaconBlocks(a.ctx, a.mainDB, a.beaconDB, version, from, to, snaptype.Erigon2MergeLimit, a.dirs.Tmp, a.dirs.Snap, 1, log.LvlDebug, a.logger); err != nil {
return err
}

Expand Down
30 changes: 23 additions & 7 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,9 @@ func (c *Chain) Run(ctx *Context) error {
log.Info("Started chain download", "chain", c.Chain)

dirs := datadir.New(c.Datadir)
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, log.Root())
snapshotVersion := snapcfg.KnownCfg(c.Chain, 0).Version

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, snapshotVersion, log.Root())

rawDB, _ := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)
beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
Expand Down Expand Up @@ -592,7 +594,9 @@ func (c *DumpSnapshots) Run(ctx *Context) error {
return
})

return freezeblocks.DumpBeaconBlocks(ctx, db, beaconDB, 0, to, snaptype.Erigon2MergeLimit, dirs.Tmp, dirs.Snap, estimate.CompressSnapshot.Workers(), log.LvlInfo, log.Root())
snapshotVersion := snapcfg.KnownCfg(c.Chain, 0).Version

return freezeblocks.DumpBeaconBlocks(ctx, db, beaconDB, snapshotVersion, 0, to, snaptype.Erigon2MergeLimit, dirs.Tmp, dirs.Snap, estimate.CompressSnapshot.Workers(), log.LvlInfo, log.Root())
}

type CheckSnapshots struct {
Expand Down Expand Up @@ -630,8 +634,9 @@ func (c *CheckSnapshots) Run(ctx *Context) error {
}

to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit
snapshotVersion := snapcfg.KnownCfg(c.Chain, 0).Version

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, log.Root())
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, snapshotVersion, log.Root())
if err := csn.ReopenFolder(); err != nil {
return err
}
Expand Down Expand Up @@ -712,7 +717,9 @@ func (c *LoopSnapshots) Run(ctx *Context) error {

to = (to / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, log.Root())
snapshotVersion := snapcfg.KnownCfg(c.Chain, 0).Version

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, snapshotVersion, log.Root())
if err := csn.ReopenFolder(); err != nil {
return err
}
Expand Down Expand Up @@ -782,7 +789,14 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
if err != nil {
return fmt.Errorf("new server: %w", err)
}
return snapshotsync.WaitForDownloader("CapCliDownloader", ctx, false, snapshotsync.OnlyCaplin, s, tx, freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, log.Root()), freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, log.Root())), params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer))

snapshotVersion := snapcfg.KnownCfg(d.Chain, 0).Version

return snapshotsync.WaitForDownloader(ctx, "CapCliDownloader", false, snapshotsync.OnlyCaplin, s, tx,
freezeblocks.NewBlockReader(
freezeblocks.NewRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root()),
freezeblocks.NewBorRoSnapshots(ethconfig.NewSnapCfg(false, false, false), dirs.Snap, snapshotVersion, log.Root())),
params.ChainConfigByChainName(d.Chain), direct.NewDownloaderClient(bittorrentServer))
}

type RetrieveHistoricalState struct {
Expand Down Expand Up @@ -811,7 +825,9 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error {
return err
}
defer tx.Rollback()
allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, log.Root())
snapshotVersion := snapcfg.KnownCfg(r.Chain, 0).Version

allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, snapshotVersion, log.Root())
if err := allSnapshots.ReopenFolder(); err != nil {
return err
}
Expand All @@ -822,7 +838,7 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error {
var bor *freezeblocks.BorRoSnapshots
blockReader := freezeblocks.NewBlockReader(allSnapshots, bor)
eth1Getter := getters.NewExecutionSnapshotReader(ctx, beaconConfig, blockReader, db)
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, log.Root())
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, snapshotVersion, log.Root())
if err := csn.ReopenFolder(); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func OpenCaplinDatabase(ctx context.Context,

func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engine execution_client.ExecutionEngine,
beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, state *state.CachingBeaconState,
caplinFreezer freezer.Freezer, dirs datadir.Dirs, cfg beacon_router_configuration.RouterConfiguration, eth1Getter snapshot_format.ExecutionBlockReaderByNumber,
caplinFreezer freezer.Freezer, dirs datadir.Dirs, snapshotVersion uint8, cfg beacon_router_configuration.RouterConfiguration, eth1Getter snapshot_format.ExecutionBlockReaderByNumber,
snDownloader proto_downloader.DownloaderClient, backfilling bool, states bool, historyDB persistence.BeaconChainDatabase, indexDB kv.RwDB) error {
rawDB, af := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)

Expand All @@ -98,7 +98,7 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi

logger := log.New("app", "caplin")

csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, logger)
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs.Snap, snapshotVersion, logger)
rcsn := freezeblocks.NewBeaconSnapshotReader(csn, eth1Getter, historyDB, beaconConfig)

if caplinFreezer != nil {
Expand Down
5 changes: 4 additions & 1 deletion cmd/caplin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"os"

"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
"github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/fork"
Expand Down Expand Up @@ -128,7 +129,9 @@ func runCaplinNode(cliCtx *cli.Context) error {
return err
}

return caplin1.RunCaplinPhase1(ctx, sentinel, executionEngine, cfg.BeaconCfg, cfg.GenesisCfg, state, caplinFreezer, cfg.Dirs, beacon_router_configuration.RouterConfiguration{
snapshotVersion := snapcfg.KnownCfg(cliCtx.String(utils.ChainFlag.Name), 0).Version

return caplin1.RunCaplinPhase1(ctx, sentinel, executionEngine, cfg.BeaconCfg, cfg.GenesisCfg, state, caplinFreezer, cfg.Dirs, snapshotVersion, beacon_router_configuration.RouterConfiguration{
Protocol: cfg.BeaconProtocol,
Address: cfg.BeaconAddr,
ReadTimeTimeout: cfg.BeaconApiReadTimeout,
Expand Down
2 changes: 1 addition & 1 deletion cmd/devnet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func setupLogger(ctx *cli.Context) (log.Logger, error) {
return nil, err
}

logger := logging.SetupLoggerCtx("devnet", ctx, false /* rootLogger */)
logger := logging.SetupLoggerCtx("devnet", ctx, log.LvlInfo, log.LvlInfo, false /* rootLogger */)

// Make root logger fail
log.Root().SetHandler(PanicHandler{})
Expand Down
11 changes: 7 additions & 4 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader"
downloadercfg2 "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/kv"
Expand Down Expand Up @@ -164,7 +164,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
if err := checkChainName(ctx, dirs, chain); err != nil {
return err
}
torrentLogLevel, _, err := downloadercfg2.Int2LogLevel(torrentVerbosity)
torrentLogLevel, _, err := downloadercfg.Int2LogLevel(torrentVerbosity)
if err != nil {
return err
}
Expand All @@ -186,7 +186,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {
if known, ok := snapcfg.KnownWebseeds[chain]; ok {
webseedsList = append(webseedsList, known...)
}
cfg, err := downloadercfg2.New(dirs, version, torrentLogLevel, downloadRate, uploadRate, torrentPort, torrentConnsPerFile, torrentDownloadSlots, staticPeers, webseedsList, chain)
cfg, err := downloadercfg.New(dirs, version, torrentLogLevel, downloadRate, uploadRate, torrentPort, torrentConnsPerFile, torrentDownloadSlots, staticPeers, webseedsList, chain)
if err != nil {
return err
}
Expand All @@ -201,6 +201,8 @@ func Downloader(ctx context.Context, logger log.Logger) error {
}
downloadernat.DoNat(natif, cfg.ClientConfig, logger)

cfg.AddTorrentsFromDisk = true // always true unless using uploader - which wants control of torrent files

d, err := downloader.New(ctx, cfg, dirs, logger, log.LvlInfo, seedbox)
if err != nil {
return err
Expand Down Expand Up @@ -402,6 +404,7 @@ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
if err != nil {
return err
}

for _, t := range torrents {
// we don't release commitment history in this time. let's skip it here.
if strings.HasPrefix(t.DisplayName, "history/v1-commitment") || strings.HasPrefix(t.DisplayName, "idx/v1-commitment") {
Expand Down Expand Up @@ -494,7 +497,7 @@ func StartGrpc(snServer *downloader.GrpcServer, addr string, creds *credentials.

// Add pre-configured
func addPreConfiguredHashes(ctx context.Context, d *downloader.Downloader) error {
for _, it := range snapcfg.KnownCfg(chain).Preverified {
for _, it := range snapcfg.KnownCfg(chain, 0).Preverified {
if err := d.AddMagnetLink(ctx, snaptype.Hex2InfoHash(it.Hash), it.Name); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 79ed8ca

Please sign in to comment.