diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go
index d1cc0973711..71664eddff5 100644
--- a/cmd/rpcdaemon/cli/config.go
+++ b/cmd/rpcdaemon/cli/config.go
@@ -40,6 +40,7 @@ import (
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
+	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
 	"github.com/ledgerwatch/log/v3"
 	"github.com/spf13/cobra"
 	"google.golang.org/grpc"
@@ -106,8 +107,8 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
 		if err := utils.SetupCobra(cmd); err != nil {
 			return err
 		}
-		cfg.SingleNodeMode = cfg.DataDir != "" || cfg.Chaindata != ""
-		if cfg.SingleNodeMode {
+		cfg.WithDatadir = cfg.DataDir != "" || cfg.Chaindata != ""
+		if cfg.WithDatadir {
 			if cfg.DataDir == "" {
 				cfg.DataDir = paths.DefaultDataDir()
 			}
@@ -234,20 +235,20 @@ func EmbeddedServices(ctx context.Context, erigonDB kv.RoDB, stateCacheCfg kvcac
 }
 
 // RemoteServices - use when RPCDaemon run as independent process. Still it can use --datadir flag to enable
-// `cfg.SingleNodeMode` (mode when it on 1 machine with Erigon)
+// `cfg.WithDatadir` (mode when it runs on the same machine as Erigon)
 func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
 	db kv.RoDB, borDb kv.RoDB,
 	eth services.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
 	starknet *services.StarknetService,
 	stateCache kvcache.Cache, blockReader interfaces.BlockAndTxnReader,
 	ff *filters.Filters, err error) {
-	if !cfg.SingleNodeMode && cfg.PrivateApiAddr == "" {
+	if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
 		return nil, nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("either remote db or local db must be specified")
 	}
 
 	// Do not change the order of these checks. Chaindata needs to be checked first, because PrivateApiAddr has default value which is not ""
 	// If PrivateApiAddr is checked first, the Chaindata option will never work
-	if cfg.SingleNodeMode {
+	if cfg.WithDatadir {
 		var rwKv kv.RwDB
 		log.Trace("Creating chain db", "path", cfg.Chaindata)
 		limiter := make(chan struct{}, cfg.DBReadConcurrency)
@@ -300,6 +301,10 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
 			if err != nil {
 				return err
 			}
+			cfg.Snapshot.Enabled, err = snap.Enabled(tx)
+			if err != nil {
+				return err
+			}
 			return nil
 		}); err != nil {
 			return nil, nil, nil, nil, nil, nil, nil, nil, ff, err
@@ -308,10 +313,10 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
 			return nil, nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
 		}
 
-		cfg.SyncMode = ethconfig.SyncModeByChainName(cc.ChainName, cfg.SyncModeCli)
-		cfg.Snapshot.Enabled = cfg.SyncMode == ethconfig.SnapSync
-		if cfg.SingleNodeMode {
-			cfg.Snapshot = ethconfig.NewSnapshotCfg(cfg.Snapshot.Enabled, cfg.Snapshot.KeepBlocks)
+		if cfg.Snapshot.Enabled {
+			cfg.SyncMode = ethconfig.SnapSync
+		} else {
+			cfg.SyncMode = ethconfig.FullSync
 		}
 
 		// if chain config has terminal total difficulty then rpc must have eth and engine APIs enableds
@@ -338,18 +343,22 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
 		}
 	}
 
-	var onNewSnapshot func()
-	if cfg.SingleNodeMode {
+	onNewSnapshot := func() {}
+	if cfg.WithDatadir {
 		if cfg.Snapshot.Enabled {
 			allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snapshot, filepath.Join(cfg.DataDir, "snapshots"))
+			if err := allSnapshots.Reopen(); err != nil {
+				return nil, nil, nil, nil, nil, nil, nil, nil, ff, err
+			}
+			log.Info("[Snapshots] see new", "blocks", allSnapshots.BlocksAvailable())
 			// don't reopen it right here, because snapshots may be not ready yet
 			onNewSnapshot = func() {
 				allSnapshots.Reopen()
+				log.Info("[Snapshots] see new", "blocks", allSnapshots.BlocksAvailable())
 			}
 			blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
 		} else {
-			blockReader = snapshotsync.NewBlockReader()
-			onNewSnapshot = func() {}
+			log.Info("Use --syncmode=full")
 		}
 	}
@@ -370,7 +379,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
 
 	subscribeToStateChangesLoop(ctx, kvClient, stateCache)
 
-	if !cfg.SingleNodeMode {
+	if !cfg.WithDatadir {
 		blockReader = snapshotsync.NewRemoteBlockReader(remote.NewETHBACKENDClient(conn))
 	}
 	remoteEth := services.NewRemoteBackend(remote.NewETHBACKENDClient(conn), db, blockReader)
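Note on the config.go change above: the daemon no longer derives snapshot mode from a CLI value (`SyncModeByChainName` and `SyncModeCli` are gone); it trusts the flag Erigon persisted in the database and maps it onto `SnapSync`/`FullSync`. A minimal sketch of that read, using only APIs visible in this diff (the helper name `snapshotsEnabled` is ours, for illustration):

```go
package rpcdaemon

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
)

// snapshotsEnabled reads the persisted blocksSnapshotEnabled flag inside a
// read-only transaction. RemoteServices maps the result onto the sync mode:
// true -> ethconfig.SnapSync, false -> ethconfig.FullSync.
func snapshotsEnabled(ctx context.Context, db kv.RoDB) (bool, error) {
	var enabled bool
	err := db.View(ctx, func(tx kv.Tx) error {
		var e error
		enabled, e = snap.Enabled(tx)
		return e
	})
	return enabled, err
}
```

This removes a class of misconfiguration: the rpcdaemon can no longer be told one sync mode while the Erigon node that owns the datadir runs another.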
diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go
index e23b3581c99..91bc9abc0ad 100644
--- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go
+++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go
@@ -8,7 +8,7 @@ import (
 type HttpCfg struct {
 	Enabled           bool
 	PrivateApiAddr    string
-	SingleNodeMode    bool // Erigon's database can be read by separated processes on same machine - in read-only mode - with full support of transactions. It will share same "OS PageCache" with Erigon process.
+	WithDatadir       bool // Erigon's database can be read by separate processes on the same machine - in read-only mode - with full support of transactions. It will share the same "OS PageCache" with the Erigon process.
 	DataDir           string
 	Chaindata         string
 	HttpListenAddress string
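The `WithDatadir` comment above is the whole deployment story: because MDBX is memory-mapped, a second process opening the same chaindata read-only shares the OS page cache with Erigon instead of duplicating it. A rough sketch of such an open, assuming erigon-lib's `kv/mdbx` builder (option names may differ between versions; config.go above additionally wires in a read-transaction limiter):

```go
package datadir

import (
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
	"github.com/ledgerwatch/log/v3"
)

// openChaindataReadonly opens Erigon's chaindata from another process.
// Readonly mode still gives full (read) transaction semantics, as the
// struct comment promises.
func openChaindataReadonly(path string) (kv.RoDB, error) {
	db, err := mdbx.NewMDBX(log.New()).Path(path).Readonly().Open()
	if err != nil {
		return nil, err
	}
	return db, nil // kv.RwDB satisfies kv.RoDB; this caller only reads
}
```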
diff --git a/cmd/rpcdaemon/commands/daemon.go b/cmd/rpcdaemon/commands/daemon.go
index a84b6094a8e..d32a9ab009d 100644
--- a/cmd/rpcdaemon/commands/daemon.go
+++ b/cmd/rpcdaemon/commands/daemon.go
@@ -17,7 +17,7 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth services.ApiBackend, txPool txpool.T
 	starknet starknet.CAIROVMClient, filters *filters.Filters, stateCache kvcache.Cache,
 	blockReader interfaces.BlockAndTxnReader, cfg httpcfg.HttpCfg) (list []rpc.API) {
 
-	base := NewBaseApi(filters, stateCache, blockReader, cfg.SingleNodeMode)
+	base := NewBaseApi(filters, stateCache, blockReader, cfg.WithDatadir)
 	if cfg.TevmEnabled {
 		base.EnableTevmExperiment()
 	}
diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index b42ecc848ac..b20be37f895 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -1077,7 +1077,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
 	}
 
 	expect := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).ExpectBlocks
-	if cfg.snapshots.SegmentsAvailable() < expect {
+	if cfg.snapshots.SegmentsMax() < expect {
 		c, err := tx.Cursor(kv.Headers)
 		if err != nil {
 			return err
 		}
@@ -1089,10 +1089,10 @@
 		}
 		c.Close()
 		hasInDB := binary.BigEndian.Uint64(firstK)
-		if cfg.snapshots.SegmentsAvailable() < hasInDB {
-			return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsAvailable(), hasInDB, expect)
+		if cfg.snapshots.SegmentsMax() < hasInDB {
+			return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsMax(), hasInDB, expect)
 		} else {
-			log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsAvailable(), expect, hasInDB))
+			log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsMax(), expect, hasInDB))
 		}
 	}
 	if err := cfg.snapshots.Reopen(); err != nil {
@@ -1100,16 +1100,16 @@
 	}
 
 	// Create .idx files
-	if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
+	if cfg.snapshots.IndicesMax() < cfg.snapshots.SegmentsMax() {
 		if !cfg.snapshots.SegmentsReady() {
 			return fmt.Errorf("not all snapshot segments are available")
 		}
 
 		// wait for Downloader service to download all expected snapshots
-		if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
+		if cfg.snapshots.IndicesMax() < cfg.snapshots.SegmentsMax() {
 			chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
 			workers := cmp.InRange(1, 2, runtime.GOMAXPROCS(-1)-1)
-			if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), workers, log.LvlInfo); err != nil {
+			if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, *chainID, cfg.tmpdir, cfg.snapshots.IndicesMax(), workers, log.LvlInfo); err != nil {
 				return err
 			}
 		}
diff --git a/go.mod b/go.mod
index 4ac62518872..afa899be414 100644
--- a/go.mod
+++ b/go.mod
@@ -35,7 +35,7 @@ require (
 	github.com/json-iterator/go v1.1.12
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kevinburke/go-bindata v3.21.0+incompatible
-	github.com/ledgerwatch/erigon-lib v0.0.0-20220510025207-2689ed9cf66d
+	github.com/ledgerwatch/erigon-lib v0.0.0-20220510072701-f4d7b2bdcce9
 	github.com/ledgerwatch/log/v3 v3.4.1
 	github.com/ledgerwatch/secp256k1 v1.0.0
 	github.com/pelletier/go-toml v1.9.5
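The stage_headers.go rename above is more than cosmetic: `SegmentsAvailable`/`IndicesAvailable` read as "everything is usable", while `SegmentsMax`/`IndicesMax` say what they are — high-water marks that advance independently (download vs. index build). The stage's gating then reads as a comparison of watermarks. An illustrative distillation of that decision (hypothetical helper, not the real function):

```go
package main

import "fmt"

// snapshotAction condenses DownloadAndIndexSnapshotsIfNeed's checks:
// segMax/idxMax are the watermarks, expect comes from the known snapshot
// config, firstBlockInDB is the lowest header present in the DB.
func snapshotAction(segMax, idxMax, expect, firstBlockInDB uint64) (string, error) {
	if segMax < expect {
		if segMax < firstBlockInDB {
			// gap between snapshots and DB - nothing can fill it
			return "", fmt.Errorf("not enough snapshots: snapshots=%d, blockInDB=%d, expect=%d", segMax, firstBlockInDB, expect)
		}
		// DB still has those blocks, so missing segments can be re-generated
	}
	if idxMax < segMax {
		return fmt.Sprintf("build .idx files starting from block %d", idxMax), nil
	}
	return "up to date", nil
}

func main() {
	fmt.Println(snapshotAction(14_000_000, 13_500_000, 14_000_000, 0))
}
```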
diff --git a/go.sum b/go.sum
index f0d5e933401..4510dac423e 100644
--- a/go.sum
+++ b/go.sum
@@ -448,8 +448,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
 github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220510025207-2689ed9cf66d h1:cFFaMnIheahNQhbMAK/FtIDBZtkaEepuS38Rursznpc=
-github.com/ledgerwatch/erigon-lib v0.0.0-20220510025207-2689ed9cf66d/go.mod h1:oHHsHZR+xF3LSHzjBlJYW21ZnBXkwIZrVebCp6v9Iv0=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220510072701-f4d7b2bdcce9 h1:pM9ej/5TEvTUzP0FoZbT3vi0PgAM0D4YVQ+2HTWz0H8=
+github.com/ledgerwatch/erigon-lib v0.0.0-20220510072701-f4d7b2bdcce9/go.mod h1:oHHsHZR+xF3LSHzjBlJYW21ZnBXkwIZrVebCp6v9Iv0=
 github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
 github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
 github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index 9005d649d0f..a4ca495b492 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -38,27 +38,25 @@ import (
 	"go.uber.org/atomic"
 )
 
-type BlocksSnapshot struct {
-	Bodies              *compress.Decompressor // value: rlp(types.BodyForStorage)
-	Headers             *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
-	Transactions        *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp
-	BodyNumberIdx       *recsplit.Index        // block_num_u64 -> bodies_segment_offset
-	HeaderHashIdx       *recsplit.Index        // header_hash -> headers_segment_offset
-	TxnHashIdx          *recsplit.Index        // transaction_hash -> transactions_segment_offset
-	TxnIdsIdx           *recsplit.Index        // transaction_id -> transactions_segment_offset
-	TxnHash2BlockNumIdx *recsplit.Index        // transaction_hash -> block_number
-
-	From, To uint64 // [from,to)
-}
-
-func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To }
-
 type HeaderSegment struct {
 	seg           *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
 	idxHeaderHash *recsplit.Index        // header_hash -> headers_segment_offset
 	From, To      uint64
 }
 
+type BodySegment struct {
+	seg           *compress.Decompressor // value: rlp(types.BodyForStorage)
+	idxBodyNumber *recsplit.Index        // block_num_u64 -> bodies_segment_offset
+	From, To      uint64
+}
+
+type TxnSegment struct {
+	Seg                 *compress.Decompressor // value: first_byte_of_transaction_hash + sender_address + transaction_rlp
+	IdxTxnHash          *recsplit.Index        // transaction_hash -> transactions_segment_offset
+	IdxTxnHash2BlockNum *recsplit.Index        // transaction_hash -> block_number
+	From, To            uint64
+}
+
 func (sn *HeaderSegment) close() {
 	if sn.seg != nil {
 		sn.seg.Close()
@@ -69,6 +67,7 @@
 		sn.idxHeaderHash = nil
 	}
 }
+
 func (sn *HeaderSegment) reopen(dir string) (err error) {
 	sn.close()
 	fileName := snap.SegmentFileName(sn.From, sn.To, snap.Headers)
@@ -83,12 +82,6 @@
 	return nil
 }
 
-type BodySegment struct {
-	seg           *compress.Decompressor // value: rlp(types.BodyForStorage)
-	idxBodyNumber *recsplit.Index        // block_num_u64 -> bodies_segment_offset
-	From, To      uint64
-}
-
 func (sn *BodySegment) close() {
 	if sn.seg != nil {
 		sn.seg.Close()
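The refactor above replaces the monolithic `BlocksSnapshot` (three decompressors plus five indices, all-or-nothing) with one small struct per file type; `TxnIdsIdx` is dropped along the way, and the transactions comment now reflects the embedded sender address. Each segment type keeps the same idempotent lifecycle visible in these hunks: `reopen` always starts with `close`, and `close` nils out the handles so a second call is harmless. A generic rendering of that pattern (the `segment` type is hypothetical, for illustration):

```go
package segments

import (
	"github.com/ledgerwatch/erigon-lib/compress"
	"github.com/ledgerwatch/erigon-lib/recsplit"
)

type segment struct {
	seg *compress.Decompressor
	idx *recsplit.Index
}

// close releases both handles and nils them, so it is safe to call twice.
func (s *segment) close() {
	if s.seg != nil {
		s.seg.Close()
		s.seg = nil
	}
	if s.idx != nil {
		s.idx.Close()
		s.idx = nil
	}
}

// reopen never leaks: whatever was open is closed before opening fresh files.
func (s *segment) reopen(segPath, idxPath string) (err error) {
	s.close()
	if s.seg, err = compress.NewDecompressor(segPath); err != nil {
		return err
	}
	if s.idx, err = recsplit.OpenIndex(idxPath); err != nil {
		return err
	}
	return nil
}
```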
@@ -99,6 +92,7 @@
 		sn.idxBodyNumber = nil
 	}
 }
+
 func (sn *BodySegment) reopen(dir string) (err error) {
 	sn.close()
 	fileName := snap.SegmentFileName(sn.From, sn.To, snap.Bodies)
@@ -113,13 +107,6 @@
 	return nil
 }
 
-type TxnSegment struct {
-	Seg                 *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp
-	IdxTxnHash          *recsplit.Index        // transaction_hash -> transactions_segment_offset
-	IdxTxnHash2BlockNum *recsplit.Index        // transaction_hash -> block_number
-	From, To            uint64
-}
-
 func (sn *TxnSegment) close() {
 	if sn.Seg != nil {
 		sn.Seg.Close()
@@ -274,10 +261,10 @@ type RoSnapshots struct {
 	Bodies  *bodySegments
 	Txs     *txnSegments
 
-	dir               string
-	segmentsAvailable atomic.Uint64 // all types of .seg files are available - up to this number
-	idxAvailable      atomic.Uint64 // all types of .idx files are available - up to this number
-	cfg               ethconfig.Snapshot
+	dir         string
+	segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number
+	idxMax      atomic.Uint64 // all types of .idx files are available - up to this number
+	cfg         ethconfig.Snapshot
 }
 
 // NewRoSnapshots - opens all snapshots. But to simplify everything:
@@ -289,15 +276,14 @@ func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots {
 	return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}}
 }
 
-func (s *RoSnapshots) Cfg() ethconfig.Snapshot   { return s.cfg }
-func (s *RoSnapshots) Dir() string               { return s.dir }
-func (s *RoSnapshots) SegmentsReady() bool       { return s.segmentsReady.Load() }
-func (s *RoSnapshots) IndicesReady() bool        { return s.indicesReady.Load() }
-func (s *RoSnapshots) IndicesAvailable() uint64  { return s.idxAvailable.Load() }
-func (s *RoSnapshots) SegmentsAvailable() uint64 { return s.segmentsAvailable.Load() }
-func (s *RoSnapshots) BlocksAvailable() uint64 {
-	return cmp.Min(s.segmentsAvailable.Load(), s.idxAvailable.Load())
-}
+func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
+func (s *RoSnapshots) Dir() string             { return s.dir }
+func (s *RoSnapshots) SegmentsReady() bool     { return s.segmentsReady.Load() }
+func (s *RoSnapshots) IndicesReady() bool      { return s.indicesReady.Load() }
+func (s *RoSnapshots) IndicesMax() uint64      { return s.idxMax.Load() }
+func (s *RoSnapshots) SegmentsMax() uint64     { return s.segmentsMax.Load() }
+func (s *RoSnapshots) BlocksAvailable() uint64 { return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load()) }
+
 func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error {
 	if s.BlocksAvailable() < cfg.ExpectBlocks {
 		return fmt.Errorf("app must wait until all expected snapshots are available. Expected: %d, Available: %d", cfg.ExpectBlocks, s.BlocksAvailable())
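`BlocksAvailable` above is the one getter most callers should reach for: segments and indices advance independently, so the usable chain height is their minimum. A toy demonstration with the same `go.uber.org/atomic` counters the struct uses:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	var segmentsMax, idxMax atomic.Uint64
	segmentsMax.Store(14_500_000) // .seg files downloaded up to this block
	idxMax.Store(13_000_000)      // .idx files built only up to this block

	// cmp.Min in the diff; spelled out here
	available := segmentsMax.Load()
	if v := idxMax.Load(); v < available {
		available = v
	}
	fmt.Println(available) // 13000000: blocks past this point have no usable index yet
}
```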
@@ -366,7 +352,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...snap.Type) (err error) {
 		}
 	}
 
-	s.idxAvailable.Store(s.idxAvailability())
+	s.idxMax.Store(s.idxAvailability())
 	s.indicesReady.Store(true)
 	return nil
 }
@@ -438,9 +424,9 @@ func (s *RoSnapshots) Reopen() error {
 		}
 
 		if f.To > 0 {
-			s.segmentsAvailable.Store(f.To - 1)
+			s.segmentsMax.Store(f.To - 1)
 		} else {
-			s.segmentsAvailable.Store(0)
+			s.segmentsMax.Store(0)
 		}
 	}
 	s.segmentsReady.Store(true)
@@ -468,7 +454,7 @@ func (s *RoSnapshots) Reopen() error {
 		}
 	}
 
-	s.idxAvailable.Store(s.idxAvailability())
+	s.idxMax.Store(s.idxAvailability())
 	s.indicesReady.Store(true)
 
 	return nil
@@ -525,9 +511,9 @@ func (s *RoSnapshots) ReopenSegments() error {
 		}
 
 		if f.To > 0 {
-			s.segmentsAvailable.Store(f.To - 1)
+			s.segmentsMax.Store(f.To - 1)
 		} else {
-			s.segmentsAvailable.Store(0)
+			s.segmentsMax.Store(0)
 		}
 	}
 	s.segmentsReady.Store(true)
@@ -564,7 +550,7 @@ func (s *RoSnapshots) PrintDebug() {
 	defer s.Bodies.lock.RUnlock()
 	s.Txs.lock.RLock()
 	defer s.Txs.lock.RUnlock()
-	fmt.Printf("sn: %d, %d\n", s.segmentsAvailable.Load(), s.idxAvailable.Load())
+	fmt.Printf("sn: %d, %d\n", s.segmentsMax.Load(), s.idxMax.Load())
 	fmt.Println(" == Snapshots, Header")
 	for _, sn := range s.Headers.segments {
 		fmt.Printf("%d, %t\n", sn.From, sn.idxHeaderHash == nil)
@@ -937,7 +923,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 	if idxWorkers > 4 {
 		idxWorkers = 4
 	}
-	if err := BuildIndices(ctx, snapshots, chainID, tmpDir, snapshots.IndicesAvailable(), idxWorkers, log.LvlInfo); err != nil {
+	if err := BuildIndices(ctx, snapshots, chainID, tmpDir, snapshots.IndicesMax(), idxWorkers, log.LvlInfo); err != nil {
 		return err
 	}
 	merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)
@@ -1824,26 +1810,6 @@ func (m *Merger) removeOldFiles(toDel []string, snapshotsDir string) error {
 	return nil
 }
 
-//nolint
-func assertAllSegments(blocks []*BlocksSnapshot, root string) {
-	wg := sync.WaitGroup{}
-	for _, sn := range blocks {
-		wg.Add(1)
-		go func(sn *BlocksSnapshot) {
-			defer wg.Done()
-			f := filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Headers))
-			assertSegment(f)
-			f = filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Bodies))
-			assertSegment(f)
-			f = filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Transactions))
-			assertSegment(f)
-			fmt.Printf("done:%s\n", f)
-		}(sn)
-	}
-	wg.Wait()
-	panic("success")
-}
-
 //nolint
 func assertSegment(segmentFile string) {
 	d, err := compress.NewDecompressor(segmentFile)
diff --git a/turbo/snapshotsync/snap/flags.go b/turbo/snapshotsync/snap/flags.go
index e0f23e9078c..b7cd90a5f9a 100644
--- a/turbo/snapshotsync/snap/flags.go
+++ b/turbo/snapshotsync/snap/flags.go
@@ -11,6 +11,10 @@ var (
 	blockSnapshotEnabledKey = []byte("blocksSnapshotEnabled")
 )
 
+func Enabled(tx kv.Getter) (bool, error) {
+	return kv.GetBool(tx, kv.DatabaseInfo, blockSnapshotEnabledKey)
+}
+
 func EnsureNotChanged(tx kv.GetPut, cfg ethconfig.Snapshot) error {
 	ok, v, err := kv.EnsureNotChangedBool(tx, kv.DatabaseInfo, blockSnapshotEnabledKey, cfg.Enabled)
 	if err != nil {
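`snap.Enabled` is the read side of a flag that `EnsureNotChanged` (just below it) both seeds and guards: per erigon-lib's `EnsureNotChangedBool` semantics, the value is written on first start and only compared afterwards, so a datadir can never silently flip between snapshot and non-snapshot layouts. A sketch of the round trip as this diff wires it (assuming that write-if-absent behavior, and that a mismatch makes `EnsureNotChanged` error):

```go
package flagsdemo

import (
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/eth/ethconfig"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
)

func roundTrip(ctx context.Context, db kv.RwDB) error {
	// Erigon's first start: EnsureNotChanged persists Enabled=true (and is
	// expected to reject a later start that tries Enabled=false).
	if err := db.Update(ctx, func(tx kv.RwTx) error {
		return snap.EnsureNotChanged(tx, ethconfig.Snapshot{Enabled: true})
	}); err != nil {
		return err
	}
	// rpcdaemon's start (config.go above): read the same flag back.
	return db.View(ctx, func(tx kv.Tx) error {
		enabled, err := snap.Enabled(tx)
		if err != nil {
			return err
		}
		fmt.Println("snapshots enabled:", enabled) // true
		return nil
	})
}
```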