diff --git a/cmd/hack/db/lmdb.go b/cmd/hack/db/lmdb.go index 288bcf10208..10df7a5e92a 100644 --- a/cmd/hack/db/lmdb.go +++ b/cmd/hack/db/lmdb.go @@ -577,7 +577,7 @@ func generate6(_ kv.RwDB, tx kv.RwTx) (bool, error) { } func dropT(_ kv.RwDB, tx kv.RwTx) (bool, error) { - if err := tx.(kv.BucketMigrator).ClearBucket("t"); err != nil { + if err := tx.ClearBucket("t"); err != nil { return false, err } return true, nil @@ -607,14 +607,14 @@ func generate7(_ kv.RwDB, tx kv.RwTx) (bool, error) { } func dropT1(_ kv.RwDB, tx kv.RwTx) (bool, error) { - if err := tx.(kv.BucketMigrator).ClearBucket("t1"); err != nil { + if err := tx.ClearBucket("t1"); err != nil { return false, err } return true, nil } func dropT2(_ kv.RwDB, tx kv.RwTx) (bool, error) { - if err := tx.(kv.BucketMigrator).ClearBucket("t2"); err != nil { + if err := tx.ClearBucket("t2"); err != nil { return false, err } return true, nil @@ -624,7 +624,7 @@ func dropT2(_ kv.RwDB, tx kv.RwTx) (bool, error) { func generate8(_ kv.RwDB, tx kv.RwTx) (bool, error) { for i := 0; i < 100; i++ { k := fmt.Sprintf("table_%05d", i) - if err := tx.(kv.BucketMigrator).CreateBucket(k); err != nil { + if err := tx.CreateBucket(k); err != nil { return false, err } } @@ -656,7 +656,7 @@ func generate9(tx kv.RwTx, entries int) error { func dropAll(_ kv.RwDB, tx kv.RwTx) (bool, error) { for i := 0; i < 100; i++ { k := fmt.Sprintf("table_%05d", i) - if err := tx.(kv.BucketMigrator).DropBucket(k); err != nil { + if err := tx.DropBucket(k); err != nil { return false, err } } diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 64b239829bc..b86a4cea8d6 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -28,6 +28,9 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/consensus/bor/finality" + "github.com/ledgerwatch/erigon/consensus/bor/finality/flags" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/consensus/bor/heimdall" "github.com/ledgerwatch/erigon/consensus/bor/heimdall/span" "github.com/ledgerwatch/erigon/consensus/bor/statefull" @@ -39,9 +42,6 @@ import ( "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/crypto/cryptopool" - "github.com/ledgerwatch/erigon/eth/borfinality" - "github.com/ledgerwatch/erigon/eth/borfinality/flags" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/services" @@ -1302,7 +1302,7 @@ func (f FinalityAPIFunc) GetRootHash(start uint64, end uint64) (string, error) { func (c *Bor) Start(chainDB kv.RwDB) { if flags.Milestone { whitelist.RegisterService(c.DB) - borfinality.Whitelist(c.HeimdallClient, c.DB, chainDB, c.blockReader, c.logger, + finality.Whitelist(c.HeimdallClient, c.DB, chainDB, c.blockReader, c.logger, FinalityAPIFunc(func(start uint64, end uint64) (string, error) { ctx := context.Background() tx, err := chainDB.BeginRo(ctx) diff --git a/eth/borfinality/api.go b/consensus/bor/finality/api.go similarity index 93% rename from eth/borfinality/api.go rename to consensus/bor/finality/api.go index fbeabf4ccac..288080e570b 100644 --- a/eth/borfinality/api.go +++ b/consensus/bor/finality/api.go @@ -1,11 +1,11 @@ -package borfinality +package finality import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" ) func GetFinalizedBlockNumber(tx kv.Tx) uint64 { @@ -29,6 +29,7 @@ func GetFinalizedBlockNumber(tx kv.Tx) uint64 { doExist, number, hash = service.GetWhitelistedCheckpoint() if doExist && number <= currentBlockNum.Number.Uint64() { + blockHeader := rawdb.ReadHeaderByNumber(tx, number) if blockHeader == nil { diff --git a/eth/borfinality/bor_checkpoint_verifier.go b/consensus/bor/finality/bor_verifier.go similarity index 94% rename from eth/borfinality/bor_checkpoint_verifier.go rename to consensus/bor/finality/bor_verifier.go index cd1380ac78c..b96ed42ba6e 100644 --- a/eth/borfinality/bor_checkpoint_verifier.go +++ b/consensus/bor/finality/bor_verifier.go @@ -1,14 +1,14 @@ // nolint -package borfinality +package finality import ( "context" "errors" "fmt" + "github.com/ledgerwatch/erigon/consensus/bor/finality/generics" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/eth/borfinality/generics" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/metrics" "github.com/ledgerwatch/log/v3" ) @@ -139,6 +139,10 @@ func borVerify(ctx context.Context, config *config, start uint64, end uint64, ha log.Debug("Failed to get end block hash while whitelisting", "err", err) return hash, errEndBlock } + if block == nil { + log.Debug("Current header behind the end block", "block", end) + return hash, errEndBlock + } hash = fmt.Sprintf("%v", block.Hash()) diff --git a/eth/borfinality/flags/flags.go b/consensus/bor/finality/flags/flags.go similarity index 100% rename from eth/borfinality/flags/flags.go rename to consensus/bor/finality/flags/flags.go diff --git a/eth/borfinality/generics/generics.go b/consensus/bor/finality/generics/generics.go similarity index 100% rename from eth/borfinality/generics/generics.go rename to consensus/bor/finality/generics/generics.go diff --git a/eth/borfinality/rawdb/checkpoint.go b/consensus/bor/finality/rawdb/checkpoint.go similarity index 100% rename from eth/borfinality/rawdb/checkpoint.go rename to consensus/bor/finality/rawdb/checkpoint.go diff --git a/eth/borfinality/rawdb/milestone.go b/consensus/bor/finality/rawdb/milestone.go similarity index 98% rename from eth/borfinality/rawdb/milestone.go rename to consensus/bor/finality/rawdb/milestone.go index 8f94bb6c433..d5ac8f49621 100644 --- a/eth/borfinality/rawdb/milestone.go +++ b/consensus/bor/finality/rawdb/milestone.go @@ -7,7 +7,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/eth/borfinality/generics" + "github.com/ledgerwatch/erigon/consensus/bor/finality/generics" "github.com/ledgerwatch/log/v3" ) diff --git a/eth/borfinality/whitelist.go b/consensus/bor/finality/whitelist.go similarity index 97% rename from eth/borfinality/whitelist.go rename to consensus/bor/finality/whitelist.go index 995e8d81138..4ec51628b44 100644 --- a/eth/borfinality/whitelist.go +++ b/consensus/bor/finality/whitelist.go @@ -1,4 +1,4 @@ -package borfinality +package finality import ( "context" @@ -7,9 +7,9 @@ import ( "time" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/consensus/bor/finality/flags" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/consensus/bor/heimdall" - "github.com/ledgerwatch/erigon/eth/borfinality/flags" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/log/v3" ) diff --git a/eth/borfinality/whitelist/checkpoint.go b/consensus/bor/finality/whitelist/checkpoint.go similarity index 95% rename from eth/borfinality/whitelist/checkpoint.go rename to consensus/bor/finality/whitelist/checkpoint.go index ad6a55ddcc5..5e23eee398c 100644 --- a/eth/borfinality/whitelist/checkpoint.go +++ b/consensus/bor/finality/whitelist/checkpoint.go @@ -2,8 +2,8 @@ package whitelist import ( "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/consensus/bor/finality/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/rawdb" "github.com/ledgerwatch/erigon/metrics" ) diff --git a/eth/borfinality/whitelist/finality.go b/consensus/bor/finality/whitelist/finality.go similarity index 96% rename from eth/borfinality/whitelist/finality.go rename to consensus/bor/finality/whitelist/finality.go index 293445229b8..f1abbbf3df6 100644 --- a/eth/borfinality/whitelist/finality.go +++ b/consensus/bor/finality/whitelist/finality.go @@ -5,8 +5,8 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/consensus/bor/finality/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/rawdb" "github.com/ledgerwatch/log/v3" ) diff --git a/eth/borfinality/whitelist/milestone.go b/consensus/bor/finality/whitelist/milestone.go similarity index 98% rename from eth/borfinality/whitelist/milestone.go rename to consensus/bor/finality/whitelist/milestone.go index 89d840d5c3a..4d78758b2f1 100644 --- a/eth/borfinality/whitelist/milestone.go +++ b/consensus/bor/finality/whitelist/milestone.go @@ -2,9 +2,9 @@ package whitelist import ( "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/consensus/bor/finality/flags" + "github.com/ledgerwatch/erigon/consensus/bor/finality/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/flags" - "github.com/ledgerwatch/erigon/eth/borfinality/rawdb" "github.com/ledgerwatch/erigon/metrics" "github.com/ledgerwatch/log/v3" ) diff --git a/eth/borfinality/whitelist/service.go b/consensus/bor/finality/whitelist/service.go similarity index 98% rename from eth/borfinality/whitelist/service.go rename to consensus/bor/finality/whitelist/service.go index 3ca07a58ee5..7bf7aa89819 100644 --- a/eth/borfinality/whitelist/service.go +++ b/consensus/bor/finality/whitelist/service.go @@ -5,8 +5,8 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/consensus/bor/finality/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/rawdb" ) var ( diff --git a/eth/borfinality/whitelist/service_test.go b/consensus/bor/finality/whitelist/service_test.go similarity index 99% rename from eth/borfinality/whitelist/service_test.go rename to consensus/bor/finality/whitelist/service_test.go index cbbdd7f5a37..0a45e6fe712 100644 --- a/eth/borfinality/whitelist/service_test.go +++ b/consensus/bor/finality/whitelist/service_test.go @@ -12,8 +12,8 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/consensus/bor/finality/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/rawdb" "github.com/stretchr/testify/require" "pgregory.net/rapid" diff --git a/eth/borfinality/whitelist_helpers.go b/consensus/bor/finality/whitelist_helpers.go similarity index 98% rename from eth/borfinality/whitelist_helpers.go rename to consensus/bor/finality/whitelist_helpers.go index b8126bf0558..6c3b4a35a42 100644 --- a/eth/borfinality/whitelist_helpers.go +++ b/consensus/bor/finality/whitelist_helpers.go @@ -1,12 +1,12 @@ -package borfinality +package finality import ( "context" "errors" "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/consensus/bor/heimdall" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/log/v3" ) diff --git a/consensus/bor/heimdall/heimall.go b/consensus/bor/heimdall/heimall.go index d45739245ef..2ef405290f2 100644 --- a/consensus/bor/heimdall/heimall.go +++ b/consensus/bor/heimdall/heimall.go @@ -4,10 +4,10 @@ import ( "context" "github.com/ledgerwatch/erigon/consensus/bor/clerk" + "github.com/ledgerwatch/erigon/consensus/bor/finality/generics" "github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint" "github.com/ledgerwatch/erigon/consensus/bor/heimdall/milestone" "github.com/ledgerwatch/erigon/consensus/bor/heimdall/span" - "github.com/ledgerwatch/erigon/eth/borfinality/generics" ) func MilestoneRewindPending() bool { diff --git a/core/rawdb/accessors_account.go b/core/rawdb/accessors_account.go index 254d4d9bb02..0607f04648e 100644 --- a/core/rawdb/accessors_account.go +++ b/core/rawdb/accessors_account.go @@ -23,7 +23,7 @@ import ( "github.com/ledgerwatch/erigon/core/types/accounts" ) -func ReadAccount(db kv.Tx, addr libcommon.Address, acc *accounts.Account) (bool, error) { +func ReadAccount(db kv.Getter, addr libcommon.Address, acc *accounts.Account) (bool, error) { enc, err := db.GetOne(kv.PlainState, addr[:]) if err != nil { return false, err diff --git a/core/types/block.go b/core/types/block.go index c99ec59a9fa..e0b1e2d6633 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -1510,28 +1510,16 @@ func (b *Block) SendersToTxs(senders []libcommon.Address) { // RawBody creates a RawBody based on the block. It is not very efficient, so // will probably be removed in favour of RawBlock. Also it panics func (b *Block) RawBody() *RawBody { - br := &RawBody{Transactions: make([][]byte, len(b.transactions)), Uncles: b.uncles, Withdrawals: b.withdrawals} - for i, tx := range b.transactions { - var err error - br.Transactions[i], err = rlp.EncodeToBytes(tx) - if err != nil { - panic(err) - } - } - return br + return b.Body().RawBody() } // RawBody creates a RawBody based on the body. func (b *Body) RawBody() *RawBody { - br := &RawBody{Transactions: make([][]byte, len(b.Transactions)), Uncles: b.Uncles, Withdrawals: b.Withdrawals} - for i, tx := range b.Transactions { - var err error - br.Transactions[i], err = rlp.EncodeToBytes(tx) - if err != nil { - panic(err) - } + txs, err := MarshalTransactionsBinary(b.Transactions) + if err != nil { + panic(err) } - return br + return &RawBody{Transactions: txs, Uncles: b.Uncles, Withdrawals: b.Withdrawals} } // Size returns the true RLP encoded storage size of the block, either by encoding diff --git a/core/types/block_test.go b/core/types/block_test.go index be05c716b26..cfba04da01f 100644 --- a/core/types/block_test.go +++ b/core/types/block_test.go @@ -512,3 +512,52 @@ func TestCopyTxs(t *testing.T) { copies := CopyTxs(txs) assert.Equal(t, txs, copies) } + +func TestRawBodyConverter(t *testing.T) { + blockEnc := common.FromHex("f90260f901f9a083cafc574e1f51ba9dc0568fc617a08ea2429fb384059c972f13b19fa1c8dd55a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a0ef1552a40b7165c3cd773806b9e0c165b75356e0314bf0706f279c729f51e017a05fe50b260da6308036625b850b5d6ced6d0a9f814c0688bc91ffb7b7a3a54b67a0bc37d79753ad738a6dac4921e57392f145d8887476de3f783dfa7edae9283e52bfefd8825208845506eb0780a0bd4472abb6659ebe3ee06ee4d7b72a00a9f4d001caca51342001075469aff49888a13a5a8c8f2bb1c4f861f85f800a82c35094095e7baea6a6c7c4c2dfeb977efac326af552d870a801ba09bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094fa08a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b1c0") + var block Block + if err := rlp.DecodeBytes(blockEnc, &block); err != nil { + t.Fatal("decode error: ", err) + } + + expectedTxs := [][]byte{ + libcommon.FromHex("f85f800a82c35094095e7baea6a6c7c4c2dfeb977efac326af552d870a801ba09bea4c4daac7c7c52e093e6a4c35dbbcf8856f1af7b059ba20253e70848d094fa08a8fae537ce25ed8cb5af9adac3f141af69bd515bd2ba031522df09b97dd72b1"), + } + require.Equal(t, block.RawBody().Transactions, expectedTxs) +} + +func TestRawBodyConverterShangai(t *testing.T) { + header := Header{ + ParentHash: libcommon.HexToHash("0x8b00fcf1e541d371a3a1b79cc999a85cc3db5ee5637b5159646e1acd3613fd15"), + Coinbase: libcommon.HexToAddress("0x571846e42308df2dad8ed792f44a8bfddf0acb4d"), + Root: libcommon.HexToHash("0x351780124dae86b84998c6d4fe9a88acfb41b4856b4f2c56767b51a4e2f94dd4"), + Difficulty: libcommon.Big0, + Number: big.NewInt(20_000_000), + GasLimit: 30_000_000, + GasUsed: 3_074_345, + Time: 1666343339, + Extra: make([]byte, 0), + MixDigest: libcommon.HexToHash("0x7f04e338b206ef863a1fad30e082bbb61571c74e135df8d1677e3f8b8171a09b"), + BaseFee: big.NewInt(7_000_000_000), + } + + withdrawals := make([]*Withdrawal, 2) + withdrawals[0] = &Withdrawal{ + Index: 44555666, + Validator: 89, + Address: libcommon.HexToAddress("0x690b9a9e9aa1c9db991c7721a92d351db4fac990"), + Amount: 2, + } + withdrawals[1] = &Withdrawal{ + Index: 44555667, + Validator: 37, + Address: libcommon.HexToAddress("0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5"), + Amount: 5_000_000_000, + } + + block := NewBlock(&header, nil, nil, nil, withdrawals) + + rb := block.RawBody() + + require.Equal(t, rb.Withdrawals, withdrawals) +} diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index aad6d93c7f8..adbd8b23c32 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -303,8 +303,6 @@ type StatelessReadTx interface { // Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort. // Starts from 0. ReadSequence(table string) (uint64, error) - - BucketSize(table string) (uint64, error) } type StatelessWriteTx interface { @@ -340,6 +338,16 @@ type StatelessRwTx interface { StatelessWriteTx } +// PendingMutations in-memory storage of changes +// Later they can either be flushed to the database or abandon +type PendingMutations interface { + StatelessRwTx + // Flush all in-memory data into `tx` + Flush(ctx context.Context, tx RwTx) error + Close() + BatchSize() int +} + // Tx // WARNING: // - Tx is not threadsafe and may only be used in the goroutine that created it @@ -397,6 +405,7 @@ type Tx interface { // Pointer to the underlying C transaction handle (e.g. *C.MDBX_txn) CHandle() unsafe.Pointer + BucketSize(table string) (uint64, error) } // RwTx diff --git a/erigon-lib/kv/mdbx/kv_migrator_test.go b/erigon-lib/kv/mdbx/kv_migrator_test.go index 62be6bcb86b..05eef49fd91 100644 --- a/erigon-lib/kv/mdbx/kv_migrator_test.go +++ b/erigon-lib/kv/mdbx/kv_migrator_test.go @@ -37,10 +37,7 @@ func TestBucketCRUD(t *testing.T) { normalBucket := kv.ChaindataTables[15] deprecatedBucket := kv.ChaindataDeprecatedTables[0] - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return - } + migrator := tx // check thad buckets have unique DBI's uniquness := map[kv.DBI]bool{} diff --git a/ethdb/olddb/database_test.go b/erigon-lib/kv/membatch/database_test.go similarity index 84% rename from ethdb/olddb/database_test.go rename to erigon-lib/kv/membatch/database_test.go index 7767f3005af..c2b789162cf 100644 --- a/ethdb/olddb/database_test.go +++ b/erigon-lib/kv/membatch/database_test.go @@ -16,7 +16,7 @@ //go:build !js -package olddb +package membatch import ( "bytes" @@ -29,9 +29,6 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -207,42 +204,3 @@ func TestParallelPutGet(t *testing.T) { } pending.Wait() } - -var hexEntries = map[string]string{ - "6b": "89c6", - "91": "c476", - "a8": "0a514e", - "bb": "7a", - "bd": "fe76", - "c0": "12", -} - -var startKey = common.FromHex("a0") -var fixedBits = 3 - -var keysInRange = [][]byte{common.FromHex("a8"), common.FromHex("bb"), common.FromHex("bd")} - -func TestWalk(t *testing.T) { - _, tx := memdb.NewTestTx(t) - - for k, v := range hexEntries { - err := tx.Put(testBucket, common.FromHex(k), common.FromHex(v)) - if err != nil { - t.Fatalf("put failed: %v", err) - } - } - - var gotKeys [][]byte - c, err := tx.Cursor(testBucket) - if err != nil { - panic(err) - } - defer c.Close() - err = ethdb.Walk(c, startKey, fixedBits, func(key, val []byte) (bool, error) { - gotKeys = append(gotKeys, common.CopyBytes(key)) - return true, nil - }) - assert.NoError(t, err) - - assert.Equal(t, keysInRange, gotKeys) -} diff --git a/ethdb/olddb/mapmutation.go b/erigon-lib/kv/membatch/mapmutation.go similarity index 66% rename from ethdb/olddb/mapmutation.go rename to erigon-lib/kv/membatch/mapmutation.go index 385d201ddd0..0a38f39feff 100644 --- a/ethdb/olddb/mapmutation.go +++ b/erigon-lib/kv/membatch/mapmutation.go @@ -1,4 +1,4 @@ -package olddb +package membatch import ( "context" @@ -11,13 +11,11 @@ import ( "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" - - "github.com/ledgerwatch/erigon/ethdb" ) -type mapmutation struct { +type Mapmutation struct { puts map[string]map[string][]byte // table -> key -> value ie. blocks -> hash -> blockBod - db kv.RwTx + db kv.Tx quit <-chan struct{} clean func() mu sync.RWMutex @@ -32,10 +30,10 @@ type mapmutation struct { // Common pattern: // // batch := db.NewBatch() -// defer batch.Rollback() +// defer batch.Close() // ... some calculations on `batch` // batch.Commit() -func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Logger) *mapmutation { +func NewHashBatch(tx kv.Tx, quit <-chan struct{}, tmpdir string, logger log.Logger) *Mapmutation { clean := func() {} if quit == nil { ch := make(chan struct{}) @@ -43,7 +41,7 @@ func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Lo quit = ch } - return &mapmutation{ + return &Mapmutation{ db: tx, puts: make(map[string]map[string][]byte), quit: quit, @@ -53,14 +51,7 @@ func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Lo } } -func (m *mapmutation) RwKV() kv.RwDB { - if casted, ok := m.db.(ethdb.HasRwKV); ok { - return casted.RwKV() - } - return nil -} - -func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) { +func (m *Mapmutation) getMem(table string, key []byte) ([]byte, bool) { m.mu.RLock() defer m.mu.RUnlock() if _, ok := m.puts[table]; !ok { @@ -73,7 +64,7 @@ func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) { return nil, false } -func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { +func (m *Mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { v, ok := m.getMem(kv.Sequence, []byte(bucket)) if !ok && m.db != nil { v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) @@ -95,7 +86,7 @@ func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint6 return currentV, nil } -func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) { +func (m *Mapmutation) ReadSequence(bucket string) (res uint64, err error) { v, ok := m.getMem(kv.Sequence, []byte(bucket)) if !ok && m.db != nil { v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) @@ -112,7 +103,7 @@ func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) { } // Can only be called from the worker thread -func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) { +func (m *Mapmutation) GetOne(table string, key []byte) ([]byte, error) { if value, ok := m.getMem(table, key); ok { return value, nil } @@ -127,21 +118,7 @@ func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) { return nil, nil } -// Can only be called from the worker thread -func (m *mapmutation) Get(table string, key []byte) ([]byte, error) { - value, err := m.GetOne(table, key) - if err != nil { - return nil, err - } - - if value == nil { - return nil, ethdb.ErrKeyNotFound - } - - return value, nil -} - -func (m *mapmutation) Last(table string) ([]byte, []byte, error) { +func (m *Mapmutation) Last(table string) ([]byte, []byte, error) { c, err := m.db.Cursor(table) if err != nil { return nil, nil, err @@ -150,7 +127,7 @@ func (m *mapmutation) Last(table string) ([]byte, []byte, error) { return c.Last() } -func (m *mapmutation) Has(table string, key []byte) (bool, error) { +func (m *Mapmutation) Has(table string, key []byte) (bool, error) { if _, ok := m.getMem(table, key); ok { return ok, nil } @@ -161,7 +138,7 @@ func (m *mapmutation) Has(table string, key []byte) (bool, error) { } // puts a table key with a value and if the table is not found then it appends a table -func (m *mapmutation) Put(table string, k, v []byte) error { +func (m *Mapmutation) Put(table string, k, v []byte) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.puts[table]; !ok { @@ -183,40 +160,40 @@ func (m *mapmutation) Put(table string, k, v []byte) error { return nil } -func (m *mapmutation) Append(table string, key []byte, value []byte) error { +func (m *Mapmutation) Append(table string, key []byte, value []byte) error { return m.Put(table, key, value) } -func (m *mapmutation) AppendDup(table string, key []byte, value []byte) error { +func (m *Mapmutation) AppendDup(table string, key []byte, value []byte) error { return m.Put(table, key, value) } -func (m *mapmutation) BatchSize() int { +func (m *Mapmutation) BatchSize() int { m.mu.RLock() defer m.mu.RUnlock() return m.size } -func (m *mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { +func (m *Mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForEach(bucket, fromPrefix, walker) } -func (m *mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { +func (m *Mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForPrefix(bucket, prefix, walker) } -func (m *mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { +func (m *Mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForAmount(bucket, prefix, amount, walker) } -func (m *mapmutation) Delete(table string, k []byte) error { +func (m *Mapmutation) Delete(table string, k []byte) error { return m.Put(table, k, nil) } -func (m *mapmutation) doCommit(tx kv.RwTx) error { +func (m *Mapmutation) doCommit(tx kv.RwTx) error { logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() count := 0 @@ -235,7 +212,7 @@ func (m *mapmutation) doCommit(tx kv.RwTx) error { tx.CollectMetrics() } } - if err := collector.Load(m.db, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil { + if err := collector.Load(tx, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil { return err } } @@ -244,13 +221,13 @@ func (m *mapmutation) doCommit(tx kv.RwTx) error { return nil } -func (m *mapmutation) Commit() error { +func (m *Mapmutation) Flush(ctx context.Context, tx kv.RwTx) error { if m.db == nil { return nil } m.mu.Lock() defer m.mu.Unlock() - if err := m.doCommit(m.db); err != nil { + if err := m.doCommit(tx); err != nil { return err } @@ -261,7 +238,7 @@ func (m *mapmutation) Commit() error { return nil } -func (m *mapmutation) Rollback() { +func (m *Mapmutation) Close() { m.mu.Lock() defer m.mu.Unlock() m.puts = map[string]map[string][]byte{} @@ -270,25 +247,11 @@ func (m *mapmutation) Rollback() { m.size = 0 m.clean() } +func (m *Mapmutation) Commit() error { panic("not db txn, use .Flush method") } +func (m *Mapmutation) Rollback() { panic("not db txn, use .Close method") } -func (m *mapmutation) Close() { - m.Rollback() -} - -func (m *mapmutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - panic("mutation can't start transaction, because doesn't own it") -} - -func (m *mapmutation) panicOnEmptyDB() { +func (m *Mapmutation) panicOnEmptyDB() { if m.db == nil { panic("Not implemented") } } - -func (m *mapmutation) SetRwKV(kv kv.RwDB) { - hasRwKV, ok := m.db.(ethdb.HasRwKV) - if !ok { - log.Warn("Failed to convert mapmutation type to HasRwKV interface") - } - hasRwKV.SetRwKV(kv) -} diff --git a/erigon-lib/kv/memdb/memory_mutation.go b/erigon-lib/kv/memdb/memory_mutation.go index c5182cb624b..0e45737bc01 100644 --- a/erigon-lib/kv/memdb/memory_mutation.go +++ b/erigon-lib/kv/memdb/memory_mutation.go @@ -41,7 +41,7 @@ type MemoryMutation struct { // Common pattern: // // batch := NewMemoryBatch(db, tmpDir) -// defer batch.Rollback() +// defer batch.Close() // ... some calculations on `batch` // batch.Commit() func NewMemoryBatch(tx kv.Tx, tmpDir string) *MemoryMutation { diff --git a/eth/backend.go b/eth/backend.go index e44ee6cae1b..1642ddcdeaa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -92,6 +92,7 @@ import ( "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/bor" + "github.com/ledgerwatch/erigon/consensus/bor/finality/flags" "github.com/ledgerwatch/erigon/consensus/bor/heimdall" "github.com/ledgerwatch/erigon/consensus/bor/heimdallgrpc" "github.com/ledgerwatch/erigon/consensus/clique" @@ -105,7 +106,6 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/crypto" - "github.com/ledgerwatch/erigon/eth/borfinality/flags" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/ethconsensusconfig" "github.com/ledgerwatch/erigon/eth/ethutils" @@ -496,7 +496,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere dirs, notifications, blockReader, blockWriter, backend.agg, backend.silkworm, terseLogger) chainReader := stagedsync.NewChainReaderImpl(chainConfig, batch, blockReader, logger) // We start the mining step - if err := stages2.StateStep(ctx, chainReader, backend.engine, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil { + if err := stages2.StateStep(ctx, chainReader, backend.engine, batch, backend.blockWriter, stateSync, backend.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain, config.HistoryV3); err != nil { logger.Warn("Could not validate block", "err", err) return err } diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index 92cc9ba0c8c..e4b52eefdb4 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -444,7 +444,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc ID: stages.Bodies, Description: "Download block bodies", Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx, logger log.Logger) error { - return BodiesForward(s, u, ctx, tx, bodies, false, false, logger) + return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx, logger log.Logger) error { return UnwindBodiesStage(u, tx, bodies, ctx) diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index 8815ba97858..db3d70bfe9e 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -13,11 +13,11 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/accounts/abi" "github.com/ledgerwatch/erigon/consensus/bor/contract" + "github.com/ledgerwatch/erigon/consensus/bor/finality/generics" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/consensus/bor/heimdall" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/dataflow" - "github.com/ledgerwatch/erigon/eth/borfinality/generics" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/services" diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 39010bc1c5d..24e88dccaef 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -10,6 +10,7 @@ import ( "time" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/kv/membatch" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/errgroup" @@ -42,8 +43,6 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" trace_logger "github.com/ledgerwatch/erigon/eth/tracers/logger" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/olddb" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/shards" @@ -140,7 +139,7 @@ func StageExecuteBlocksCfg( func executeBlock( block *types.Block, tx kv.RwTx, - batch ethdb.Database, + batch kv.StatelessRwTx, cfg ExecuteBlockCfg, vmConfig vm.Config, // emit copy, because will modify it writeChangesets bool, @@ -206,7 +205,7 @@ func executeBlock( } func newStateReaderWriter( - batch ethdb.Database, + batch kv.StatelessRwTx, tx kv.RwTx, block *types.Block, writeChangesets bool, @@ -415,12 +414,12 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint var stoppedErr error - var batch ethdb.DbWithPendingMutations + var batch kv.PendingMutations // state is stored through ethdb batches - batch = olddb.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) + batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) // avoids stacking defers within the loop defer func() { - batch.Rollback() + batch.Close() }() var readAhead chan uint64 @@ -508,7 +507,7 @@ Loop: if shouldUpdateProgress { logger.Info("Committed State", "gas reached", currentStateGas, "gasTarget", gasState) currentStateGas = 0 - if err = batch.Commit(); err != nil { + if err = batch.Flush(ctx, tx); err != nil { return err } if err = s.Update(tx, stageProgress); err != nil { @@ -525,7 +524,7 @@ Loop: // TODO: This creates stacked up deferrals defer tx.Rollback() } - batch = olddb.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) + batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) } gas = gas + block.GasUsed() @@ -543,7 +542,7 @@ Loop: if err = s.Update(batch, stageProgress); err != nil { return err } - if err = batch.Commit(); err != nil { + if err = batch.Flush(ctx, tx); err != nil { return fmt.Errorf("batch commit: %w", err) } @@ -647,7 +646,7 @@ func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, bl } func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64, - gasState float64, batch ethdb.DbWithPendingMutations, logger log.Logger) (uint64, uint64, time.Time) { + gasState float64, batch kv.PendingMutations, logger log.Logger) (uint64, uint64, time.Time) { currentTime := time.Now() interval := currentTime.Sub(prevTime) speed := float64(currentBlock-prevBlock) / (float64(interval) / float64(time.Second)) diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 4bb14b2f0b1..afd017fead6 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -10,6 +10,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/kv/membatch" "github.com/ledgerwatch/log/v3" "golang.org/x/net/context" @@ -17,7 +18,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/fixedgas" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/memdb" types2 "github.com/ledgerwatch/erigon-lib/types" "github.com/ledgerwatch/erigon/consensus" @@ -115,8 +115,11 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c } else { yielded := mapset.NewSet[[32]byte]() - simulationTx := memdb.NewMemoryBatch(tx, cfg.tmpdir) - defer simulationTx.Rollback() + var simulationTx kv.StatelessRwTx + m := membatch.NewHashBatch(tx, quit, cfg.tmpdir, logger) + defer m.Close() + simulationTx = m + executionAt, err := s.ExecutionAt(tx) if err != nil { return err @@ -180,7 +183,7 @@ func getNextTransactions( header *types.Header, amount uint16, executionAt uint64, - simulationTx *memdb.MemoryMutation, + simulationTx kv.StatelessRwTx, alreadyYielded mapset.Set[[32]byte], logger log.Logger, ) (types.TransactionsStream, int, error) { @@ -237,7 +240,7 @@ func getNextTransactions( return types.NewTransactionsFixedOrder(txs), count, nil } -func filterBadTransactions(transactions []types.Transaction, config chain.Config, blockNumber uint64, baseFee *big.Int, simulationTx *memdb.MemoryMutation, logger log.Logger) ([]types.Transaction, error) { +func filterBadTransactions(transactions []types.Transaction, config chain.Config, blockNumber uint64, baseFee *big.Int, simulationTx kv.StatelessRwTx, logger log.Logger) ([]types.Transaction, error) { initialCnt := len(transactions) var filtered []types.Transaction gasBailout := false diff --git a/ethdb/db_interface.go b/ethdb/db_interface.go index d8a1cf590f9..ab131be8800 100644 --- a/ethdb/db_interface.go +++ b/ethdb/db_interface.go @@ -17,7 +17,6 @@ package ethdb import ( - "context" "errors" "github.com/ledgerwatch/erigon-lib/kv" @@ -28,85 +27,6 @@ import ( // ErrKeyNotFound is returned when key isn't found in the database. var ErrKeyNotFound = errors.New("db: key not found") -type TxFlags uint - -const ( - RW TxFlags = 0x00 // default - RO TxFlags = 0x02 -) - -// DBGetter wraps the database read operations. -type DBGetter interface { - kv.Getter - - // Get returns the value for a given key if it's present. - Get(bucket string, key []byte) ([]byte, error) -} - -// Database wraps all database operations. All methods are safe for concurrent use. -type Database interface { - DBGetter - kv.Putter - kv.Deleter - kv.Closer - - Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) // starts db transaction - Last(bucket string) ([]byte, []byte, error) - - IncrementSequence(bucket string, amount uint64) (uint64, error) - ReadSequence(bucket string) (uint64, error) - RwKV() kv.RwDB -} - -// MinDatabase is a minimalistic version of the Database interface. -type MinDatabase interface { - Get(bucket string, key []byte) ([]byte, error) - Put(table string, k, v []byte) error - Delete(table string, k []byte) error -} - -// DbWithPendingMutations is an extended version of the Database, -// where all changes are first made in memory. -// Later they can either be committed to the database or rolled back. -type DbWithPendingMutations interface { - Database - - // Commit - commits transaction (or flush data into underlying db object in case of `mutation`) - // - // Common pattern: - // - // tx := db.Begin() - // defer tx.Rollback() - // ... some calculations on `tx` - // tx.Commit() - // - Commit() error - - Rollback() - BatchSize() int -} - -type HasRwKV interface { - RwKV() kv.RwDB - SetRwKV(kv kv.RwDB) -} - type HasTx interface { Tx() kv.Tx } - -type BucketsMigrator interface { - BucketExists(bucket string) (bool, error) // makes them empty - ClearBuckets(buckets ...string) error // makes them empty - DropBuckets(buckets ...string) error // drops them, use of them after drop will panic -} - -func GetOneWrapper(dat []byte, err error) ([]byte, error) { - if err != nil { - return nil, err - } - if dat == nil { - return nil, ErrKeyNotFound - } - return dat, nil -} diff --git a/ethdb/olddb/mutation.go b/ethdb/olddb/mutation.go deleted file mode 100644 index ee39868dabb..00000000000 --- a/ethdb/olddb/mutation.go +++ /dev/null @@ -1,345 +0,0 @@ -package olddb - -import ( - "bytes" - "context" - "encoding/binary" - "fmt" - "strings" - "sync" - "time" - "unsafe" - - "github.com/google/btree" - "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -type mutation struct { - puts *btree.BTree - db kv.RwTx - quit <-chan struct{} - clean func() - searchItem MutationItem - mu sync.RWMutex - size int -} - -type MutationItem struct { - table string - key []byte - value []byte -} - -// NewBatch - starts in-mem batch -// -// Common pattern: -// -// batch := db.NewBatch() -// defer batch.Rollback() -// ... some calculations on `batch` -// batch.Commit() -func NewBatch(tx kv.RwTx, quit <-chan struct{}) *mutation { - clean := func() {} - if quit == nil { - ch := make(chan struct{}) - clean = func() { close(ch) } - quit = ch - } - return &mutation{ - db: tx, - puts: btree.New(32), - quit: quit, - clean: clean, - } -} - -func (mi *MutationItem) Less(than btree.Item) bool { - i, ok := than.(*MutationItem) - if !ok { - log.Warn("Failed to convert btree.Item to MutationItem pointer") - } - c := strings.Compare(mi.table, i.table) - if c != 0 { - return c < 0 - } - return bytes.Compare(mi.key, i.key) < 0 -} - -func (m *mutation) ReadOnly() bool { return false } -func (m *mutation) RwKV() kv.RwDB { - if casted, ok := m.db.(ethdb.HasRwKV); ok { - return casted.RwKV() - } - return nil -} - -func (m *mutation) getMem(table string, key []byte) ([]byte, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - m.searchItem.table = table - m.searchItem.key = key - i := m.puts.Get(&m.searchItem) - if i == nil { - return nil, false - } - return i.(*MutationItem).value, true -} - -func (m *mutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { - v, ok := m.getMem(kv.Sequence, []byte(bucket)) - if !ok && m.db != nil { - v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) - if err != nil { - return 0, err - } - } - - var currentV uint64 = 0 - if len(v) > 0 { - currentV = binary.BigEndian.Uint64(v) - } - - newVBytes := make([]byte, 8) - binary.BigEndian.PutUint64(newVBytes, currentV+amount) - if err = m.Put(kv.Sequence, []byte(bucket), newVBytes); err != nil { - return 0, err - } - - return currentV, nil -} -func (m *mutation) ReadSequence(bucket string) (res uint64, err error) { - v, ok := m.getMem(kv.Sequence, []byte(bucket)) - if !ok && m.db != nil { - v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) - if err != nil { - return 0, err - } - } - var currentV uint64 = 0 - if len(v) > 0 { - currentV = binary.BigEndian.Uint64(v) - } - - return currentV, nil -} - -// Can only be called from the worker thread -func (m *mutation) GetOne(table string, key []byte) ([]byte, error) { - if value, ok := m.getMem(table, key); ok { - if value == nil { - return nil, nil - } - return value, nil - } - if m.db != nil { - // TODO: simplify when tx can no longer be parent of mutation - value, err := m.db.GetOne(table, key) - if err != nil { - return nil, err - } - - return value, nil - } - return nil, nil -} - -// Can only be called from the worker thread -func (m *mutation) Get(table string, key []byte) ([]byte, error) { - value, err := m.GetOne(table, key) - if err != nil { - return nil, err - } - - if value == nil { - return nil, ethdb.ErrKeyNotFound - } - - return value, nil -} - -func (m *mutation) Last(table string) ([]byte, []byte, error) { - c, err := m.db.Cursor(table) - if err != nil { - return nil, nil, err - } - defer c.Close() - return c.Last() -} - -func (m *mutation) hasMem(table string, key []byte) bool { - m.mu.RLock() - defer m.mu.RUnlock() - m.searchItem.table = table - m.searchItem.key = key - return m.puts.Has(&m.searchItem) -} - -func (m *mutation) Has(table string, key []byte) (bool, error) { - if m.hasMem(table, key) { - return true, nil - } - if m.db != nil { - return m.db.Has(table, key) - } - return false, nil -} - -func (m *mutation) Put(table string, k, v []byte) error { - m.mu.Lock() - defer m.mu.Unlock() - - newMi := &MutationItem{table: table, key: k, value: v} - i := m.puts.ReplaceOrInsert(newMi) - m.size += int(unsafe.Sizeof(newMi)) + len(k) + len(v) - if i != nil { - oldMi := i.(*MutationItem) - m.size -= int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value) - } - return nil -} - -func (m *mutation) Append(table string, key []byte, value []byte) error { - return m.Put(table, key, value) -} - -func (m *mutation) AppendDup(table string, key []byte, value []byte) error { - return m.Put(table, key, value) -} - -func (m *mutation) BatchSize() int { - m.mu.RLock() - defer m.mu.RUnlock() - return m.size -} - -func (m *mutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - m.panicOnEmptyDB() - return m.db.ForEach(bucket, fromPrefix, walker) -} - -func (m *mutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - m.panicOnEmptyDB() - return m.db.ForPrefix(bucket, prefix, walker) -} - -func (m *mutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { - m.panicOnEmptyDB() - return m.db.ForAmount(bucket, prefix, amount, walker) -} - -func (m *mutation) Delete(table string, k []byte) error { - //m.puts.Delete(table, k) - return m.Put(table, k, nil) -} - -func (m *mutation) doCommit(tx kv.RwTx) error { - var prevTable string - var c kv.RwCursor - var innerErr error - var isEndOfBucket bool - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() - count := 0 - total := float64(m.puts.Len()) - - m.puts.Ascend(func(i btree.Item) bool { - mi := i.(*MutationItem) - if mi.table != prevTable { - if c != nil { - c.Close() - } - var err error - c, err = tx.RwCursor(mi.table) - if err != nil { - innerErr = err - return false - } - prevTable = mi.table - firstKey, _, err := c.Seek(mi.key) - if err != nil { - innerErr = err - return false - } - isEndOfBucket = firstKey == nil - } - if isEndOfBucket { - if len(mi.value) > 0 { - if err := c.Append(mi.key, mi.value); err != nil { - innerErr = err - return false - } - } - } else if len(mi.value) == 0 { - if err := c.Delete(mi.key); err != nil { - innerErr = err - return false - } - } else { - if err := c.Put(mi.key, mi.value); err != nil { - innerErr = err - return false - } - } - - count++ - - select { - default: - case <-logEvery.C: - progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000) - log.Info("Write to db", "progress", progress, "current table", mi.table) - tx.CollectMetrics() - case <-m.quit: - innerErr = common.ErrStopped - return false - } - return true - }) - tx.CollectMetrics() - return innerErr -} - -func (m *mutation) Commit() error { - if m.db == nil { - return nil - } - m.mu.Lock() - defer m.mu.Unlock() - if err := m.doCommit(m.db); err != nil { - return err - } - - m.puts.Clear(false /* addNodesToFreelist */) - m.size = 0 - m.clean() - return nil -} - -func (m *mutation) Rollback() { - m.mu.Lock() - defer m.mu.Unlock() - m.puts.Clear(false /* addNodesToFreelist */) - m.size = 0 - m.clean() -} - -func (m *mutation) Close() { - m.Rollback() -} - -func (m *mutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - panic("mutation can't start transaction, because doesn't own it") -} - -func (m *mutation) panicOnEmptyDB() { - if m.db == nil { - panic("Not implemented") - } -} - -func (m *mutation) SetRwKV(kv kv.RwDB) { - m.db.(ethdb.HasRwKV).SetRwKV(kv) -} diff --git a/ethdb/olddb/object_db.go b/ethdb/olddb/object_db.go deleted file mode 100644 index 24d03523175..00000000000 --- a/ethdb/olddb/object_db.go +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// Package ethdb defines the interfaces for an Ethereum data store. -package olddb - -import ( - "context" - "fmt" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -// ObjectDatabase - is an object-style interface of DB accessing -type ObjectDatabase struct { - kv kv.RwDB -} - -// NewObjectDatabase returns a AbstractDB wrapper. -// Deprecated -func NewObjectDatabase(kv kv.RwDB) *ObjectDatabase { - return &ObjectDatabase{ - kv: kv, - } -} - -// Put inserts or updates a single entry. -func (db *ObjectDatabase) Put(table string, k, v []byte) error { - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(table, k, v) - }) - return err -} - -// Append appends a single entry to the end of the bucket. -func (db *ObjectDatabase) Append(bucket string, key []byte, value []byte) error { - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursor(bucket) - if err != nil { - return err - } - return c.Append(key, value) - }) - return err -} - -// AppendDup appends a single entry to the end of the bucket. -func (db *ObjectDatabase) AppendDup(bucket string, key []byte, value []byte) error { - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursorDupSort(bucket) - if err != nil { - return err - } - return c.AppendDup(key, value) - }) - return err -} - -func (db *ObjectDatabase) Has(bucket string, key []byte) (bool, error) { - var has bool - err := db.kv.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(bucket, key) - if err != nil { - return err - } - has = v != nil - return nil - }) - return has, err -} - -func (db *ObjectDatabase) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { - err = db.kv.Update(context.Background(), func(tx kv.RwTx) error { - res, err = tx.IncrementSequence(bucket, amount) - return err - }) - return res, err -} -func (db *ObjectDatabase) ReadSequence(bucket string) (res uint64, err error) { - err = db.kv.View(context.Background(), func(tx kv.Tx) error { - res, err = tx.ReadSequence(bucket) - return err - }) - return res, err -} - -// Get returns the value for a given key if it's present. -func (db *ObjectDatabase) GetOne(bucket string, key []byte) ([]byte, error) { - var dat []byte - err := db.kv.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(bucket, key) - if err != nil { - return err - } - if v != nil { - dat = make([]byte, len(v)) - copy(dat, v) - } - return nil - }) - return dat, err -} - -func (db *ObjectDatabase) Get(bucket string, key []byte) ([]byte, error) { - dat, err := db.GetOne(bucket, key) - return ethdb.GetOneWrapper(dat, err) -} - -func (db *ObjectDatabase) Last(bucket string) ([]byte, []byte, error) { - var key, value []byte - if err := db.kv.View(context.Background(), func(tx kv.Tx) error { - c, err := tx.Cursor(bucket) - if err != nil { - return err - } - k, v, err := c.Last() - if err != nil { - return err - } - if k != nil { - key, value = common.CopyBytes(k), common.CopyBytes(v) - } - return nil - }); err != nil { - return nil, nil, err - } - return key, value, nil -} - -func (db *ObjectDatabase) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - return db.kv.View(context.Background(), func(tx kv.Tx) error { - return tx.ForEach(bucket, fromPrefix, walker) - }) -} -func (db *ObjectDatabase) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error { - return db.kv.View(context.Background(), func(tx kv.Tx) error { - return tx.ForAmount(bucket, fromPrefix, amount, walker) - }) -} - -func (db *ObjectDatabase) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - return db.kv.View(context.Background(), func(tx kv.Tx) error { - return tx.ForPrefix(bucket, prefix, walker) - }) -} - -// Delete deletes the key from the queue and database -func (db *ObjectDatabase) Delete(table string, k []byte) error { - // Execute the actual operation - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Delete(table, k) - }) - return err -} - -func (db *ObjectDatabase) BucketExists(name string) (bool, error) { - exists := false - if err := db.kv.View(context.Background(), func(tx kv.Tx) (err error) { - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) - } - exists, err = migrator.ExistsBucket(name) - if err != nil { - return err - } - return nil - }); err != nil { - return false, err - } - return exists, nil -} - -func (db *ObjectDatabase) ClearBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) - } - if err := migrator.ClearBucket(name); err != nil { - return err - } - return nil - }); err != nil { - return err - } - } - - return nil -} - -func (db *ObjectDatabase) DropBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - log.Info("Dropping bucket", "name", name) - if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) - } - if err := migrator.DropBucket(name); err != nil { - return err - } - return nil - }); err != nil { - return err - } - } - return nil -} - -func (db *ObjectDatabase) Close() { - db.kv.Close() -} - -func (db *ObjectDatabase) RwKV() kv.RwDB { - return db.kv -} - -func (db *ObjectDatabase) SetRwKV(kv kv.RwDB) { - db.kv = kv -} - -func (db *ObjectDatabase) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - batch := &TxDb{db: db} - if err := batch.begin(ctx, flags); err != nil { - return batch, err - } - return batch, nil -} diff --git a/ethdb/olddb/tx_db.go b/ethdb/olddb/tx_db.go deleted file mode 100644 index f54727ba04e..00000000000 --- a/ethdb/olddb/tx_db.go +++ /dev/null @@ -1,238 +0,0 @@ -package olddb - -import ( - "context" - "fmt" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -// TxDb - provides Database interface around ethdb.Tx -// It's not thread-safe! -// TxDb not usable after .Commit()/.Rollback() call, but usable after .CommitAndBegin() call -// you can put unlimited amount of data into this class -// Walk and MultiWalk methods - work outside of Tx object yet, will implement it later -// Deprecated -// nolint -type TxDb struct { - db ethdb.Database - tx kv.Tx - cursors map[string]kv.Cursor - txFlags ethdb.TxFlags - len uint64 -} - -// nolint -func WrapIntoTxDB(tx kv.RwTx) *TxDb { - return &TxDb{tx: tx, cursors: map[string]kv.Cursor{}} -} - -func (m *TxDb) Close() { - panic("don't call me") -} - -func (m *TxDb) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - batch := m - if m.tx != nil { - panic("nested transactions not supported") - } - - if err := batch.begin(ctx, flags); err != nil { - return nil, err - } - return batch, nil -} - -func (m *TxDb) cursor(bucket string) (kv.Cursor, error) { - c, ok := m.cursors[bucket] - if !ok { - var err error - c, err = m.tx.Cursor(bucket) - if err != nil { - return nil, err - } - m.cursors[bucket] = c - } - return c, nil -} - -func (m *TxDb) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { - return m.tx.(kv.RwTx).IncrementSequence(bucket, amount) -} - -func (m *TxDb) ReadSequence(bucket string) (res uint64, err error) { - return m.tx.ReadSequence(bucket) -} - -func (m *TxDb) Put(table string, k, v []byte) error { - m.len += uint64(len(k) + len(v)) - c, err := m.cursor(table) - if err != nil { - return err - } - return c.(kv.RwCursor).Put(k, v) -} - -func (m *TxDb) Append(bucket string, key []byte, value []byte) error { - m.len += uint64(len(key) + len(value)) - c, err := m.cursor(bucket) - if err != nil { - return err - } - return c.(kv.RwCursor).Append(key, value) -} - -func (m *TxDb) AppendDup(bucket string, key []byte, value []byte) error { - m.len += uint64(len(key) + len(value)) - c, err := m.cursor(bucket) - if err != nil { - return err - } - return c.(kv.RwCursorDupSort).AppendDup(key, value) -} - -func (m *TxDb) Delete(table string, k []byte) error { - m.len += uint64(len(k)) - c, err := m.cursor(table) - if err != nil { - return err - } - return c.(kv.RwCursor).Delete(k) -} - -func (m *TxDb) begin(ctx context.Context, flags ethdb.TxFlags) error { - db := m.db.(ethdb.HasRwKV).RwKV() - - var tx kv.Tx - var err error - if flagsðdb.RO != 0 { - tx, err = db.BeginRo(ctx) - } else { - tx, err = db.BeginRw(ctx) - } - if err != nil { - return err - } - m.tx = tx - m.cursors = make(map[string]kv.Cursor, 16) - return nil -} - -func (m *TxDb) RwKV() kv.RwDB { - panic("not allowed to get KV interface because you will loose transaction, please use .Tx() method") -} - -// Last can only be called from the transaction thread -func (m *TxDb) Last(bucket string) ([]byte, []byte, error) { - c, err := m.cursor(bucket) - if err != nil { - return []byte{}, nil, err - } - return c.Last() -} - -func (m *TxDb) GetOne(bucket string, key []byte) ([]byte, error) { - c, err := m.cursor(bucket) - if err != nil { - return nil, err - } - _, v, err := c.SeekExact(key) - return v, err -} - -func (m *TxDb) Get(bucket string, key []byte) ([]byte, error) { - dat, err := m.GetOne(bucket, key) - return ethdb.GetOneWrapper(dat, err) -} - -func (m *TxDb) Has(bucket string, key []byte) (bool, error) { - v, err := m.Get(bucket, key) - if err != nil { - return false, err - } - return v != nil, nil -} - -func (m *TxDb) BatchSize() int { - return int(m.len) -} - -func (m *TxDb) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - return m.tx.ForEach(bucket, fromPrefix, walker) -} - -func (m *TxDb) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - return m.tx.ForPrefix(bucket, prefix, walker) -} - -func (m *TxDb) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { - return m.tx.ForAmount(bucket, prefix, amount, walker) -} - -func (m *TxDb) Commit() error { - if m.tx == nil { - return fmt.Errorf("second call .Commit() on same transaction") - } - if err := m.tx.Commit(); err != nil { - return err - } - m.tx = nil - m.cursors = nil - m.len = 0 - return nil -} - -func (m *TxDb) Rollback() { - if m.tx == nil { - return - } - m.tx.Rollback() - m.cursors = nil - m.tx = nil - m.len = 0 -} - -func (m *TxDb) Tx() kv.Tx { - return m.tx -} - -func (m *TxDb) BucketExists(name string) (bool, error) { - migrator, ok := m.tx.(kv.BucketMigrator) - if !ok { - return false, fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) - } - return migrator.ExistsBucket(name) -} - -func (m *TxDb) ClearBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - - migrator, ok := m.tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) - } - if err := migrator.ClearBucket(name); err != nil { - return err - } - } - - return nil -} - -func (m *TxDb) DropBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - log.Info("Dropping bucket", "name", name) - migrator, ok := m.tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) - } - if err := migrator.DropBucket(name); err != nil { - return err - } - } - return nil -} diff --git a/turbo/engineapi/engine_helpers/fork_validator.go b/turbo/engineapi/engine_helpers/fork_validator.go index 14bafb148b1..33f739e8981 100644 --- a/turbo/engineapi/engine_helpers/fork_validator.go +++ b/turbo/engineapi/engine_helpers/fork_validator.go @@ -195,10 +195,6 @@ func (fv *ForkValidator) ValidatePayload(tx kv.Tx, header *types.Header, body *t header *types.Header body *types.Body ) - body, criticalError = fv.blockReader.BodyWithTransactions(fv.ctx, tx, currentHash, unwindPoint) - if criticalError != nil { - return - } header, criticalError = fv.blockReader.Header(fv.ctx, tx, currentHash, unwindPoint) if criticalError != nil { return @@ -208,14 +204,18 @@ func (fv *ForkValidator) ValidatePayload(tx kv.Tx, header *types.Header, body *t status = engine_types.AcceptedStatus return } - headersChain = append([]*types.Header{header}, headersChain...) + body, criticalError = fv.blockReader.BodyWithTransactions(fv.ctx, tx, currentHash, unwindPoint) + if criticalError != nil { + return + } if body == nil { - bodiesChain = append([]*types.RawBody{nil}, bodiesChain...) - } else { - bodiesChain = append([]*types.RawBody{body.RawBody()}, bodiesChain...) - + criticalError = fmt.Errorf("found chain gap in block body at hash %s, number %d", currentHash, unwindPoint) + return } + headersChain = append([]*types.Header{header}, headersChain...) + bodiesChain = append([]*types.RawBody{body.RawBody()}, bodiesChain...) + currentHash = header.ParentHash unwindPoint = header.Number.Uint64() - 1 foundCanonical, criticalError = rawdb.IsCanonicalHash(tx, currentHash, unwindPoint) diff --git a/turbo/jsonrpc/bor_snapshot.go b/turbo/jsonrpc/bor_snapshot.go index 6c04e20ddc8..791a5a93752 100644 --- a/turbo/jsonrpc/bor_snapshot.go +++ b/turbo/jsonrpc/bor_snapshot.go @@ -13,10 +13,10 @@ import ( "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/consensus/bor" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/consensus/bor/valset" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/rpc" ) diff --git a/turbo/jsonrpc/erigon_system.go b/turbo/jsonrpc/erigon_system.go index b9b20085926..5513d186685 100644 --- a/turbo/jsonrpc/erigon_system.go +++ b/turbo/jsonrpc/erigon_system.go @@ -7,9 +7,9 @@ import ( "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/common/hexutil" + borfinality "github.com/ledgerwatch/erigon/consensus/bor/finality" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/core/forkid" - "github.com/ledgerwatch/erigon/eth/borfinality" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/rpchelper" ) diff --git a/turbo/jsonrpc/send_transaction_test.go b/turbo/jsonrpc/send_transaction_test.go index dab70e7d36d..b5cf7e4e814 100644 --- a/turbo/jsonrpc/send_transaction_test.go +++ b/turbo/jsonrpc/send_transaction_test.go @@ -107,7 +107,7 @@ func TestSendRawTransaction(t *testing.T) { t.Log("Timeout waiting for txn from channel") jsonTx, err := api.GetTransactionByHash(ctx, txHash) require.NoError(err) - require.Equal(expectedValue+1, jsonTx.Value.Uint64()) + require.Equal(expectedValue, jsonTx.Value.Uint64()) } //send same tx second time and expect error diff --git a/turbo/rpchelper/helper.go b/turbo/rpchelper/helper.go index 1b97a6ea16d..497eaa9523a 100644 --- a/turbo/rpchelper/helper.go +++ b/turbo/rpchelper/helper.go @@ -9,11 +9,11 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcache" "github.com/ledgerwatch/erigon-lib/kv/rawdbv3" + borfinality "github.com/ledgerwatch/erigon/consensus/bor/finality" + "github.com/ledgerwatch/erigon/consensus/bor/finality/whitelist" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/core/systemcontracts" - "github.com/ledgerwatch/erigon/eth/borfinality" - "github.com/ledgerwatch/erigon/eth/borfinality/whitelist" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/rpc" diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 57a31976699..2912876287f 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -350,7 +350,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK dirs, notifications, mock.BlockReader, blockWriter, mock.agg, nil, terseLogger) chainReader := stagedsync.NewChainReaderImpl(mock.ChainConfig, batch, mock.BlockReader, logger) // We start the mining step - if err := stages2.StateStep(ctx, chainReader, mock.Engine, batch, blockWriter, stateSync, mock.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain); err != nil { + if err := stages2.StateStep(ctx, chainReader, mock.Engine, batch, blockWriter, stateSync, mock.sentriesClient.Bd, header, body, unwindPoint, headersChain, bodiesChain, histV3); err != nil { logger.Warn("Could not validate block", "err", err) return err } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 2bd93144d5b..3b15f457628 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -25,13 +25,13 @@ import ( "github.com/ledgerwatch/erigon/turbo/silkworm" "github.com/ledgerwatch/erigon/cmd/sentry/sentry" + "github.com/ledgerwatch/erigon/consensus/bor/finality/flags" "github.com/ledgerwatch/erigon/consensus/bor/heimdall" "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/rawdb/blockio" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/vm" - "github.com/ledgerwatch/erigon/eth/borfinality/flags" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" @@ -357,7 +357,49 @@ func MiningStep(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync, tmpDir return nil } -func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, engine consensus.Engine, batch kv.RwTx, blockWriter *blockio.BlockWriter, stateSync *stagedsync.Sync, Bd *bodydownload.BodyDownload, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody) (err error) { +func addAndVerifyBlockStep(batch kv.RwTx, engine consensus.Engine, chainReader consensus.ChainReader, currentHeader *types.Header, currentBody *types.RawBody, histV3 bool) error { + currentHeight := currentHeader.Number.Uint64() + currentHash := currentHeader.Hash() + if chainReader != nil { + if err := engine.VerifyHeader(chainReader, currentHeader, true); err != nil { + log.Warn("Header Verification Failed", "number", currentHeight, "hash", currentHash, "reason", err) + return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) + } + if err := engine.VerifyUncles(chainReader, currentHeader, currentBody.Uncles); err != nil { + log.Warn("Unlcles Verification Failed", "number", currentHeight, "hash", currentHash, "reason", err) + return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) + } + } + // Prepare memory state for block execution + if err := rawdb.WriteHeader(batch, currentHeader); err != nil { + return err + } + if err := rawdb.WriteCanonicalHash(batch, currentHash, currentHeight); err != nil { + return err + } + if err := rawdb.WriteHeadHeaderHash(batch, currentHash); err != nil { + return err + } + var ok bool + var err error + if ok, err = rawdb.WriteRawBodyIfNotExists(batch, currentHash, currentHeight, currentBody); err != nil { + return err + } + if histV3 && ok { + if err := rawdb.AppendCanonicalTxNums(batch, currentHeight); err != nil { + return err + } + } + if err := stages.SaveStageProgress(batch, stages.Headers, currentHeight); err != nil { + return err + } + if err := stages.SaveStageProgress(batch, stages.Bodies, currentHeight); err != nil { + return err + } + return nil +} + +func StateStep(ctx context.Context, chainReader consensus.ChainReader, engine consensus.Engine, batch kv.RwTx, blockWriter *blockio.BlockWriter, stateSync *stagedsync.Sync, Bd *bodydownload.BodyDownload, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, histV3 bool) (err error) { defer func() { if rec := recover(); rec != nil { err = fmt.Errorf("%+v, trace: %s", rec, dbg.Stack()) @@ -379,30 +421,8 @@ func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, eng for i := range headersChain { currentHeader := headersChain[i] currentBody := bodiesChain[i] - currentHeight := headersChain[i].Number.Uint64() - currentHash := headersChain[i].Hash() - if chainReader != nil { - if err := engine.VerifyHeader(chainReader, currentHeader, true); err != nil { - log.Warn("Header Verification Failed", "number", currentHeight, "hash", currentHash, "reason", err) - return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) - } - } - - // Prepare memory state for block execution - if err := rawdb.WriteHeader(batch, currentHeader); err != nil { - return err - } - if currentBody != nil { - Bd.AddToPrefetch(currentHeader, currentBody) - } - if err := rawdb.WriteCanonicalHash(batch, currentHash, currentHeight); err != nil { - return err - } - if err := rawdb.WriteHeadHeaderHash(batch, currentHash); err != nil { - return err - } - if err = stages.SaveStageProgress(batch, stages.Headers, currentHeight); err != nil { + if err := addAndVerifyBlockStep(batch, engine, chainReader, currentHeader, currentBody, histV3); err != nil { return err } // Run state sync @@ -415,30 +435,10 @@ func StateStep(ctx context.Context, chainReader consensus.ChainHeaderReader, eng if header == nil { return nil } - if err := engine.VerifyHeader(chainReader, header, true); err != nil { - log.Warn("Header Verification Failed", "number", header.Number.Uint64(), "hash", header.Hash(), "reason", err) - return fmt.Errorf("%w: %v", consensus.ErrInvalidBlock, err) - } - - // Setup - height := header.Number.Uint64() - hash := header.Hash() // Prepare memory state for block execution - if err = rawdb.WriteHeader(batch, header); err != nil { - return err - } - if err = rawdb.WriteCanonicalHash(batch, hash, height); err != nil { + if err := addAndVerifyBlockStep(batch, engine, chainReader, header, body, histV3); err != nil { return err } - if err := rawdb.WriteHeadHeaderHash(batch, hash); err != nil { - return err - } - if err = stages.SaveStageProgress(batch, stages.Headers, height); err != nil { - return err - } - if body != nil { - Bd.AddToPrefetch(header, body) - } // Run state sync if err = stateSync.RunNoInterrupt(nil, batch, false /* firstCycle */); err != nil { return err