Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]: improve snapshot generation #22465

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4a7becb
eth/protocols: persist received state segments
rjl493456442 Mar 5, 2021
60f57c9
core: initial implementation
rjl493456442 Mar 8, 2021
bf6c7da
core/state/snapshot: add tests
rjl493456442 Mar 9, 2021
fd7cf4b
core, eth: updates
rjl493456442 Mar 9, 2021
8bf2032
eth/protocols/snapshot: count flat state size
rjl493456442 Mar 9, 2021
353804d
core/state: add metrics
rjl493456442 Mar 9, 2021
3a3d341
core/state/snapshot: skip unnecessary deletion
rjl493456442 Mar 9, 2021
f0e822e
core/state/snapshot: rename
rjl493456442 Mar 9, 2021
0b8d5a6
core/state/snapshot: use the global batch
rjl493456442 Mar 9, 2021
a9ea5be
core/state/snapshot: add logs and fix wiping
rjl493456442 Mar 10, 2021
0a2a1a8
core/state/snapshot: fix
rjl493456442 Mar 10, 2021
b6b2aaf
core/state/snapshot: save generation progress even if the batch is empty
rjl493456442 Mar 10, 2021
f64ede8
core/state/snapshot: fixes
rjl493456442 Mar 10, 2021
e848ac9
core/state/snapshot: fix initial account range length
rjl493456442 Mar 10, 2021
f5b9a8f
core/state/snapshot: fix initial account range
rjl493456442 Mar 11, 2021
cc8bffc
eth/protocols/snap: store flat states during the healing
rjl493456442 Mar 11, 2021
475ad07
eth/protocols/snap: print logs
rjl493456442 Mar 12, 2021
955e99d
core/state/snapshot: refactor (#4)
holiman Mar 12, 2021
27c8239
core, eth: fixes
rjl493456442 Mar 12, 2021
a8c419a
core, eth: fix healing writer
rjl493456442 Mar 12, 2021
523e3e4
core, trie, eth: fix paths
rjl493456442 Mar 12, 2021
3f992cf
eth/protocols/snap: fix encoding
rjl493456442 Mar 12, 2021
9f27aae
eth, core: add debug log
rjl493456442 Mar 15, 2021
8344c7c
core/state/generate: release iterator asap (#5)
holiman Mar 15, 2021
e69cde1
core/state/snapshot: optimize stats counter
rjl493456442 Mar 15, 2021
f52d630
core, eth: add metric
rjl493456442 Mar 15, 2021
2dd3d99
core/state/snapshot: update comments
rjl493456442 Mar 15, 2021
b77a7ff
core/state/snapshot: improve tests
rjl493456442 Mar 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
459 changes: 317 additions & 142 deletions core/state/snapshot/generate.go

Large diffs are not rendered by default.

116 changes: 113 additions & 3 deletions core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,121 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"golang.org/x/crypto/sha3"
)

// Tests that snapshot generation from an empty database.
func TestGeneration(t *testing.T) {
// We can't use statedb to make a test trie (circular dependency), so make
// a fake one manually. We're going with a small account trie of 3 accounts,
// two of which also has the same 3-slot storage trie attached.
var (
diskdb = memorydb.New()
triedb = trie.NewDatabase(diskdb)
)
stTrie, _ := trie.NewSecure(common.Hash{}, triedb)
stTrie.Update([]byte("key-1"), []byte("val-1")) // 0x1314700b81afc49f94db3623ef1df38f3ed18b73a1b7ea2f6c095118cf6118a0
stTrie.Update([]byte("key-2"), []byte("val-2")) // 0x18a0f4d79cff4459642dd7604f303886ad9d77c30cf3d7d7cedb3a693ab6d371
stTrie.Update([]byte("key-3"), []byte("val-3")) // 0x51c71a47af0695957647fb68766d0becee77e953df17c29b3c2f25436f055c78
stTrie.Commit(nil) // Root: 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67

accTrie, _ := trie.NewSecure(common.Hash{}, triedb)
acc := &Account{Balance: big.NewInt(1), Root: stTrie.Hash().Bytes(), CodeHash: emptyCode.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
accTrie.Update([]byte("acc-1"), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e

acc = &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}
val, _ = rlp.EncodeToBytes(acc)
accTrie.Update([]byte("acc-2"), val) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7

acc = &Account{Balance: big.NewInt(3), Root: stTrie.Hash().Bytes(), CodeHash: emptyCode.Bytes()}
val, _ = rlp.EncodeToBytes(acc)
accTrie.Update([]byte("acc-3"), val) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
accTrie.Commit(nil) // Root: 0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd
triedb.Commit(common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"), false, nil)

snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"))
select {
case <-snap.genPending:
// Snapshot generation succeeded

case <-time.After(250 * time.Millisecond):
t.Errorf("Snapshot generation failed")
}
// Signal abortion to the generator and wait for it to tear down
stop := make(chan *generatorStats)
snap.genAbort <- stop
<-stop
}

func hashData(input []byte) common.Hash {
var hasher = sha3.NewLegacyKeccak256()
var hash common.Hash
hasher.Reset()
hasher.Write(input)
hasher.Sum(hash[:0])
return hash
}

// Tests that snapshot generation with existent flat state.
func TestGenerateExistentState(t *testing.T) {
// We can't use statedb to make a test trie (circular dependency), so make
// a fake one manually. We're going with a small account trie of 3 accounts,
// two of which also has the same 3-slot storage trie attached.
var (
diskdb = memorydb.New()
triedb = trie.NewDatabase(diskdb)
)
stTrie, _ := trie.NewSecure(common.Hash{}, triedb)
stTrie.Update([]byte("key-1"), []byte("val-1")) // 0x1314700b81afc49f94db3623ef1df38f3ed18b73a1b7ea2f6c095118cf6118a0
stTrie.Update([]byte("key-2"), []byte("val-2")) // 0x18a0f4d79cff4459642dd7604f303886ad9d77c30cf3d7d7cedb3a693ab6d371
stTrie.Update([]byte("key-3"), []byte("val-3")) // 0x51c71a47af0695957647fb68766d0becee77e953df17c29b3c2f25436f055c78
stTrie.Commit(nil) // Root: 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67

accTrie, _ := trie.NewSecure(common.Hash{}, triedb)
acc := &Account{Balance: big.NewInt(1), Root: stTrie.Hash().Bytes(), CodeHash: emptyCode.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
accTrie.Update([]byte("acc-1"), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
rawdb.WriteAccountSnapshot(diskdb, hashData([]byte("acc-1")), val)
rawdb.WriteStorageSnapshot(diskdb, hashData([]byte("acc-1")), hashData([]byte("key-1")), []byte("val-1"))
rawdb.WriteStorageSnapshot(diskdb, hashData([]byte("acc-1")), hashData([]byte("key-2")), []byte("val-2"))
rawdb.WriteStorageSnapshot(diskdb, hashData([]byte("acc-1")), hashData([]byte("key-3")), []byte("val-3"))

acc = &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}
val, _ = rlp.EncodeToBytes(acc)
accTrie.Update([]byte("acc-2"), val) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
diskdb.Put(hashData([]byte("acc-2")).Bytes(), val)
rawdb.WriteAccountSnapshot(diskdb, hashData([]byte("acc-2")), val)

acc = &Account{Balance: big.NewInt(3), Root: stTrie.Hash().Bytes(), CodeHash: emptyCode.Bytes()}
val, _ = rlp.EncodeToBytes(acc)
accTrie.Update([]byte("acc-3"), val) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
rawdb.WriteAccountSnapshot(diskdb, hashData([]byte("acc-3")), val)
rawdb.WriteStorageSnapshot(diskdb, hashData([]byte("acc-3")), hashData([]byte("key-1")), []byte("val-1"))
rawdb.WriteStorageSnapshot(diskdb, hashData([]byte("acc-3")), hashData([]byte("key-2")), []byte("val-2"))
rawdb.WriteStorageSnapshot(diskdb, hashData([]byte("acc-3")), hashData([]byte("key-3")), []byte("val-3"))

accTrie.Commit(nil) // Root: 0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd
triedb.Commit(common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"), false, nil)

snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"))
select {
case <-snap.genPending:
// Snapshot generation succeeded

case <-time.After(250 * time.Millisecond):
t.Errorf("Snapshot generation failed")
}
// Signal abortion to the generator and wait for it to tear down
stop := make(chan *generatorStats)
snap.genAbort <- stop
<-stop
}

// Tests that snapshot generation errors out correctly in case of a missing trie
// node in the account trie.
func TestGenerateCorruptAccountTrie(t *testing.T) {
Expand Down Expand Up @@ -55,7 +165,7 @@ func TestGenerateCorruptAccountTrie(t *testing.T) {
triedb.Commit(common.HexToHash("0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978"), false, nil)
diskdb.Delete(common.HexToHash("0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7").Bytes())

snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978"), nil)
snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978"))
select {
case <-snap.genPending:
// Snapshot generation succeeded
Expand Down Expand Up @@ -115,7 +225,7 @@ func TestGenerateMissingStorageTrie(t *testing.T) {
// Delete a storage trie root and ensure the generator chokes
diskdb.Delete(common.HexToHash("0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67").Bytes())

snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"), nil)
snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"))
select {
case <-snap.genPending:
// Snapshot generation succeeded
Expand Down Expand Up @@ -174,7 +284,7 @@ func TestGenerateCorruptStorageTrie(t *testing.T) {
// Delete a storage trie leaf and ensure the generator chokes
diskdb.Delete(common.HexToHash("0x18a0f4d79cff4459642dd7604f303886ad9d77c30cf3d7d7cedb3a693ab6d371").Bytes())

snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"), nil)
snap := generateSnapshot(diskdb, triedb, 16, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"))
select {
case <-snap.genPending:
// Snapshot generation succeeded
Expand Down
15 changes: 4 additions & 11 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ const journalVersion uint64 = 0

// journalGenerator is a disk layer entry containing the generator progress marker.
type journalGenerator struct {
Wiping bool // Whether the database was in progress of being wiped
// Indicator that whether the database was in progress of being wiped.
// It's deprecated but keep it here for background compatibility.
Wiping bool

Done bool // Whether the generator finished creating the snapshot
Marker []byte
Accounts uint64
Expand Down Expand Up @@ -193,14 +196,6 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
}
// Everything loaded correctly, resume any suspended operations
if !generator.Done {
// If the generator was still wiping, restart one from scratch (fine for
// now as it's rare and the wiper deletes the stuff it touches anyway, so
// restarting won't incur a lot of extra database hops.
var wiper chan struct{}
if generator.Wiping {
log.Info("Resuming previous snapshot wipe")
wiper = wipeSnapshot(diskdb, false)
}
// Whether or not wiping was in progress, load any generator progress too
base.genMarker = generator.Marker
if base.genMarker == nil {
Expand All @@ -214,7 +209,6 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
origin = binary.BigEndian.Uint64(generator.Marker)
}
go base.generate(&generatorStats{
wiping: wiper,
origin: origin,
start: time.Now(),
accounts: generator.Accounts,
Expand Down Expand Up @@ -381,7 +375,6 @@ func (dl *diskLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
Marker: dl.genMarker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
Expand Down
9 changes: 1 addition & 8 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,9 +637,6 @@ func (t *Tree) Rebuild(root common.Hash) {
// building a brand new snapshot.
rawdb.DeleteSnapshotRecoveryNumber(t.diskdb)

// Track whether there's a wipe currently running and keep it alive if so
var wiper chan struct{}

// Iterate over and mark all layers stale
for _, layer := range t.layers {
switch layer := layer.(type) {
Expand All @@ -648,10 +645,6 @@ func (t *Tree) Rebuild(root common.Hash) {
if layer.genAbort != nil {
abort := make(chan *generatorStats)
layer.genAbort <- abort

if stats := <-abort; stats != nil {
wiper = stats.wiping
}
}
// Layer should be inactive now, mark it as stale
layer.lock.Lock()
Expand All @@ -672,7 +665,7 @@ func (t *Tree) Rebuild(root common.Hash) {
// generator will run a wiper first if there's not one running right now.
log.Info("Rebuilding state snapshot")
t.layers = map[common.Hash]snapshot{
root: generateSnapshot(t.diskdb, t.triedb, t.cache, root, wiper),
root: generateSnapshot(t.diskdb, t.triedb, t.cache, root),
}
}

Expand Down
30 changes: 22 additions & 8 deletions core/state/snapshot/wipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

// wipeSnapshot starts a goroutine to iterate over the entire key-value database
// and delete all the data associated with the snapshot (accounts, storage,
// and delete all the data associated with the snapshot (accounts, storage,
// metadata). After all is done, the snapshot range of the database is compacted
// to free up unused data blocks.
func wipeSnapshot(db ethdb.KeyValueStore, full bool) chan struct{} {
Expand All @@ -53,10 +54,10 @@ func wipeSnapshot(db ethdb.KeyValueStore, full bool) chan struct{} {
// removed in sync to avoid data races. After all is done, the snapshot range of
// the database is compacted to free up unused data blocks.
func wipeContent(db ethdb.KeyValueStore) error {
if err := wipeKeyRange(db, "accounts", rawdb.SnapshotAccountPrefix, len(rawdb.SnapshotAccountPrefix)+common.HashLength); err != nil {
if err := wipeKeyRange(db, "accounts", rawdb.SnapshotAccountPrefix, nil, nil, len(rawdb.SnapshotAccountPrefix)+common.HashLength, snapWipedAccountMeter, true); err != nil {
return err
}
if err := wipeKeyRange(db, "storage", rawdb.SnapshotStoragePrefix, len(rawdb.SnapshotStoragePrefix)+2*common.HashLength); err != nil {
if err := wipeKeyRange(db, "storage", rawdb.SnapshotStoragePrefix, nil, nil, len(rawdb.SnapshotStoragePrefix)+2*common.HashLength, snapWipedStorageMeter, true); err != nil {
return err
}
// Compact the snapshot section of the database to get rid of unused space
Expand All @@ -82,8 +83,9 @@ func wipeContent(db ethdb.KeyValueStore) error {
}

// wipeKeyRange deletes a range of keys from the database starting with prefix
// and having a specific total key length.
func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int) error {
// and having a specific total key length. The start and limit is optional for
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
// specifying a particular key range for deletion.
func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, origin []byte, limit []byte, keylen int, meter metrics.Meter, report bool) error {
// Batch deletions together to avoid holding an iterator for too long
var (
batch = db.NewBatch()
Expand All @@ -92,7 +94,11 @@ func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int
// Iterate over the key-range and delete all of them
start, logged := time.Now(), time.Now()

it := db.NewIterator(prefix, nil)
it := db.NewIterator(prefix, origin)
var stop []byte
if limit != nil {
stop = append(prefix, limit...)
}
for it.Next() {
// Skip any keys with the correct prefix but wrong length (trie nodes)
key := it.Key()
Expand All @@ -102,6 +108,9 @@ func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int
if len(key) != keylen {
continue
}
if stop != nil && bytes.Compare(key, stop) >= 0 {
break
}
// Delete the key and periodically recreate the batch and iterator
batch.Delete(key)
items++
Expand All @@ -116,7 +125,7 @@ func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int
seekPos := key[len(prefix):]
it = db.NewIterator(prefix, seekPos)

if time.Since(logged) > 8*time.Second {
if time.Since(logged) > 8*time.Second && report {
log.Info("Deleting state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
Expand All @@ -126,6 +135,11 @@ func wipeKeyRange(db ethdb.KeyValueStore, kind string, prefix []byte, keylen int
if err := batch.Write(); err != nil {
return err
}
log.Info("Deleted state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start)))
if meter != nil {
meter.Mark(int64(items))
}
if report {
log.Info("Deleted state snapshot leftovers", "kind", kind, "wiped", items, "elapsed", common.PrettyDuration(time.Since(start)))
}
return nil
}
2 changes: 1 addition & 1 deletion eth/protocols/eth/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestEth66Messages(t *testing.T) {
// init the receipts
{
receipts = []*types.Receipt{
&types.Receipt{
{
Status: types.ReceiptStatusFailed,
CumulativeGasUsed: 1,
Logs: []*types.Log{
Expand Down
Loading