Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: store skipped transactions in local db #467

Merged
merged 27 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d698c6d
feat: store skipped txs in local db
Thegaram Aug 17, 2023
c45f7e7
bump version
Thegaram Aug 17, 2023
ed2514e
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 17, 2023
2ef883a
bump version
Thegaram Aug 17, 2023
c0e9f03
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 18, 2023
4f4f880
fix test
Thegaram Aug 18, 2023
b3f8a5d
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 19, 2023
adfb3f4
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 22, 2023
e9c6882
include L2 txs in skipped index
Thegaram Aug 22, 2023
b005178
goimports
Thegaram Aug 22, 2023
a43706e
rename more
Thegaram Aug 22, 2023
05f1457
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 22, 2023
a3e1e78
bump version
Thegaram Aug 22, 2023
3dc1489
fix missed renames
Thegaram Aug 22, 2023
26bbdde
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 22, 2023
2f8ef6c
fix the bug when calculating l2TxCount. (#479)
mask-pp Aug 23, 2023
44e936c
fix: exclude L1 message from block payload size validation (#476)
Thegaram Aug 23, 2023
55f04ca
fix: update row estimation with scroll-prover `v0.7.2` (#475)
silathdiir Aug 23, 2023
1d1c749
refactor: simplify ccc revert to snapshot (#480)
iczc Aug 24, 2023
053a7fd
feat: use --gcmode=archive and --cache.noprefetch=true by default (#482)
Thegaram Aug 24, 2023
9c72dd2
feat(sdk): support compressed response (#469)
mask-pp Aug 24, 2023
9ae17a9
fix: disable pruning and prefetch if not flags are provided (#483)
Thegaram Aug 24, 2023
5b414c3
fix: update libzkp to use scroll-prover `v0.7.5` (#484)
silathdiir Aug 25, 2023
ac18f11
address comments
Thegaram Aug 25, 2023
afc5b88
bump version
Thegaram Aug 25, 2023
9c7cb6c
Merge branch 'develop' into store-skipped-transactions
Thegaram Aug 25, 2023
59f113c
nit
Thegaram Aug 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
return nil
}

blockHash := block.Hash()

if v.config.Scroll.L1Config == nil {
// TODO: should we allow follower nodes to skip L1 message verification?
panic("Running on L1Message-enabled network but no l1Config was provided")
Expand Down Expand Up @@ -168,13 +170,16 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
// skipped messages
// TODO: consider verifying that skipped messages overflow
for index := queueIndex; index < txQueueIndex; index++ {
log.Debug("Skipped L1 message", "queueIndex", index, "tx", tx.Hash().String(), "block", block.Hash().String())

if exists := it.Next(); !exists {
// the message in this block is not available in our local db.
// we'll reprocess this block at a later time.
return consensus.ErrMissingL1MessageData
}

l1msg := it.L1Message()
skippedTx := types.NewTx(&l1msg)
log.Debug("Skipped L1 message", "queueIndex", index, "tx", skippedTx.Hash().String(), "block", blockHash.String())
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
rawdb.WriteSkippedTransaction(v.db, skippedTx, "unknown", block.NumberU64(), &blockHash)
}

queueIndex = txQueueIndex + 1
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type L1MessageIterator struct {
// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the database starting at the provided enqueue index.
func IterateL1MessagesFrom(db ethdb.Iteratee, fromQueueIndex uint64) L1MessageIterator {
start := encodeQueueIndex(fromQueueIndex)
start := encodeBigEndian(fromQueueIndex)
it := db.NewIterator(l1MessagePrefix, start)
keyLength := len(l1MessagePrefix) + 8

Expand Down Expand Up @@ -180,7 +180,7 @@ func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.
// The L2 block is identified by its block hash. If the L2 block contains zero
// L1 messages, this value MUST equal its parent's value.
func WriteFirstQueueIndexNotInL2Block(db ethdb.KeyValueWriter, l2BlockHash common.Hash, queueIndex uint64) {
if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeQueueIndex(queueIndex)); err != nil {
if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeBigEndian(queueIndex)); err != nil {
log.Crit("Failed to store first L1 message not in L2 block", "l2BlockHash", l2BlockHash, "err", err)
}
}
Expand Down
202 changes: 202 additions & 0 deletions core/rawdb/accessors_skipped_txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package rawdb

import (
"bytes"
"encoding/binary"
"math/big"
"sync"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rlp"
)

// mutex used to avoid concurrent updates of NumSkippedTransactions
var mu sync.Mutex

// writeNumSkippedTransactions writes the number of skipped L1 transactions to the database.
func writeNumSkippedTransactions(db ethdb.KeyValueWriter, numSkipped uint64) {
value := big.NewInt(0).SetUint64(numSkipped).Bytes()

if err := db.Put(numSkippedTransactionsKey, value); err != nil {
log.Crit("Failed to update the number of skipped L1 transactions", "err", err)
}
}

// ReadNumSkippedTransactions retrieves the number of skipped messages.
func ReadNumSkippedTransactions(db ethdb.Reader) uint64 {
data, err := db.Get(numSkippedTransactionsKey)
if err != nil && isNotFoundErr(err) {
return 0
}
if err != nil {
log.Crit("Failed to read number of skipped L1 transactions from database", "err", err)
}
if len(data) == 0 {
return 0
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("Unexpected number of skipped L1 transactions in database", "number", number)
}
return number.Uint64()
}

// SkippedTransaction stores the transaction object, along with the skip reason and block context.
type SkippedTransaction struct {
// Tx is the skipped transaction.
// We store the tx itself because otherwise geth will discard it after skipping.
Tx *types.Transaction

// Reason is the skip reason.
Reason string

// BlockNumber is the number of the block in which this transaction was skipped.
BlockNumber uint64

// BlockNumber is the hash of the block in which this transaction was skipped or nil.
BlockHash *common.Hash
}

// writeSkippedTransaction writes a skipped transaction to the database.
func writeSkippedTransaction(db ethdb.KeyValueWriter, tx *types.Transaction, reason string, blockNumber uint64, blockHash *common.Hash) {
// workaround: RLP decoding fails if this is nil
if blockHash == nil {
blockHash = &common.Hash{}
}
stx := SkippedTransaction{Tx: tx, Reason: reason, BlockNumber: blockNumber, BlockHash: blockHash}
bytes, err := rlp.EncodeToBytes(stx)
if err != nil {
log.Crit("Failed to RLP encode skipped transaction", "err", err)
}
if err := db.Put(SkippedTransactionKey(tx.Hash()), bytes); err != nil {
log.Crit("Failed to store skipped transaction", "hash", tx.Hash().String(), "err", err)
}
}

// readSkippedTransactionRLP retrieves a skipped transaction in its raw RLP database encoding.
func readSkippedTransactionRLP(db ethdb.Reader, txHash common.Hash) rlp.RawValue {
data, err := db.Get(SkippedTransactionKey(txHash))
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to load skipped transaction", "hash", txHash.String(), "err", err)
}
return data
}

// ReadSkippedTransaction retrieves a skipped transaction by its hash, along with its skipped reason.
func ReadSkippedTransaction(db ethdb.Reader, txHash common.Hash) *SkippedTransaction {
data := readSkippedTransactionRLP(db, txHash)
if len(data) == 0 {
return nil
}
var stx SkippedTransaction
if err := rlp.Decode(bytes.NewReader(data), &stx); err != nil {
log.Crit("Invalid skipped transaction RLP", "hash", txHash.String(), "data", data, "err", err)
}
if stx.BlockHash != nil && *stx.BlockHash == (common.Hash{}) {
stx.BlockHash = nil
}
return &stx
}

// writeSkippedTransactionHash writes the hash of a skipped transaction to the database.
func writeSkippedTransactionHash(db ethdb.KeyValueWriter, index uint64, txHash common.Hash) {
if err := db.Put(SkippedTransactionHashKey(index), txHash[:]); err != nil {
log.Crit("Failed to store skipped transaction hash", "index", index, "hash", txHash.String(), "err", err)
}
}

// ReadSkippedTransactionHash retrieves the hash of a skipped transaction by its index.
func ReadSkippedTransactionHash(db ethdb.Reader, index uint64) *common.Hash {
data, err := db.Get(SkippedTransactionHashKey(index))
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to load skipped transaction hash", "index", index, "err", err)
}
hash := common.BytesToHash(data)
return &hash
}

// WriteSkippedTransaction writes a skipped transaction to the database and also updates the count and lookup index.
// Note: The lookup index and count will include duplicates if there are chain reorgs.
func WriteSkippedTransaction(db ethdb.Database, tx *types.Transaction, reason string, blockNumber uint64, blockHash *common.Hash) {
// this method is not accessed concurrently, but just to be sure...
mu.Lock()
defer mu.Unlock()

index := ReadNumSkippedTransactions(db)

// update in a batch
batch := db.NewBatch()
writeSkippedTransaction(db, tx, reason, blockNumber, blockHash)
writeSkippedTransactionHash(db, index, tx.Hash())
writeNumSkippedTransactions(db, index+1)

// write to DB
if err := batch.Write(); err != nil {
log.Crit("Failed to store skipped transaction", "hash", tx.Hash().String(), "err", err)
}
}

// SkippedTransactionIterator is a wrapper around ethdb.Iterator that
// allows us to iterate over skipped transaction hashes in the database.
// It implements an interface similar to ethdb.Iterator.
type SkippedTransactionIterator struct {
inner ethdb.Iterator
db ethdb.Reader
keyLength int
}

// IterateSkippedTransactionsFrom creates a SkippedTransactionIterator that iterates
// over all skipped transaction hashes in the database starting at the provided index.
func IterateSkippedTransactionsFrom(db ethdb.Database, index uint64) SkippedTransactionIterator {
start := encodeBigEndian(index)
it := db.NewIterator(skippedTransactionHashPrefix, start)
keyLength := len(skippedTransactionHashPrefix) + 8

return SkippedTransactionIterator{
inner: it,
db: db,
keyLength: keyLength,
}
}

// Next moves the iterator to the next key/value pair.
// It returns false when the iterator is exhausted.
// TODO: Consider reading items in batches.
func (it *SkippedTransactionIterator) Next() bool {
for it.inner.Next() {
key := it.inner.Key()
if len(key) == it.keyLength {
return true
}
}
return false
}

// Index returns the index of the current skipped transaction hash.
func (it *SkippedTransactionIterator) Index() uint64 {
key := it.inner.Key()
raw := key[len(skippedTransactionHashPrefix) : len(skippedTransactionHashPrefix)+8]
index := binary.BigEndian.Uint64(raw)
return index
}

// TransactionHash returns the current skipped transaction hash.
func (it *SkippedTransactionIterator) TransactionHash() common.Hash {
data := it.inner.Value()
return common.BytesToHash(data)
}

// Release releases the associated resources.
func (it *SkippedTransactionIterator) Release() {
it.inner.Release()
}
134 changes: 134 additions & 0 deletions core/rawdb/accessors_skipped_txs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package rawdb

import (
"math/big"
"sync"
"testing"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
)

func TestReadWriteNumSkippedTransactions(t *testing.T) {
blockNumbers := []uint64{
1,
1 << 2,
1 << 8,
1 << 16,
1 << 32,
}

db := NewMemoryDatabase()
for _, num := range blockNumbers {
writeNumSkippedTransactions(db, num)
got := ReadNumSkippedTransactions(db)

if got != num {
t.Fatal("Num skipped transactions mismatch", "expected", num, "got", got)
}
}
}

func newTestTransaction(queueIndex uint64) *types.Transaction {
l1msg := types.L1MessageTx{
QueueIndex: queueIndex,
Gas: 0,
To: &common.Address{},
Value: big.NewInt(0),
Data: nil,
Sender: common.Address{},
}
return types.NewTx(&l1msg)
}

func TestReadWriteSkippedTransactionNoIndex(t *testing.T) {
tx := newTestTransaction(123)
db := NewMemoryDatabase()
writeSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
got := ReadSkippedTransaction(db, tx.Hash())
if got == nil || got.Tx.Hash() != tx.Hash() || got.Reason != "random reason" || got.BlockNumber != 1 || got.BlockHash == nil || *got.BlockHash != (common.Hash{1}) {
t.Fatal("Skipped transaction mismatch", "got", got)
}
}

func TestReadWriteSkippedTransaction(t *testing.T) {
tx := newTestTransaction(123)
db := NewMemoryDatabase()
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
got := ReadSkippedTransaction(db, tx.Hash())
if got == nil || got.Tx.Hash() != tx.Hash() || got.Reason != "random reason" || got.BlockNumber != 1 || got.BlockHash == nil || *got.BlockHash != (common.Hash{1}) {
t.Fatal("Skipped transaction mismatch", "got", got)
}
count := ReadNumSkippedTransactions(db)
if count != 1 {
t.Fatal("Skipped transaction count mismatch", "expected", 1, "got", count)
}
hash := ReadSkippedTransactionHash(db, 0)
if hash == nil || *hash != tx.Hash() {
t.Fatal("Skipped L1 message hash mismatch", "expected", tx.Hash(), "got", hash)
}
}

func TestSkippedTransactionConcurrentUpdate(t *testing.T) {
count := 20
tx := newTestTransaction(123)
db := NewMemoryDatabase()
var wg sync.WaitGroup
for ii := 0; ii < count; ii++ {
wg.Add(1)
go func() {
defer wg.Done()
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
}()
}
wg.Wait()
got := ReadNumSkippedTransactions(db)
if got != uint64(count) {
t.Fatal("Skipped transaction count mismatch", "expected", count, "got", got)
}
}

func TestIterateSkippedTransactions(t *testing.T) {
db := NewMemoryDatabase()

txs := []*types.Transaction{
newTestTransaction(1),
newTestTransaction(2),
newTestTransaction(3),
newTestTransaction(4),
newTestTransaction(5),
}

for _, tx := range txs {
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
}

// simulate skipped L2 tx that's not included in the index
l2tx := newTestTransaction(6)
writeSkippedTransaction(db, l2tx, "random reason", 1, &common.Hash{1})

it := IterateSkippedTransactionsFrom(db, 2)
defer it.Release()

for ii := 2; ii < len(txs); ii++ {
finished := !it.Next()
if finished {
t.Fatal("Iterator terminated early", "ii", ii)
}

index := it.Index()
if index != uint64(ii) {
t.Fatal("Invalid skipped transaction index", "expected", ii, "got", index)
}

hash := it.TransactionHash()
if hash != txs[ii].Hash() {
t.Fatal("Invalid skipped transaction hash", "expected", txs[ii].Hash(), "got", hash)
}
}

finished := !it.Next()
if !finished {
t.Fatal("Iterator did not terminate")
}
}
Loading
Loading