RPCDaemon: open snapshots on startup (because now snapshots dir is atomic), even if no Erigon available #4110

Merged · 6 commits · May 10, 2022
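In short: Erigon now writes the snapshots directory atomically, so RPCDaemon no longer needs a live Erigon process to tell it which snapshot files are safe to use. On startup it reads the persisted snapshots flag from the database via snap.Enabled, derives the sync mode from that flag instead of from a --syncmode CLI value, and opens whatever segments are already on disk. The sketch below condenses that flow; openSnapshotsOnStartup is an invented helper name (the real logic sits inline in RemoteServices in cmd/rpcdaemon/cli/config.go), and error handling plus unrelated setup are trimmed.

package cli

import (
	"context"
	"path/filepath"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli/httpcfg"
	"github.com/ledgerwatch/erigon/eth/ethconfig"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
	"github.com/ledgerwatch/log/v3"
)

// openSnapshotsOnStartup is a hypothetical condensation of this PR's change,
// not a function that exists in the codebase.
func openSnapshotsOnStartup(ctx context.Context, cfg *httpcfg.HttpCfg, db kv.RoDB) (onNewSnapshot func(), err error) {
	// 1. Snapshot mode comes from the flag Erigon persisted in the DB.
	if err = db.View(ctx, func(tx kv.Tx) error {
		var e error
		cfg.Snapshot.Enabled, e = snap.Enabled(tx)
		return e
	}); err != nil {
		return nil, err
	}
	if cfg.Snapshot.Enabled {
		cfg.SyncMode = ethconfig.SnapSync
	} else {
		cfg.SyncMode = ethconfig.FullSync
	}

	// 2. Open existing snapshot files eagerly: the dir is updated atomically,
	// so its current contents are consistent even with no Erigon running.
	onNewSnapshot = func() {} // safe no-op default
	if cfg.WithDatadir && cfg.Snapshot.Enabled {
		allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snapshot, filepath.Join(cfg.DataDir, "snapshots"))
		if err := allSnapshots.Reopen(); err != nil {
			return nil, err
		}
		log.Info("[Snapshots] see new", "blocks", allSnapshots.BlocksAvailable())
		// Re-scan the dir whenever Erigon later announces fresh segments.
		onNewSnapshot = func() {
			allSnapshots.Reopen()
			log.Info("[Snapshots] see new", "blocks", allSnapshots.BlocksAvailable())
		}
	}
	return onNewSnapshot, nil
}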
37 changes: 23 additions & 14 deletions cmd/rpcdaemon/cli/config.go
@@ -40,6 +40,7 @@ import (
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"google.golang.org/grpc"
@@ -106,8 +107,8 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
if err := utils.SetupCobra(cmd); err != nil {
return err
}
- cfg.SingleNodeMode = cfg.DataDir != "" || cfg.Chaindata != ""
- if cfg.SingleNodeMode {
+ cfg.WithDatadir = cfg.DataDir != "" || cfg.Chaindata != ""
+ if cfg.WithDatadir {
if cfg.DataDir == "" {
cfg.DataDir = paths.DefaultDataDir()
}
@@ -234,20 +235,20 @@ func EmbeddedServices(ctx context.Context, erigonDB kv.RoDB, stateCacheCfg kvcac
}

// RemoteServices - use when RPCDaemon run as independent process. Still it can use --datadir flag to enable
- // `cfg.SingleNodeMode` (mode when it on 1 machine with Erigon)
+ // `cfg.WithDatadir` (mode when it on 1 machine with Erigon)
func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, borDb kv.RoDB,
eth services.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
starknet *services.StarknetService,
stateCache kvcache.Cache, blockReader interfaces.BlockAndTxnReader,
ff *filters.Filters, err error) {
- if !cfg.SingleNodeMode && cfg.PrivateApiAddr == "" {
+ if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("either remote db or local db must be specified")
}

// Do not change the order of these checks. Chaindata needs to be checked first, because PrivateApiAddr has default value which is not ""
// If PrivateApiAddr is checked first, the Chaindata option will never work
- if cfg.SingleNodeMode {
+ if cfg.WithDatadir {
var rwKv kv.RwDB
log.Trace("Creating chain db", "path", cfg.Chaindata)
limiter := make(chan struct{}, cfg.DBReadConcurrency)
@@ -300,6 +301,10 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
if err != nil {
return err
}
+ cfg.Snapshot.Enabled, err = snap.Enabled(tx)
+ if err != nil {
+ return err
+ }
return nil
}); err != nil {
return nil, nil, nil, nil, nil, nil, nil, nil, ff, err
@@ ... @@
return nil, nil, nil, nil, nil, nil, nil, nil, ff, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
}

- cfg.SyncMode = ethconfig.SyncModeByChainName(cc.ChainName, cfg.SyncModeCli)
- cfg.Snapshot.Enabled = cfg.SyncMode == ethconfig.SnapSync
- if cfg.SingleNodeMode {
- cfg.Snapshot = ethconfig.NewSnapshotCfg(cfg.Snapshot.Enabled, cfg.Snapshot.KeepBlocks)
+ if cfg.Snapshot.Enabled {
+ cfg.SyncMode = ethconfig.SnapSync
+ } else {
+ cfg.SyncMode = ethconfig.FullSync
}

// if chain config has terminal total difficulty then rpc must have eth and engine APIs enabled
@@ ... @@
}
}

- var onNewSnapshot func()
- if cfg.SingleNodeMode {
+ onNewSnapshot := func() {}
+ if cfg.WithDatadir {
if cfg.Snapshot.Enabled {
allSnapshots := snapshotsync.NewRoSnapshots(cfg.Snapshot, filepath.Join(cfg.DataDir, "snapshots"))
+ if err := allSnapshots.Reopen(); err != nil {
+ return nil, nil, nil, nil, nil, nil, nil, nil, ff, err
+ }
+ log.Info("[Snapshots] see new", "blocks", allSnapshots.BlocksAvailable())
// don't reopen it right here, because snapshots may be not ready yet
onNewSnapshot = func() {
allSnapshots.Reopen()
log.Info("[Snapshots] see new", "blocks", allSnapshots.BlocksAvailable())
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
- onNewSnapshot = func() {}
+ log.Info("Use --syncmode=full")
}
}

@@ -370,7 +379,7 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,

subscribeToStateChangesLoop(ctx, kvClient, stateCache)

- if !cfg.SingleNodeMode {
+ if !cfg.WithDatadir {
blockReader = snapshotsync.NewRemoteBlockReader(remote.NewETHBACKENDClient(conn))
}
remoteEth := services.NewRemoteBackend(remote.NewETHBACKENDClient(conn), db, blockReader)
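One small but load-bearing detail in the hunk above: `var onNewSnapshot func()` became `onNewSnapshot := func() {}`. With the old declaration the callback stayed nil on any path that never assigned it (for example, a remote-only daemon without --datadir), and invoking a nil func panics at runtime. A minimal illustration of the pattern, separate from the Erigon code:

package main

import "fmt"

func main() {
	// Old shape: nil until some branch assigns it; calling it before then
	// would panic with a nil pointer dereference.
	var risky func()
	if risky != nil {
		risky() // every call site needs this guard
	}

	// New shape: a no-op default makes the callback safe to call
	// unconditionally, and branches only override it when needed.
	onNewSnapshot := func() {}
	snapshotsEnabled := true // stand-in for cfg.Snapshot.Enabled
	if snapshotsEnabled {
		onNewSnapshot = func() { fmt.Println("re-scanning snapshots dir") }
	}
	onNewSnapshot() // no guard required
}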
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/cli/httpcfg/http_cfg.go
@@ -8,7 +8,7 @@ import (
type HttpCfg struct {
Enabled bool
PrivateApiAddr string
- SingleNodeMode bool // Erigon's database can be read by separated processes on same machine - in read-only mode - with full support of transactions. It will share same "OS PageCache" with Erigon process.
+ WithDatadir bool // Erigon's database can be read by separated processes on same machine - in read-only mode - with full support of transactions. It will share same "OS PageCache" with Erigon process.
DataDir string
Chaindata string
HttpListenAddress string
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/daemon.go
@@ -17,7 +17,7 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth services.ApiBackend, txPool txpool.T
starknet starknet.CAIROVMClient, filters *filters.Filters, stateCache kvcache.Cache,
blockReader interfaces.BlockAndTxnReader, cfg httpcfg.HttpCfg) (list []rpc.API) {

- base := NewBaseApi(filters, stateCache, blockReader, cfg.SingleNodeMode)
+ base := NewBaseApi(filters, stateCache, blockReader, cfg.WithDatadir)
if cfg.TevmEnabled {
base.EnableTevmExperiment()
}
14 changes: 7 additions & 7 deletions eth/stagedsync/stage_headers.go
@@ -1077,7 +1077,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}

expect := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).ExpectBlocks
- if cfg.snapshots.SegmentsAvailable() < expect {
+ if cfg.snapshots.SegmentsMax() < expect {
c, err := tx.Cursor(kv.Headers)
if err != nil {
return err
@@ -1089,27 +1089,27 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}
c.Close()
hasInDB := binary.BigEndian.Uint64(firstK)
- if cfg.snapshots.SegmentsAvailable() < hasInDB {
- return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsAvailable(), hasInDB, expect)
+ if cfg.snapshots.SegmentsMax() < hasInDB {
+ return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsMax(), hasInDB, expect)
} else {
- log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsAvailable(), expect, hasInDB))
+ log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsMax(), expect, hasInDB))
}
}
if err := cfg.snapshots.Reopen(); err != nil {
return fmt.Errorf("ReopenIndices: %w", err)
}

// Create .idx files
- if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
+ if cfg.snapshots.IndicesMax() < cfg.snapshots.SegmentsMax() {
if !cfg.snapshots.SegmentsReady() {
return fmt.Errorf("not all snapshot segments are available")
}

// wait for Downloader service to download all expected snapshots
- if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
+ if cfg.snapshots.IndicesMax() < cfg.snapshots.SegmentsMax() {
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
workers := cmp.InRange(1, 2, runtime.GOMAXPROCS(-1)-1)
- if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), workers, log.LvlInfo); err != nil {
+ if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, *chainID, cfg.tmpdir, cfg.snapshots.IndicesMax(), workers, log.LvlInfo); err != nil {
return err
}
}
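The SegmentsAvailable/IndicesAvailable to SegmentsMax/IndicesMax rename reflects what these counters actually are: high-water marks for .seg files and .idx files respectively, which advance independently (segments are fetched by the Downloader, indices are generated locally). A block is only servable once both exist for it, so availability is their minimum, and this stage only needs to build indices for the gap. A toy model of that arithmetic, with invented block numbers:

package main

import "fmt"

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Invented example values: .seg files cover more blocks than .idx files,
	// e.g. right after the Downloader fetched fresh segments.
	segmentsMax := uint64(14_000_000) // highest block covered by all .seg types
	idxMax := uint64(13_500_000)      // highest block covered by all .idx types

	// RoSnapshots.BlocksAvailable() in block_snapshots.go is exactly this min.
	fmt.Println("blocks available:", min(segmentsMax, idxMax))

	// DownloadAndIndexSnapshotsIfNeed: indices lag behind segments, so build
	// them, resuming from idxMax instead of starting over from zero.
	if idxMax < segmentsMax {
		fmt.Printf("BuildIndices(from=%d)\n", idxMax)
	}
}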
2 changes: 1 addition & 1 deletion go.mod
@@ -35,7 +35,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
- github.com/ledgerwatch/erigon-lib v0.0.0-20220510025207-2689ed9cf66d
+ github.com/ledgerwatch/erigon-lib v0.0.0-20220510072701-f4d7b2bdcce9
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/pelletier/go-toml v1.9.5
4 changes: 2 additions & 2 deletions go.sum
@@ -448,8 +448,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
- github.com/ledgerwatch/erigon-lib v0.0.0-20220510025207-2689ed9cf66d h1:cFFaMnIheahNQhbMAK/FtIDBZtkaEepuS38Rursznpc=
- github.com/ledgerwatch/erigon-lib v0.0.0-20220510025207-2689ed9cf66d/go.mod h1:oHHsHZR+xF3LSHzjBlJYW21ZnBXkwIZrVebCp6v9Iv0=
+ github.com/ledgerwatch/erigon-lib v0.0.0-20220510072701-f4d7b2bdcce9 h1:pM9ej/5TEvTUzP0FoZbT3vi0PgAM0D4YVQ+2HTWz0H8=
+ github.com/ledgerwatch/erigon-lib v0.0.0-20220510072701-f4d7b2bdcce9/go.mod h1:oHHsHZR+xF3LSHzjBlJYW21ZnBXkwIZrVebCp6v9Iv0=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
104 changes: 35 additions & 69 deletions turbo/snapshotsync/block_snapshots.go
@@ -38,27 +38,25 @@ import (
"go.uber.org/atomic"
)

- type BlocksSnapshot struct {
- Bodies *compress.Decompressor // value: rlp(types.BodyForStorage)
- Headers *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
- Transactions *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp
- BodyNumberIdx *recsplit.Index // block_num_u64 -> bodies_segment_offset
- HeaderHashIdx *recsplit.Index // header_hash -> headers_segment_offset
- TxnHashIdx *recsplit.Index // transaction_hash -> transactions_segment_offset
- TxnIdsIdx *recsplit.Index // transaction_id -> transactions_segment_offset
- TxnHash2BlockNumIdx *recsplit.Index // transaction_hash -> block_number
-
- From, To uint64 // [from,to)
- }
-
- func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To }

type HeaderSegment struct {
seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset
From, To uint64
}

+ type BodySegment struct {
+ seg *compress.Decompressor // value: rlp(types.BodyForStorage)
+ idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset
+ From, To uint64
+ }
+
+ type TxnSegment struct {
+ Seg *compress.Decompressor // value: first_byte_of_transaction_hash + sender_address + transaction_rlp
+ IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset
+ IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number
+ From, To uint64
+ }

func (sn *HeaderSegment) close() {
if sn.seg != nil {
sn.seg.Close()
@@ -69,6 +67,7 @@ func (sn *HeaderSegment) close() {
sn.idxHeaderHash = nil
}
}

func (sn *HeaderSegment) reopen(dir string) (err error) {
sn.close()
fileName := snap.SegmentFileName(sn.From, sn.To, snap.Headers)
@@ -83,12 +82,6 @@
return nil
}

- type BodySegment struct {
- seg *compress.Decompressor // value: rlp(types.BodyForStorage)
- idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset
- From, To uint64
- }

func (sn *BodySegment) close() {
if sn.seg != nil {
sn.seg.Close()
@@ -99,6 +92,7 @@
sn.idxBodyNumber = nil
}
}

func (sn *BodySegment) reopen(dir string) (err error) {
sn.close()
fileName := snap.SegmentFileName(sn.From, sn.To, snap.Bodies)
@@ -113,13 +107,6 @@
return nil
}

- type TxnSegment struct {
- Seg *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp
- IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset
- IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number
- From, To uint64
- }

func (sn *TxnSegment) close() {
if sn.Seg != nil {
sn.Seg.Close()
@@ -274,10 +261,10 @@ type RoSnapshots struct {
Bodies *bodySegments
Txs *txnSegments

- dir string
- segmentsAvailable atomic.Uint64 // all types of .seg files are available - up to this number
- idxAvailable atomic.Uint64 // all types of .idx files are available - up to this number
- cfg ethconfig.Snapshot
+ dir string
+ segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number
+ idxMax atomic.Uint64 // all types of .idx files are available - up to this number
+ cfg ethconfig.Snapshot
}

// NewRoSnapshots - opens all snapshots. But to simplify everything:
@@ -289,15 +276,14 @@ func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots {
return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}}
}

- func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
- func (s *RoSnapshots) Dir() string { return s.dir }
- func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() }
- func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() }
- func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() }
- func (s *RoSnapshots) SegmentsAvailable() uint64 { return s.segmentsAvailable.Load() }
- func (s *RoSnapshots) BlocksAvailable() uint64 {
- return cmp.Min(s.segmentsAvailable.Load(), s.idxAvailable.Load())
- }
+ func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg }
+ func (s *RoSnapshots) Dir() string { return s.dir }
+ func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() }
+ func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() }
+ func (s *RoSnapshots) IndicesMax() uint64 { return s.idxMax.Load() }
+ func (s *RoSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() }
+ func (s *RoSnapshots) BlocksAvailable() uint64 { return cmp.Min(s.segmentsMax.Load(), s.idxMax.Load()) }

func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error {
if s.BlocksAvailable() < cfg.ExpectBlocks {
return fmt.Errorf("app must wait until all expected snapshots are available. Expected: %d, Available: %d", cfg.ExpectBlocks, s.BlocksAvailable())
@@ -366,7 +352,7 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...snap.Type) (err error) {
}
}

- s.idxAvailable.Store(s.idxAvailability())
+ s.idxMax.Store(s.idxAvailability())
s.indicesReady.Store(true)
return nil
}
@@ -438,9 +424,9 @@ func (s *RoSnapshots) Reopen() error {
}

if f.To > 0 {
- s.segmentsAvailable.Store(f.To - 1)
+ s.segmentsMax.Store(f.To - 1)
} else {
- s.segmentsAvailable.Store(0)
+ s.segmentsMax.Store(0)
}
}
s.segmentsReady.Store(true)
@@ -468,7 +454,7 @@ func (s *RoSnapshots) Reopen() error {
}
}

- s.idxAvailable.Store(s.idxAvailability())
+ s.idxMax.Store(s.idxAvailability())
s.indicesReady.Store(true)

return nil
@@ -525,9 +511,9 @@ func (s *RoSnapshots) ReopenSegments() error {
}

if f.To > 0 {
- s.segmentsAvailable.Store(f.To - 1)
+ s.segmentsMax.Store(f.To - 1)
} else {
- s.segmentsAvailable.Store(0)
+ s.segmentsMax.Store(0)
}
}
s.segmentsReady.Store(true)
@@ -564,7 +550,7 @@ func (s *RoSnapshots) PrintDebug() {
defer s.Bodies.lock.RUnlock()
s.Txs.lock.RLock()
defer s.Txs.lock.RUnlock()
fmt.Printf("sn: %d, %d\n", s.segmentsAvailable.Load(), s.idxAvailable.Load())
fmt.Printf("sn: %d, %d\n", s.segmentsMax.Load(), s.idxMax.Load())
fmt.Println(" == Snapshots, Header")
for _, sn := range s.Headers.segments {
fmt.Printf("%d, %t\n", sn.From, sn.idxHeaderHash == nil)
@@ -937,7 +923,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
if idxWorkers > 4 {
idxWorkers = 4
}
- if err := BuildIndices(ctx, snapshots, chainID, tmpDir, snapshots.IndicesAvailable(), idxWorkers, log.LvlInfo); err != nil {
+ if err := BuildIndices(ctx, snapshots, chainID, tmpDir, snapshots.IndicesMax(), idxWorkers, log.LvlInfo); err != nil {
return err
}
merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)
@@ -1824,26 +1810,6 @@ func (m *Merger) removeOldFiles(toDel []string, snapshotsDir string) error {
return nil
}

- //nolint
- func assertAllSegments(blocks []*BlocksSnapshot, root string) {
- wg := sync.WaitGroup{}
- for _, sn := range blocks {
- wg.Add(1)
- go func(sn *BlocksSnapshot) {
- defer wg.Done()
- f := filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Headers))
- assertSegment(f)
- f = filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Bodies))
- assertSegment(f)
- f = filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Transactions))
- assertSegment(f)
- fmt.Printf("done:%s\n", f)
- }(sn)
- }
- wg.Wait()
- panic("success")
- }
-
//nolint
func assertSegment(segmentFile string) {
d, err := compress.NewDecompressor(segmentFile)
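The deleted BlocksSnapshot bundled headers, bodies, and transactions for one block range into a single struct; block_snapshots.go now relies entirely on the per-type HeaderSegment / BodySegment / TxnSegment lists that RoSnapshots already held, plus the renamed atomic watermarks. A toy reconstruction of that shape (simplified, assumption-labeled names, not the real types):

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type segment struct{ From, To uint64 } // [From, To)

// roSnapshots mirrors the post-PR layout: one list per file type (a single
// shared RWMutex here for brevity) plus atomic high-water marks.
type roSnapshots struct {
	lock    sync.RWMutex
	headers []segment
	bodies  []segment
	txs     []segment

	segmentsMax atomic.Uint64 // all .seg types present up to this block
	idxMax      atomic.Uint64 // all .idx types present up to this block
}

// BlocksAvailable is the min of the two watermarks, as in block_snapshots.go.
func (s *roSnapshots) BlocksAvailable() uint64 {
	segs, idx := s.segmentsMax.Load(), s.idxMax.Load()
	if segs < idx {
		return segs
	}
	return idx
}

func main() {
	s := &roSnapshots{}
	s.lock.Lock()
	s.headers = append(s.headers, segment{0, 500_000})
	s.bodies = append(s.bodies, segment{0, 500_000})
	s.txs = append(s.txs, segment{0, 500_000})
	s.lock.Unlock()

	s.segmentsMax.Store(500_000 - 1) // Reopen() stores f.To-1
	s.idxMax.Store(500_000 - 1)
	fmt.Println(s.BlocksAvailable()) // 499999
}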