feat(relayer): Allow resync flag option to restart processing from block 0 #266

Merged · 3 commits · Nov 15, 2022
11 changes: 5 additions & 6 deletions packages/relayer/cli/cli.go
@@ -35,8 +35,7 @@ var (
}
)

// TODO: implement `resync` mode to wipe DB and restart from block 0
func Run(mode Mode, layer Layer) {
func Run(mode relayer.Mode, layer relayer.Layer) {
if err := loadAndValidateEnv(); err != nil {
log.Fatal(err)
}
@@ -68,7 +67,7 @@ func Run(mode Mode, layer Layer) {

for _, i := range indexers {
go func(i *indexer.Service) {
if err := i.FilterThenSubscribe(context.Background()); err != nil {
if err := i.FilterThenSubscribe(context.Background(), mode); err != nil {
log.Fatal(err)
}
}(i)
@@ -77,7 +76,7 @@ func Run(mode Mode, layer Layer) {
<-forever
}

func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) {
func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), error) {
eventRepository, err := repo.NewEventRepository(db)
if err != nil {
return nil, nil, err
@@ -110,7 +109,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)

indexers := make([]*indexer.Service, 0)

if layer == L1 || layer == Both {
if layer == relayer.L1 || layer == relayer.Both {
l1Indexer, err := indexer.NewService(indexer.NewServiceOpts{
EventRepo: eventRepository,
BlockRepo: blockRepository,
@@ -131,7 +130,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)
indexers = append(indexers, l1Indexer)
}

if layer == L2 || layer == Both {
if layer == relayer.L2 || layer == relayer.Both {
l2Indexer, err := indexer.NewService(indexer.NewServiceOpts{
EventRepo: eventRepository,
BlockRepo: blockRepository,
10 changes: 5 additions & 5 deletions packages/relayer/cmd/main.go
@@ -9,14 +9,14 @@ import (
)

func main() {
modePtr := flag.String("mode", string(cli.SyncMode), `mode to run in.
modePtr := flag.String("mode", string(relayer.SyncMode), `mode to run in.
options:
sync: continue syncing from previous block
resync: restart syncing from block 0
fromBlock: restart syncing from specified block number
`)

layersPtr := flag.String("layers", string(cli.Both), `layers to watch and process.
layersPtr := flag.String("layers", string(relayer.Both), `layers to watch and process.
options:
l1: only watch l1 => l2 bridge messages
l2: only watch l2 => l1 bridge messages
@@ -25,13 +25,13 @@ func main() {

flag.Parse()

if !relayer.IsInSlice(cli.Mode(*modePtr), cli.Modes) {
if !relayer.IsInSlice(relayer.Mode(*modePtr), relayer.Modes) {
log.Fatal("mode not valid")
}

if !relayer.IsInSlice(cli.Layer(*layersPtr), cli.Layers) {
if !relayer.IsInSlice(relayer.Layer(*layersPtr), relayer.Layers) {
log.Fatal("mode not valid")
}

cli.Run(cli.Mode(*modePtr), cli.Layer(*layersPtr))
cli.Run(relayer.Mode(*modePtr), relayer.Layer(*layersPtr))
}
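
For reference, the resync behaviour wired in above can also be triggered programmatically through the CLI entrypoint; a minimal sketch using only identifiers that appear in this diff (the cli import path is inferred from packages/relayer/cli/cli.go and is an assumption), equivalent to starting the relayer with -mode resync -layers both:

package main

import (
	"github.com/taikochain/taiko-mono/packages/relayer"
	"github.com/taikochain/taiko-mono/packages/relayer/cli"
)

func main() {
	// Reindex both layers from block 0, the same as passing
	// -mode resync -layers both on the command line.
	cli.Run(relayer.ResyncMode, relayer.Both)
}
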
1 change: 1 addition & 0 deletions packages/relayer/errors.go
@@ -15,4 +15,5 @@ var (
ErrNoRPCClient = errors.Validation.NewWithKeyAndDetail("ERR_NO_RPC_CLIENT", "RPCClient is required")
ErrNoBridge = errors.Validation.NewWithKeyAndDetail("ERR_NO_BRIDGE", "Bridge is required")
ErrNoTaikoL2 = errors.Validation.NewWithKeyAndDetail("ERR_NO_TAIKO_L2", "TaikoL2 is required")
ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported")
)
@@ -1,4 +1,4 @@
package cli
package relayer

type Mode string

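
The remainder of this file is collapsed in the diff view. Judging only from the identifiers used elsewhere in the PR (relayer.SyncMode, relayer.ResyncMode, relayer.Modes, relayer.L1, relayer.L2, relayer.Both, relayer.Layers), the surrounding declarations presumably look roughly like the sketch below; the concrete string values, and whether a fromBlock mode is also declared here, are assumptions:

package relayer

type Mode string

var (
	SyncMode   Mode = "sync"   // assumed value, matching the -mode help text
	ResyncMode Mode = "resync" // assumed value
)

// Modes is the set validated against the -mode flag in cmd/main.go.
var Modes = []Mode{SyncMode, ResyncMode}

type Layer string

var (
	L1   Layer = "l1"   // assumed value
	L2   Layer = "l2"   // assumed value
	Both Layer = "both" // assumed value
)

// Layers is the set validated against the -layers flag in cmd/main.go.
var Layers = []Layer{L1, L2, Both}
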
31 changes: 11 additions & 20 deletions packages/relayer/indexer/filter_then_subscribe.go
@@ -20,42 +20,34 @@ var (
// FilterThenSubscribe gets the most recent block height that has been indexed, and works its way
// up to the latest block. As it goes, it tries to process messages.
// When it catches up, it then starts to Subscribe to latest events as they come in.
func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode) error {
chainID, err := svc.ethClient.ChainID(ctx)
if err != nil {
return errors.Wrap(err, "s.ethClient.ChainID()")
return errors.Wrap(err, "svc.ethClient.ChainID()")
}

// get most recently processed block height from the DB
latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent(
eventName,
chainID,
)
if err != nil {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
if err := svc.setInitialProcessingBlockByMode(ctx, mode, chainID); err != nil {
return errors.Wrap(err, "svc.setInitialProcessingBlockByMode")
}

log.Infof("latest processed block: %v", latestProcessedBlock.Height)
log.Infof("processing from block height: %v", svc.processingBlock.Height)

if err != nil {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

header, err := svc.ethClient.HeaderByNumber(ctx, nil)
if err != nil {
return errors.Wrap(err, "s.ethClient.HeaderByNumber")
return errors.Wrap(err, "svc.ethClient.HeaderByNumber")
}

// if we have already done the latest block, exit early
// TODO: call SubscribeMessageSent, as we can now just watch the chain for new blocks
if latestProcessedBlock.Height == header.Number.Uint64() {
if svc.processingBlock.Height == header.Number.Uint64() {
log.Info("caught up, subscribing to new incoming events")
return svc.subscribe(ctx, chainID)
}

const batchSize = 1000

svc.processingBlock = latestProcessedBlock

log.Infof("getting events between %v and %v in batches of %v",
svc.processingBlock.Height,
header.Number.Int64(),
@@ -65,7 +57,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
// todo: parallelize/concurrently catch up. don't think we need to do this in order.
// use WaitGroup.
// we get a timeout/EOF if we don't batch.
for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += batchSize {
for i := svc.processingBlock.Height; i < header.Number.Uint64(); i += batchSize {
var end uint64 = svc.processingBlock.Height + batchSize
// if the end of the batch is greater than the latest block number, set end
// to the latest block number
@@ -76,7 +68,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
log.Infof("batch from %v to %v", i, end)

events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{
Start: latestProcessedBlock.Height + uint64(1),
Start: svc.processingBlock.Height,
End: &end,
Context: ctx,
}, nil)
@@ -117,7 +109,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
}

if svc.processingBlock.Height < latestBlock.Number.Uint64() {
return svc.FilterThenSubscribe(ctx)
return svc.FilterThenSubscribe(ctx, relayer.SyncMode)
}

return svc.subscribe(ctx, chainID)
@@ -207,7 +199,6 @@ func (svc *Service) handleEvent(ctx context.Context, chainID *big.Int, event *co
// we can now consider that previous block processed. save it to the DB
// and bump the block number.
if raw.BlockNumber > svc.processingBlock.Height {
log.Info("raw blockNumber > processingBlock.height")
log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber)

if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
10 changes: 9 additions & 1 deletion packages/relayer/indexer/service.go
@@ -1,10 +1,13 @@
package indexer

import (
"context"
"crypto/ecdsa"
"math/big"

"github.com/cyberhorsey/errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
@@ -18,10 +21,15 @@ var (
ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000")
)

type ethClient interface {
ChainID(ctx context.Context) (*big.Int, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}

type Service struct {
eventRepo relayer.EventRepository
blockRepo relayer.BlockRepository
ethClient *ethclient.Client
ethClient ethClient
destRPC *rpc.Client

processingBlock *relayer.Block
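
Narrowing the ethClient field from *ethclient.Client to this two-method interface is what allows the new tests to inject a mock, while *ethclient.Client already provides ChainID and HeaderByNumber with these exact signatures, so the production wiring is unchanged. A compile-time assertion one could add inside package indexer to document that assumption (not part of this diff):

// Assert the real client still satisfies the narrowed interface.
var _ ethClient = (*ethclient.Client)(nil)
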
9 changes: 9 additions & 0 deletions packages/relayer/indexer/service_test.go
@@ -7,13 +7,22 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/taikochain/taiko-mono/packages/relayer"
"github.com/taikochain/taiko-mono/packages/relayer/mock"
"github.com/taikochain/taiko-mono/packages/relayer/repo"
"gopkg.in/go-playground/assert.v1"
)

var dummyEcdsaKey = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f"
var dummyAddress = "0x63FaC9201494f0bd17B9892B9fae4d52fe3BD377"

func newTestService() *Service {
return &Service{
blockRepo: &mock.BlockRepository{},
ethClient: &mock.EthClient{},

processingBlock: &relayer.Block{},
}
}
func Test_NewService(t *testing.T) {
tests := []struct {
name string
45 changes: 45 additions & 0 deletions packages/relayer/indexer/set_initial_processing_block_by_mode.go
@@ -0,0 +1,45 @@
package indexer

import (
"context"
"math/big"

"github.com/pkg/errors"
"github.com/taikochain/taiko-mono/packages/relayer"
)

func (svc *Service) setInitialProcessingBlockByMode(
ctx context.Context,
mode relayer.Mode,
chainID *big.Int,
) error {
switch mode {
case relayer.SyncMode:
// get most recently processed block height from the DB
latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent(
eventName,
chainID,
)
if err != nil {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
}

svc.processingBlock = latestProcessedBlock

return nil
case relayer.ResyncMode:
header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(0))
if err != nil {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
}

svc.processingBlock = &relayer.Block{
Height: header.Number.Uint64(),
Hash: header.Hash().Hex(),
}

return nil
default:
return relayer.ErrInvalidMode
}
}
@@ -0,0 +1,65 @@
package indexer

import (
"context"
"math/big"
"testing"

"github.com/stretchr/testify/assert"
"github.com/taikochain/taiko-mono/packages/relayer"
"github.com/taikochain/taiko-mono/packages/relayer/mock"
)

func Test_SetInitialProcessingBlockByMode(t *testing.T) {
tests := []struct {
name string
mode relayer.Mode
chainID *big.Int
wantErr bool
wantHeight uint64
}{
{
"resync",
relayer.ResyncMode,
mock.MockChainID,
false,
0,
},
{
"sync",
relayer.SyncMode,
mock.MockChainID,
false,
mock.LatestBlock.Height,
},
{
"sync error getting latest block",
relayer.SyncMode,
big.NewInt(328938),
true,
0,
},
{
"invalidMode",
relayer.Mode("fake"),
mock.MockChainID,
true,
0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc := newTestService()
err := svc.setInitialProcessingBlockByMode(
context.Background(),
tt.mode,
tt.chainID,
)

assert.Equal(t, tt.wantErr, err != nil)

assert.Equal(t, tt.wantHeight, svc.processingBlock.Height)
})
}
}
31 changes: 31 additions & 0 deletions packages/relayer/mock/block_repository.go
@@ -0,0 +1,31 @@
package mock

import (
"errors"
"math/big"

"github.com/taikochain/taiko-mono/packages/relayer"
)

var (
LatestBlock = &relayer.Block{
Height: 100,
Hash: "0x",
ChainID: MockChainID.Int64(),
}
)

type BlockRepository struct {
}

func (r *BlockRepository) Save(opts relayer.SaveBlockOpts) error {
return nil
}

func (r *BlockRepository) GetLatestBlockProcessedForEvent(eventName string, chainID *big.Int) (*relayer.Block, error) {
if chainID.Int64() != MockChainID.Int64() {
return nil, errors.New("error getting latest block processed for event")
}

return LatestBlock, nil
}
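
The capture of this page cuts off before the remaining files finished loading, so the mock.EthClient referenced by service_test.go and the new mode test is not shown. Assuming it only needs to satisfy the ethClient interface from indexer/service.go and to return a header at the requested height, a minimal sketch consistent with those tests might look like the following; MockChainID's concrete value and the exact contents of the real file are assumptions:

package mock

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum/core/types"
)

// MockChainID is referenced by the tests; its concrete value is assumed here.
var MockChainID = big.NewInt(167001)

type EthClient struct{}

func (c *EthClient) ChainID(ctx context.Context) (*big.Int, error) {
	return MockChainID, nil
}

// HeaderByNumber echoes the requested height back in a header, so asking for
// block 0 (resync mode) yields a header whose Number is 0.
func (c *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
	if number == nil {
		// nil normally means "latest"; the sketch just falls back to block 0.
		number = big.NewInt(0)
	}

	return &types.Header{Number: number}, nil
}
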