Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(pkg): improve / simplify reorg check logic (#647)
Browse files Browse the repository at this point in the history
Co-authored-by: maskpp <maskpp266@gmail.com>
Co-authored-by: David <david@taiko.xyz>
  • Loading branch information
3 people authored Mar 22, 2024
1 parent 9ac73ba commit 0b08772
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 43 deletions.
9 changes: 9 additions & 0 deletions cmd/flags/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ var (
Usage: "HTTP RPC endpoint of another synced L2 execution engine node",
Category: driverCategory,
}
// syncer specific flag
MaxExponent = &cli.Uint64Flag{
Name: "syncer.maxExponent",
Usage: "Maximum exponent of retrieving L1 blocks when there is a mismatch between protocol and L2 EE," +
"0 means that it is reset to the genesis height",
Value: 0,
Category: driverCategory,
}
)

// DriverFlags All driver flags.
Expand All @@ -54,4 +62,5 @@ var DriverFlags = MergeFlags(CommonFlags, []cli.Flag{
P2PSyncVerifiedBlocks,
P2PSyncTimeout,
CheckPointSyncURL,
MaxExponent,
})
128 changes: 98 additions & 30 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

ethereum "github.com/ethereum/go-ethereum"
"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/bindings/encoding"
anchorTxConstructor "github.com/taikoxyz/taiko-client/driver/anchor_tx_constructor"
Expand All @@ -40,6 +41,7 @@ type Syncer struct {
// Used by BlockInserter
lastInsertedBlockID *big.Int
reorgDetectedFlag bool
maxRetrieveExponent uint64
}

// NewSyncer creates a new syncer instance.
Expand All @@ -48,6 +50,7 @@ func NewSyncer(
client *rpc.Client,
state *state.State,
progressTracker *beaconsync.SyncProgressTracker,
maxRetrieveExponent uint64,
) (*Syncer, error) {
configs, err := client.TaikoL1.GetConfig(&bind.CallOpts{Context: ctx})
if err != nil {
Expand All @@ -70,6 +73,7 @@ func NewSyncer(
rpc.BlockMaxTxListBytes,
client.L2.ChainID,
),
maxRetrieveExponent: maxRetrieveExponent,
}, nil
}

Expand Down Expand Up @@ -522,65 +526,129 @@ func (s *Syncer) createExecutionPayloads(

// checkLastVerifiedBlockMismatch checks if there is a mismatch between protocol's last verified block hash and
// the corresponding L2 EE block hash.
func (s *Syncer) checkLastVerifiedBlockMismatch(ctx context.Context) (bool, error) {
func (s *Syncer) checkLastVerifiedBlockMismatch(ctx context.Context) (*rpc.ReorgCheckResult, error) {
var (
reorgCheckResult = new(rpc.ReorgCheckResult)
err error
)

stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx})
if err != nil {
return false, err
return nil, err
}

if s.state.GetL2Head().Number.Uint64() < stateVars.B.LastVerifiedBlockId {
return false, nil
return reorgCheckResult, nil
}

blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
genesisL1Header, err := s.rpc.GetGenesisL1Header(ctx)
if err != nil {
return false, err
return nil, fmt.Errorf("failed to fetch genesis L1 header: %w", err)
}

l2Header, err := s.rpc.L2.HeaderByNumber(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
reorgCheckResult, err = s.retrievePastBlock(ctx, stateVars.B.LastVerifiedBlockId, 0, genesisL1Header)
if err != nil {
return false, err
return nil, err
}

return blockInfo.Ts.BlockHash != l2Header.Hash(), nil
return reorgCheckResult, nil
}

// checkReorg checks whether the L1 chain has been reorged, and resets the L1Current cursor if necessary.
func (s *Syncer) checkReorg(
// retrievePastBlock find proper L1 header and L2 block id to reset when there is a mismatch
func (s *Syncer) retrievePastBlock(
ctx context.Context,
event *bindings.TaikoL1ClientBlockProposed,
) (*rpc.ReorgCheckResult, error) {
blockID uint64,
retries uint64,
genesisL1Header *types.Header) (*rpc.ReorgCheckResult, error) {
if retries > s.maxRetrieveExponent {
return &rpc.ReorgCheckResult{
IsReorged: true,
L1CurrentToReset: genesisL1Header,
LastHandledBlockIDToReset: new(big.Int).SetUint64(blockID),
}, nil
}

var (
reorgCheckResult = new(rpc.ReorgCheckResult)
err error
currentBlockID uint64
l1HeaderToSet = genesisL1Header
)

if val := uint64(1 << retries); blockID > val {
currentBlockID = blockID - val + 1
} else {
currentBlockID = 0
}

blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(currentBlockID))
if err != nil {
return nil, err
}

l2Header, err := s.rpc.L2.HeaderByNumber(ctx, new(big.Int).SetUint64(currentBlockID))
if err != nil {
return nil, err
}
if blockInfo.Ts.BlockHash == l2Header.Hash() {
// To reduce the number of call contracts by bringing forward the termination condition judgement
if retries == 0 {
return nil, nil
}
l1Origin, err := s.rpc.L2.L1OriginByID(ctx, new(big.Int).SetUint64(currentBlockID))
if err != nil {
if err.Error() == ethereum.NotFound.Error() {
log.Info("L1Origin not found in retrievePastBlock because the L2 EE is just synced through P2P",
"blockID",
currentBlockID)
// Can't find l1Origin in L2 EE, so we call the contract to get block info
blockInfo, err := s.rpc.TaikoL1.GetBlock(&bind.CallOpts{Context: ctx}, currentBlockID)
if err != nil {
return nil, err
}
if blockInfo.Blk.ProposedIn != 0 {
l1HeaderToSet, err = s.rpc.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(blockInfo.Blk.ProposedIn))
if err != nil {
return nil, err
}
}
} else {
return nil, err
}
} else {
l1HeaderToSet, err = s.rpc.L1.HeaderByNumber(ctx, l1Origin.L1BlockHeight)
if err != nil {
return nil, err
}
}
reorgCheckResult.IsReorged = retries > 0
reorgCheckResult.L1CurrentToReset = l1HeaderToSet
reorgCheckResult.LastHandledBlockIDToReset = new(big.Int).SetUint64(currentBlockID)
} else {
reorgCheckResult, err = s.retrievePastBlock(ctx, blockID, retries+1, genesisL1Header)
if err != nil {
return nil, err
}
}
return reorgCheckResult, nil
}

// checkReorg checks whether the L1 chain has been reorged, and resets the L1Current cursor if necessary.
func (s *Syncer) checkReorg(
ctx context.Context,
event *bindings.TaikoL1ClientBlockProposed,
) (*rpc.ReorgCheckResult, error) {
// If the L2 chain is at genesis, we don't need to check L1 reorg.
if s.state.GetL1Current().Number == s.state.GenesisL1Height {
return reorgCheckResult, nil
return new(rpc.ReorgCheckResult), nil
}

// 1. The latest verified block
mismatch, err := s.checkLastVerifiedBlockMismatch(ctx)
reorgCheckResult, err := s.checkLastVerifiedBlockMismatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to check if last verified block in L2 EE has been reorged: %w", err)
}

// If the latest verified block in chain is mismatched, we reset the L2 chain to genesis, and restart
// the calldata sync process.
// TODO(Gavin): improve this approach.
if mismatch {
log.Warn("The latest verified block mismatch detected, reset L2 chain to genesis")

genesisL1Header, err := s.rpc.GetGenesisL1Header(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch genesis L1 header: %w", err)
}

reorgCheckResult.IsReorged = true
reorgCheckResult.L1CurrentToReset = genesisL1Header
reorgCheckResult.LastHandledBlockIDToReset = common.Big0
} else {
if reorgCheckResult == nil {
// 2. Parent block
reorgCheckResult, err = s.rpc.CheckL1Reorg(
ctx,
Expand Down
41 changes: 39 additions & 2 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ type CalldataSyncerTestSuite struct {
func (s *CalldataSyncerTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()

state, err := state.New(context.Background(), s.RPCClient)
state2, err := state.New(context.Background(), s.RPCClient)
s.Nil(err)

syncer, err := NewSyncer(
context.Background(),
s.RPCClient,
state,
state2,
beaconsync.NewSyncProgressTracker(s.RPCClient.L2, 1*time.Hour),
0,
)
s.Nil(err)
s.s = syncer
Expand Down Expand Up @@ -78,6 +79,7 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() {
s.RPCClient,
s.s.state,
s.s.progressTracker,
0,
)
s.Nil(syncer)
s.NotNil(err)
Expand Down Expand Up @@ -211,6 +213,41 @@ func (s *CalldataSyncerTestSuite) TestTreasuryIncome() {
s.Zero(balanceAfter.Cmp(balance))
}

func (s *CalldataSyncerTestSuite) TestRetrievePastBlock() {
syncer, err := NewSyncer(
context.Background(),
s.RPCClient,
s.s.state,
s.s.progressTracker,
5,
)
s.Nil(err)
sender := s.p.GetSender()

s.s = syncer
for i := 0; i < 10; i++ {
s.ProposeAndInsertValidBlock(s.p, s.s)
}
genesisL1Header, err := s.RPCClient.GetGenesisL1Header(context.Background())
s.Nil(err)
l1Snapshot := s.SetL1Snapshot()
for i := 0; i < 5; i++ {
s.ProposeAndInsertValidBlock(s.p, s.s)
}
s.RevertL1Snapshot(l1Snapshot)
// Because of evm_revert operation, the nonce of the proposer need to be adjusted.
s.Nil(sender.SetNonce(nil, true))
// Propose 5 blocks on another fork
for i := 0; i < 5; i++ {
s.ProposeInvalidTxListBytes(s.p)
}
reorgResult, err := s.s.retrievePastBlock(context.Background(), 12, 0, genesisL1Header)
s.Nil(err)
s.NotNil(reorgResult)
s.Equal(reorgResult.IsReorged, true)
s.GreaterOrEqual(reorgResult.L1CurrentToReset.Number.Uint64(), genesisL1Header.Number.Uint64())
}

func TestCalldataSyncerTestSuite(t *testing.T) {
suite.Run(t, new(CalldataSyncerTestSuite))
}
3 changes: 2 additions & 1 deletion driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ func New(
state *state.State,
p2pSyncVerifiedBlocks bool,
p2pSyncTimeout time.Duration,
maxRetrieveExponent uint64,
) (*L2ChainSyncer, error) {
tracker := beaconsync.NewSyncProgressTracker(rpc.L2, p2pSyncTimeout)
go tracker.Track(ctx)

beaconSyncer := beaconsync.NewSyncer(ctx, rpc, state, tracker)
calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker)
calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker, maxRetrieveExponent)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions driver/chain_syncer/chain_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (s *ChainSyncerTestSuite) SetupTest() {
state,
false,
1*time.Hour,
0,
)
s.Nil(err)
s.s = syncer
Expand Down
2 changes: 2 additions & 0 deletions driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Config struct {
P2PSyncTimeout time.Duration
RPCTimeout time.Duration
RetryInterval time.Duration
MaxExponent uint64
}

// NewConfigFromCliContext creates a new config instance from
Expand Down Expand Up @@ -60,5 +61,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
P2PSyncVerifiedBlocks: p2pSyncVerifiedBlocks,
P2PSyncTimeout: c.Duration(flags.P2PSyncTimeout.Name),
RPCTimeout: timeout,
MaxExponent: c.Uint64(flags.MaxExponent.Name),
}, nil
}
1 change: 1 addition & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) {
d.state,
cfg.P2PSyncVerifiedBlocks,
cfg.P2PSyncTimeout,
cfg.MaxExponent,
); err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions internal/testutils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,62 @@ func (s *ClientTestSuite) ProposeAndInsertValidBlock(
return event
}

func (s *ClientTestSuite) ProposeValidBlock(
proposer Proposer,
) *bindings.TaikoL1ClientBlockProposed {
l1Head, err := s.RPCClient.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)

l2Head, err := s.RPCClient.L2.HeaderByNumber(context.Background(), nil)
s.Nil(err)

// Propose txs in L2 execution engine's mempool
sink := make(chan *bindings.TaikoL1ClientBlockProposed)

sub, err := s.RPCClient.TaikoL1.WatchBlockProposed(nil, sink, nil, nil)
s.Nil(err)
defer func() {
sub.Unsubscribe()
close(sink)
}()

baseFeeInfo, err := s.RPCClient.TaikoL2.GetBasefee(nil, l1Head.Number.Uint64()+1, uint32(l2Head.GasUsed))
s.Nil(err)

nonce, err := s.RPCClient.L2.PendingNonceAt(context.Background(), s.TestAddr)
s.Nil(err)

tx := types.NewTransaction(
nonce,
common.BytesToAddress(RandomBytes(32)),
common.Big1,
100000,
baseFeeInfo.Basefee,
[]byte{},
)
signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(s.RPCClient.L2.ChainID), s.TestAddrPrivKey)
s.Nil(err)
s.Nil(s.RPCClient.L2.SendTransaction(context.Background(), signedTx))

s.Nil(proposer.ProposeOp(context.Background()))

event := <-sink

_, isPending, err := s.RPCClient.L1.TransactionByHash(context.Background(), event.Raw.TxHash)
s.Nil(err)
s.False(isPending)

receipt, err := s.RPCClient.L1.TransactionReceipt(context.Background(), event.Raw.TxHash)
s.Nil(err)
s.Equal(types.ReceiptStatusSuccessful, receipt.Status)

newL1Head, err := s.RPCClient.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(newL1Head.Number.Uint64(), l1Head.Number.Uint64())

return event
}

// NewTestProverServer starts a new prover server that has channel listeners to respond and react
// to requests for capacity, which provers can call.
func (s *ClientTestSuite) NewTestProverServer(
Expand Down
Loading

0 comments on commit 0b08772

Please sign in to comment.