From 737e9482b629f8d9bc4c9a7ee0418ab49492771f Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Mon, 14 Nov 2022 09:19:25 -0800 Subject: [PATCH 1/3] resync mode option which will allow restarting from block 0 --- packages/relayer/cli/cli.go | 11 ++-- packages/relayer/cmd/main.go | 10 +-- packages/relayer/{cli/types.go => flags.go} | 2 +- .../relayer/indexer/filter_then_subscribe.go | 62 +++++++++++++------ 4 files changed, 53 insertions(+), 32 deletions(-) rename packages/relayer/{cli/types.go => flags.go} (94%) diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index bd464e9acf7..46cfbb19b42 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -35,8 +35,7 @@ var ( } ) -// TODO: implement `resync` mode to wipe DB and restart from block 0 -func Run(mode Mode, layer Layer) { +func Run(mode relayer.Mode, layer relayer.Layer) { if err := loadAndValidateEnv(); err != nil { log.Fatal(err) } @@ -68,7 +67,7 @@ func Run(mode Mode, layer Layer) { for _, i := range indexers { go func(i *indexer.Service) { - if err := i.FilterThenSubscribe(context.Background()); err != nil { + if err := i.FilterThenSubscribe(context.Background(), mode); err != nil { log.Fatal(err) } }(i) @@ -77,7 +76,7 @@ func Run(mode Mode, layer Layer) { <-forever } -func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) { +func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), error) { eventRepository, err := repo.NewEventRepository(db) if err != nil { return nil, nil, err @@ -110,7 +109,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) indexers := make([]*indexer.Service, 0) - if layer == L1 || layer == Both { + if layer == relayer.L1 || layer == relayer.Both { l1Indexer, err := indexer.NewService(indexer.NewServiceOpts{ EventRepo: eventRepository, BlockRepo: blockRepository, @@ -131,7 +130,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) indexers = append(indexers, l1Indexer) } - if layer == L2 || layer == Both { + if layer == relayer.L2 || layer == relayer.Both { l2Indexer, err := indexer.NewService(indexer.NewServiceOpts{ EventRepo: eventRepository, BlockRepo: blockRepository, diff --git a/packages/relayer/cmd/main.go b/packages/relayer/cmd/main.go index 1a30a93de9e..4a23067f7fc 100644 --- a/packages/relayer/cmd/main.go +++ b/packages/relayer/cmd/main.go @@ -9,14 +9,14 @@ import ( ) func main() { - modePtr := flag.String("mode", string(cli.SyncMode), `mode to run in. + modePtr := flag.String("mode", string(relayer.SyncMode), `mode to run in. options: sync: continue syncing from previous block resync: restart syncing from block 0 fromBlock: restart syncing from specified block number `) - layersPtr := flag.String("layers", string(cli.Both), `layers to watch and process. + layersPtr := flag.String("layers", string(relayer.Both), `layers to watch and process. options: l1: only watch l1 => l2 bridge messages l2: only watch l2 => l1 bridge messages @@ -25,13 +25,13 @@ func main() { flag.Parse() - if !relayer.IsInSlice(cli.Mode(*modePtr), cli.Modes) { + if !relayer.IsInSlice(relayer.Mode(*modePtr), relayer.Modes) { log.Fatal("mode not valid") } - if !relayer.IsInSlice(cli.Layer(*layersPtr), cli.Layers) { + if !relayer.IsInSlice(relayer.Layer(*layersPtr), relayer.Layers) { log.Fatal("mode not valid") } - cli.Run(cli.Mode(*modePtr), cli.Layer(*layersPtr)) + cli.Run(relayer.Mode(*modePtr), relayer.Layer(*layersPtr)) } diff --git a/packages/relayer/cli/types.go b/packages/relayer/flags.go similarity index 94% rename from packages/relayer/cli/types.go rename to packages/relayer/flags.go index 163327a6a47..4baeee3c30e 100644 --- a/packages/relayer/cli/types.go +++ b/packages/relayer/flags.go @@ -1,4 +1,4 @@ -package cli +package relayer type Mode string diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 675bda481d4..1fd33394169 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -20,22 +20,17 @@ var ( // FilterThenSubscribe gets the most recent block height that has been indexed, and works it's way // up to the latest block. As it goes, it tries to process messages. // When it catches up, it then starts to Subscribe to latest events as they come in. -func (svc *Service) FilterThenSubscribe(ctx context.Context) error { +func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode) error { chainID, err := svc.ethClient.ChainID(ctx) if err != nil { - return errors.Wrap(err, "s.ethClient.ChainID()") + return errors.Wrap(err, "svc.ethClient.ChainID()") } - // get most recently processed block height from the DB - latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent( - eventName, - chainID, - ) - if err != nil { - return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") + if err := svc.setInitialProcessingBlockByMode(ctx, mode, chainID); err != nil { + return errors.Wrap(err, "svc.setInitialProcessingBlockByMode") } - log.Infof("latest processed block: %v", latestProcessedBlock.Height) + log.Infof("processing from block height: %v", svc.processingBlock.Height) if err != nil { return errors.Wrap(err, "bridge.FilterMessageSent") @@ -43,19 +38,16 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { header, err := svc.ethClient.HeaderByNumber(ctx, nil) if err != nil { - return errors.Wrap(err, "s.ethClient.HeaderByNumber") + return errors.Wrap(err, "svc.ethClient.HeaderByNumber") } - // if we have already done the latest block, exit early - // TODO: call SubscribeMessageSent, as we can now just watch the chain for new blocks - if latestProcessedBlock.Height == header.Number.Uint64() { + if svc.processingBlock.Height == header.Number.Uint64() { + log.Info("caught up, subscribing to new incoming events") return svc.subscribe(ctx, chainID) } const batchSize = 1000 - svc.processingBlock = latestProcessedBlock - log.Infof("getting events between %v and %v in batches of %v", svc.processingBlock.Height, header.Number.Int64(), @@ -65,7 +57,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { // todo: parallelize/concurrently catch up. don't think we need to do this in order. // use WaitGroup. // we get a timeout/EOF if we don't batch. - for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += batchSize { + for i := svc.processingBlock.Height; i < header.Number.Uint64(); i += batchSize { var end uint64 = svc.processingBlock.Height + batchSize // if the end of the batch is greater than the latest block number, set end // to the latest block number @@ -76,7 +68,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { log.Infof("batch from %v to %v", i, end) events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{ - Start: latestProcessedBlock.Height + uint64(1), + Start: svc.processingBlock.Height, End: &end, Context: ctx, }, nil) @@ -117,7 +109,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { } if svc.processingBlock.Height < latestBlock.Number.Uint64() { - return svc.FilterThenSubscribe(ctx) + return svc.FilterThenSubscribe(ctx, relayer.SyncMode) } return svc.subscribe(ctx, chainID) @@ -207,7 +199,6 @@ func (svc *Service) handleEvent(ctx context.Context, chainID *big.Int, event *co // we can now consider that previous block processed. save it to the DB // and bump the block number. if raw.BlockNumber > svc.processingBlock.Height { - log.Info("raw blockNumber > processingBlock.height") log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber) if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ @@ -283,3 +274,34 @@ func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, return nil } + +func (svc *Service) setInitialProcessingBlockByMode( + ctx context.Context, + mode relayer.Mode, + chainID *big.Int, +) error { + if mode == relayer.SyncMode { + // get most recently processed block height from the DB + latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent( + eventName, + chainID, + ) + if err != nil { + return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") + } + + svc.processingBlock = latestProcessedBlock + } else if mode == relayer.ResyncMode { + block, err := svc.ethClient.BlockByNumber(ctx, big.NewInt(0)) + if err != nil { + return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") + } + + svc.processingBlock = &relayer.Block{ + Height: block.NumberU64(), + Hash: block.Hash().Hex(), + } + } + + return nil +} From c0973ddc411b192d738f8469c9955956d028aa91 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Mon, 14 Nov 2022 09:46:05 -0800 Subject: [PATCH 2/3] ethClient interface, mocks for block repository and ethClient, tests for setting initial processing block --- .../relayer/indexer/filter_then_subscribe.go | 31 ---------- packages/relayer/indexer/service.go | 10 +++- packages/relayer/indexer/service_test.go | 9 +++ .../set_initial_processing_block_by_mode.go | 40 +++++++++++++ ...t_initial_processing_block_by_mode_test.go | 58 +++++++++++++++++++ packages/relayer/mock/block_repository.go | 31 ++++++++++ packages/relayer/mock/eth_client.go | 25 ++++++++ 7 files changed, 172 insertions(+), 32 deletions(-) create mode 100644 packages/relayer/indexer/set_initial_processing_block_by_mode.go create mode 100644 packages/relayer/indexer/set_initial_processing_block_by_mode_test.go create mode 100644 packages/relayer/mock/block_repository.go create mode 100644 packages/relayer/mock/eth_client.go diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 1fd33394169..f3487e70281 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -274,34 +274,3 @@ func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, return nil } - -func (svc *Service) setInitialProcessingBlockByMode( - ctx context.Context, - mode relayer.Mode, - chainID *big.Int, -) error { - if mode == relayer.SyncMode { - // get most recently processed block height from the DB - latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent( - eventName, - chainID, - ) - if err != nil { - return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") - } - - svc.processingBlock = latestProcessedBlock - } else if mode == relayer.ResyncMode { - block, err := svc.ethClient.BlockByNumber(ctx, big.NewInt(0)) - if err != nil { - return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") - } - - svc.processingBlock = &relayer.Block{ - Height: block.NumberU64(), - Hash: block.Hash().Hex(), - } - } - - return nil -} diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index b8e03713423..4b98e26a2e9 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -1,10 +1,13 @@ package indexer import ( + "context" "crypto/ecdsa" + "math/big" "github.com/cyberhorsey/errors" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" @@ -18,10 +21,15 @@ var ( ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000") ) +type ethClient interface { + ChainID(ctx context.Context) (*big.Int, error) + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) +} + type Service struct { eventRepo relayer.EventRepository blockRepo relayer.BlockRepository - ethClient *ethclient.Client + ethClient ethClient destRPC *rpc.Client processingBlock *relayer.Block diff --git a/packages/relayer/indexer/service_test.go b/packages/relayer/indexer/service_test.go index 94ad7ad6e66..a9437fe372d 100644 --- a/packages/relayer/indexer/service_test.go +++ b/packages/relayer/indexer/service_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "github.com/taikochain/taiko-mono/packages/relayer" + "github.com/taikochain/taiko-mono/packages/relayer/mock" "github.com/taikochain/taiko-mono/packages/relayer/repo" "gopkg.in/go-playground/assert.v1" ) @@ -14,6 +15,14 @@ import ( var dummyEcdsaKey = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f" var dummyAddress = "0x63FaC9201494f0bd17B9892B9fae4d52fe3BD377" +func newTestService() *Service { + return &Service{ + blockRepo: &mock.BlockRepository{}, + ethClient: &mock.EthClient{}, + + processingBlock: &relayer.Block{}, + } +} func Test_NewService(t *testing.T) { tests := []struct { name string diff --git a/packages/relayer/indexer/set_initial_processing_block_by_mode.go b/packages/relayer/indexer/set_initial_processing_block_by_mode.go new file mode 100644 index 00000000000..91684c496cb --- /dev/null +++ b/packages/relayer/indexer/set_initial_processing_block_by_mode.go @@ -0,0 +1,40 @@ +package indexer + +import ( + "context" + "math/big" + + "github.com/pkg/errors" + "github.com/taikochain/taiko-mono/packages/relayer" +) + +func (svc *Service) setInitialProcessingBlockByMode( + ctx context.Context, + mode relayer.Mode, + chainID *big.Int, +) error { + if mode == relayer.SyncMode { + // get most recently processed block height from the DB + latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent( + eventName, + chainID, + ) + if err != nil { + return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") + } + + svc.processingBlock = latestProcessedBlock + } else if mode == relayer.ResyncMode { + header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(0)) + if err != nil { + return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") + } + + svc.processingBlock = &relayer.Block{ + Height: header.Number.Uint64(), + Hash: header.Hash().Hex(), + } + } + + return nil +} diff --git a/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go b/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go new file mode 100644 index 00000000000..460c0bffea9 --- /dev/null +++ b/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go @@ -0,0 +1,58 @@ +package indexer + +import ( + "context" + "math/big" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/taikochain/taiko-mono/packages/relayer" + "github.com/taikochain/taiko-mono/packages/relayer/mock" +) + +func Test_SetInitialProcessingBlockByMode(t *testing.T) { + tests := []struct { + name string + mode relayer.Mode + chainID *big.Int + wantErr bool + wantHeight uint64 + }{ + { + "resync", + relayer.ResyncMode, + mock.MockChainID, + false, + 0, + }, + { + "sync", + relayer.SyncMode, + mock.MockChainID, + false, + mock.LatestBlock.Height, + }, + { + "sync error getting latest block", + relayer.SyncMode, + big.NewInt(328938), + true, + 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + svc := newTestService() + err := svc.setInitialProcessingBlockByMode( + context.Background(), + tt.mode, + tt.chainID, + ) + + assert.Equal(t, tt.wantErr, err != nil) + + assert.Equal(t, tt.wantHeight, svc.processingBlock.Height) + }) + } +} diff --git a/packages/relayer/mock/block_repository.go b/packages/relayer/mock/block_repository.go new file mode 100644 index 00000000000..29635b6e7f3 --- /dev/null +++ b/packages/relayer/mock/block_repository.go @@ -0,0 +1,31 @@ +package mock + +import ( + "errors" + "math/big" + + "github.com/taikochain/taiko-mono/packages/relayer" +) + +var ( + LatestBlock = &relayer.Block{ + Height: 100, + Hash: "0x", + ChainID: MockChainID.Int64(), + } +) + +type BlockRepository struct { +} + +func (r *BlockRepository) Save(opts relayer.SaveBlockOpts) error { + return nil +} + +func (r *BlockRepository) GetLatestBlockProcessedForEvent(eventName string, chainID *big.Int) (*relayer.Block, error) { + if chainID.Int64() != MockChainID.Int64() { + return nil, errors.New("error getting latest block processed for event") + } + + return LatestBlock, nil +} diff --git a/packages/relayer/mock/eth_client.go b/packages/relayer/mock/eth_client.go new file mode 100644 index 00000000000..4e9079dd5a7 --- /dev/null +++ b/packages/relayer/mock/eth_client.go @@ -0,0 +1,25 @@ +package mock + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + MockChainID = big.NewInt(167001) +) + +type EthClient struct { +} + +func (c *EthClient) ChainID(ctx context.Context) (*big.Int, error) { + return MockChainID, nil +} + +func (c *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{ + Number: number, + }, nil +} From f1d46d0ba4fc90e1ae1fcaead2506e8464a4f56c Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Mon, 14 Nov 2022 09:51:36 -0800 Subject: [PATCH 3/3] refactor set initial processing block by mode to check the mode and return error if it's not supported --- packages/relayer/errors.go | 1 + .../indexer/set_initial_processing_block_by_mode.go | 13 +++++++++---- .../set_initial_processing_block_by_mode_test.go | 7 +++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/relayer/errors.go b/packages/relayer/errors.go index db47dd73295..254dc5571e4 100644 --- a/packages/relayer/errors.go +++ b/packages/relayer/errors.go @@ -15,4 +15,5 @@ var ( ErrNoRPCClient = errors.Validation.NewWithKeyAndDetail("ERR_NO_RPC_CLIENT", "RPCClient is required") ErrNoBridge = errors.Validation.NewWithKeyAndDetail("ERR_NO_BRIDGE", "Bridge is required") ErrNoTaikoL2 = errors.Validation.NewWithKeyAndDetail("ERR_NO_TAIKO_L2", "TaikoL2 is required") + ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported") ) diff --git a/packages/relayer/indexer/set_initial_processing_block_by_mode.go b/packages/relayer/indexer/set_initial_processing_block_by_mode.go index 91684c496cb..f43b9af0714 100644 --- a/packages/relayer/indexer/set_initial_processing_block_by_mode.go +++ b/packages/relayer/indexer/set_initial_processing_block_by_mode.go @@ -13,7 +13,8 @@ func (svc *Service) setInitialProcessingBlockByMode( mode relayer.Mode, chainID *big.Int, ) error { - if mode == relayer.SyncMode { + switch mode { + case relayer.SyncMode: // get most recently processed block height from the DB latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent( eventName, @@ -24,7 +25,9 @@ func (svc *Service) setInitialProcessingBlockByMode( } svc.processingBlock = latestProcessedBlock - } else if mode == relayer.ResyncMode { + + return nil + case relayer.ResyncMode: header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(0)) if err != nil { return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") @@ -34,7 +37,9 @@ func (svc *Service) setInitialProcessingBlockByMode( Height: header.Number.Uint64(), Hash: header.Hash().Hex(), } - } - return nil + return nil + default: + return relayer.ErrInvalidMode + } } diff --git a/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go b/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go index 460c0bffea9..16e158c2b33 100644 --- a/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go +++ b/packages/relayer/indexer/set_initial_processing_block_by_mode_test.go @@ -39,6 +39,13 @@ func Test_SetInitialProcessingBlockByMode(t *testing.T) { true, 0, }, + { + "invalidMode", + relayer.Mode("fake"), + mock.MockChainID, + true, + 0, + }, } for _, tt := range tests {