From e9fda8bb80ecfefcfd7d64062b50ebf5b5eec2ef Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Wed, 14 Dec 2022 20:40:56 -0800 Subject: [PATCH] feat(relayer): header sync check before processing messages (#441) * header sync check before processing messages * rm gas limit + warn logs * get latest synced header after waiting * lint * use header by hash instead of block by hash --- packages/relayer/.default.env | 3 +- packages/relayer/.golangci.yml | 6 +- packages/relayer/cli/cli.go | 38 ++++++++----- packages/relayer/indexer/service.go | 56 ++++++++++--------- packages/relayer/indexer/subscribe.go | 13 +---- packages/relayer/message/process_message.go | 11 ++-- packages/relayer/message/processor.go | 30 +++++----- packages/relayer/message/processor_test.go | 21 +++---- .../relayer/message/wait_for_confirmations.go | 2 +- .../relayer/message/wait_header_synced.go | 55 ++++++++++++++++++ .../message/wait_header_synced_test.go | 21 +++++++ packages/relayer/mock/eth_client.go | 10 ++++ 12 files changed, 180 insertions(+), 86 deletions(-) create mode 100644 packages/relayer/message/wait_header_synced.go create mode 100644 packages/relayer/message/wait_header_synced_test.go diff --git a/packages/relayer/.default.env b/packages/relayer/.default.env index ecdfd7843b7..7445d2e4e49 100644 --- a/packages/relayer/.default.env +++ b/packages/relayer/.default.env @@ -18,4 +18,5 @@ MYSQL_CONN_MAX_LIFETIME_IN_MS= NUM_GOROUTINES=20 SUBSCRIPTION_BACKOFF_IN_SECONDS=3 CONFIRMATIONS_BEFORE_PROCESSING=15 -CORS_ORIGINS=* \ No newline at end of file +CORS_ORIGINS=* +HEADER_SYNC_INTERVAL_IN_SECONDS=60 \ No newline at end of file diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index 8408925a1d2..25aa24eb396 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -28,10 +28,10 @@ linters: linters-settings: funlen: - lines: 123 - statements: 50 + lines: 130 + statements: 52 gocognit: - min-complexity: 37 + min-complexity: 40 issues: exclude-rules: diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index 9a14f230964..ab4ba1adb80 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -42,10 +42,11 @@ var ( "PROMETHEUS_HTTP_PORT", } - defaultBlockBatchSize = 2 - defaultNumGoroutines = 10 - defaultSubscriptionBackoff = 2 * time.Second - defaultConfirmations = 15 + defaultBlockBatchSize = 2 + defaultNumGoroutines = 10 + defaultSubscriptionBackoff = 2 * time.Second + defaultConfirmations = 15 + defaultHeaderSyncIntervalSeconds int = 60 ) func Run( @@ -150,12 +151,17 @@ func makeIndexers( var subscriptionBackoff time.Duration subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS")) - if err != nil || numGoroutines <= 0 { + if err != nil || subscriptionBackoffInSeconds <= 0 { subscriptionBackoff = defaultSubscriptionBackoff } else { subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second } + headerSyncIntervalInSeconds, err := strconv.Atoi(os.Getenv("HEADER_SYNC_INTERVAL_IN_SECONDS")) + if err != nil || headerSyncIntervalInSeconds <= 0 { + headerSyncIntervalInSeconds = defaultHeaderSyncIntervalSeconds + } + confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING")) if err != nil || confirmations <= 0 { confirmations = defaultConfirmations @@ -198,11 +204,12 @@ func makeIndexers( DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")), SrcTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")), - BlockBatchSize: uint64(blockBatchSize), - NumGoroutines: numGoroutines, - SubscriptionBackoff: subscriptionBackoff, - Confirmations: uint64(confirmations), - ProfitableOnly: profitableOnly, + BlockBatchSize: uint64(blockBatchSize), + NumGoroutines: numGoroutines, + SubscriptionBackoff: subscriptionBackoff, + Confirmations: uint64(confirmations), + ProfitableOnly: profitableOnly, + HeaderSyncIntervalInSeconds: int64(headerSyncIntervalInSeconds), }) if err != nil { log.Fatal(err) @@ -225,11 +232,12 @@ func makeIndexers( DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")), - BlockBatchSize: uint64(blockBatchSize), - NumGoroutines: numGoroutines, - SubscriptionBackoff: subscriptionBackoff, - Confirmations: uint64(confirmations), - ProfitableOnly: profitableOnly, + BlockBatchSize: uint64(blockBatchSize), + NumGoroutines: numGoroutines, + SubscriptionBackoff: subscriptionBackoff, + Confirmations: uint64(confirmations), + ProfitableOnly: profitableOnly, + HeaderSyncIntervalInSeconds: int64(headerSyncIntervalInSeconds), }) if err != nil { log.Fatal(err) diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index aa69bd1227d..8f4f1b824a4 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -50,22 +50,23 @@ type Service struct { } type NewServiceOpts struct { - EventRepo relayer.EventRepository - BlockRepo relayer.BlockRepository - EthClient *ethclient.Client - DestEthClient *ethclient.Client - RPCClient *rpc.Client - DestRPCClient *rpc.Client - ECDSAKey string - BridgeAddress common.Address - DestBridgeAddress common.Address - SrcTaikoAddress common.Address - DestTaikoAddress common.Address - BlockBatchSize uint64 - NumGoroutines int - SubscriptionBackoff time.Duration - Confirmations uint64 - ProfitableOnly relayer.ProfitableOnly + EventRepo relayer.EventRepository + BlockRepo relayer.BlockRepository + EthClient *ethclient.Client + DestEthClient *ethclient.Client + RPCClient *rpc.Client + DestRPCClient *rpc.Client + ECDSAKey string + BridgeAddress common.Address + DestBridgeAddress common.Address + SrcTaikoAddress common.Address + DestTaikoAddress common.Address + BlockBatchSize uint64 + NumGoroutines int + SubscriptionBackoff time.Duration + Confirmations uint64 + ProfitableOnly relayer.ProfitableOnly + HeaderSyncIntervalInSeconds int64 } func NewService(opts NewServiceOpts) (*Service, error) { @@ -144,17 +145,18 @@ func NewService(opts NewServiceOpts) (*Service, error) { } processor, err := message.NewProcessor(message.NewProcessorOpts{ - Prover: prover, - ECDSAKey: privateKey, - RPCClient: opts.RPCClient, - DestETHClient: opts.DestEthClient, - DestBridge: destBridge, - EventRepo: opts.EventRepo, - DestHeaderSyncer: destHeaderSyncer, - RelayerAddress: relayerAddr, - Confirmations: opts.Confirmations, - SrcETHClient: opts.EthClient, - ProfitableOnly: opts.ProfitableOnly, + Prover: prover, + ECDSAKey: privateKey, + RPCClient: opts.RPCClient, + DestETHClient: opts.DestEthClient, + DestBridge: destBridge, + EventRepo: opts.EventRepo, + DestHeaderSyncer: destHeaderSyncer, + RelayerAddress: relayerAddr, + Confirmations: opts.Confirmations, + SrcETHClient: opts.EthClient, + ProfitableOnly: opts.ProfitableOnly, + HeaderSyncIntervalSeconds: opts.HeaderSyncIntervalInSeconds, }) if err != nil { return nil, errors.Wrap(err, "message.NewProcessor") diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 7d1a96216c1..e48dfabd452 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/taikoxyz/taiko-mono/packages/relayer/contracts" - "golang.org/x/sync/errgroup" ) // subscribe subscribes to latest events @@ -30,23 +29,17 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { defer sub.Unsubscribe() - group, ctx := errgroup.WithContext(ctx) - - group.SetLimit(svc.numGoroutines) - for { select { case err := <-sub.Err(): return errors.Wrap(err, "sub.Err()") case event := <-sink: - group.Go(func() error { + go func() { err := svc.handleEvent(ctx, chainID, event) if err != nil { - log.Errorf("svc.handleEvent: %v", err) + log.Errorf("svc.subscribe, svc.handleEvent: %v", err) } - - return nil - }) + }() } } } diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 0e06a94d232..325e373cdd6 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -32,6 +32,10 @@ func (p *Processor) ProcessMessage( return errors.Wrap(err, "p.waitForConfirmations") } + if err := p.waitHeaderSynced(ctx, event); err != nil { + return errors.Wrap(err, "p.waitHeaderSynced") + } + // get latest synced header since not every header is synced from L1 => L2, // and later blocks still have the storage trie proof from previous blocks. latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{}) @@ -39,12 +43,6 @@ func (p *Processor) ProcessMessage( return errors.Wrap(err, "taiko.GetSyncedHeader") } - // if header hasnt been synced, we are unable to process this message - if common.BytesToHash(latestSyncedHeader[:]).Hex() == relayer.ZeroHash.Hex() { - log.Infof("header not synced, bailing") - return nil - } - hashed := crypto.Keccak256( event.Raw.Address.Bytes(), // L1 bridge address event.Signal[:], @@ -69,6 +67,7 @@ func (p *Processor) ProcessMessage( // message will fail when we try to process it if !received { + log.Warnf("signal %v not received on dest chain", common.Hash(event.Signal).Hex()) return errors.New("message not received") } diff --git a/packages/relayer/message/processor.go b/packages/relayer/message/processor.go index f3d472a493e..f25a69a9622 100644 --- a/packages/relayer/message/processor.go +++ b/packages/relayer/message/processor.go @@ -16,6 +16,7 @@ type ethClient interface { PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) BlockNumber(ctx context.Context) (uint64, error) + HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) } type Processor struct { eventRepo relayer.EventRepository @@ -35,21 +36,23 @@ type Processor struct { relayerAddr common.Address confirmations uint64 - profitableOnly relayer.ProfitableOnly + profitableOnly relayer.ProfitableOnly + headerSyncIntervalSeconds int64 } type NewProcessorOpts struct { - Prover *proof.Prover - ECDSAKey *ecdsa.PrivateKey - RPCClient relayer.Caller - SrcETHClient ethClient - DestETHClient ethClient - DestBridge relayer.Bridge - EventRepo relayer.EventRepository - DestHeaderSyncer relayer.HeaderSyncer - RelayerAddress common.Address - Confirmations uint64 - ProfitableOnly relayer.ProfitableOnly + Prover *proof.Prover + ECDSAKey *ecdsa.PrivateKey + RPCClient relayer.Caller + SrcETHClient ethClient + DestETHClient ethClient + DestBridge relayer.Bridge + EventRepo relayer.EventRepository + DestHeaderSyncer relayer.HeaderSyncer + RelayerAddress common.Address + Confirmations uint64 + ProfitableOnly relayer.ProfitableOnly + HeaderSyncIntervalSeconds int64 } func NewProcessor(opts NewProcessorOpts) (*Processor, error) { @@ -107,6 +110,7 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) { relayerAddr: opts.RelayerAddress, confirmations: opts.Confirmations, - profitableOnly: opts.ProfitableOnly, + profitableOnly: opts.ProfitableOnly, + headerSyncIntervalSeconds: opts.HeaderSyncIntervalSeconds, }, nil } diff --git a/packages/relayer/message/processor_test.go b/packages/relayer/message/processor_test.go index 1125a050aef..f3d10981d36 100644 --- a/packages/relayer/message/processor_test.go +++ b/packages/relayer/message/processor_test.go @@ -26,16 +26,17 @@ func newTestProcessor(profitableOnly relayer.ProfitableOnly) *Processor { ) return &Processor{ - eventRepo: &mock.EventRepository{}, - destBridge: &mock.Bridge{}, - srcEthClient: &mock.EthClient{}, - destEthClient: &mock.EthClient{}, - mu: &sync.Mutex{}, - ecdsaKey: privateKey, - destHeaderSyncer: &mock.HeaderSyncer{}, - prover: prover, - rpc: &mock.Caller{}, - profitableOnly: profitableOnly, + eventRepo: &mock.EventRepository{}, + destBridge: &mock.Bridge{}, + srcEthClient: &mock.EthClient{}, + destEthClient: &mock.EthClient{}, + mu: &sync.Mutex{}, + ecdsaKey: privateKey, + destHeaderSyncer: &mock.HeaderSyncer{}, + prover: prover, + rpc: &mock.Caller{}, + profitableOnly: profitableOnly, + headerSyncIntervalSeconds: 1, } } func Test_NewProcessor(t *testing.T) { diff --git a/packages/relayer/message/wait_for_confirmations.go b/packages/relayer/message/wait_for_confirmations.go index f27333df304..600ac574a02 100644 --- a/packages/relayer/message/wait_for_confirmations.go +++ b/packages/relayer/message/wait_for_confirmations.go @@ -11,7 +11,7 @@ import ( func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error { // TODO: make timeout a config var - ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute) + ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute) defer cancelFunc() diff --git a/packages/relayer/message/wait_header_synced.go b/packages/relayer/message/wait_header_synced.go new file mode 100644 index 00000000000..5175440c345 --- /dev/null +++ b/packages/relayer/message/wait_header_synced.go @@ -0,0 +1,55 @@ +package message + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/taikoxyz/taiko-mono/packages/relayer/contracts" +) + +func (p *Processor) waitHeaderSynced(ctx context.Context, event *contracts.BridgeMessageSent) error { + ticker := time.NewTicker(time.Duration(p.headerSyncIntervalSeconds) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + // get latest synced header since not every header is synced from L1 => L2, + // and later blocks still have the storage trie proof from previous blocks. + latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{}) + if err != nil { + return errors.Wrap(err, "p.destHeaderSyncer.GetLatestSyncedHeader") + } + + header, err := p.srcEthClient.HeaderByHash(ctx, latestSyncedHeader) + if err != nil { + return errors.Wrap(err, "p.destHeaderSyncer.GetLatestSyncedHeader") + } + + // header is caught up and processible + if header.Number.Uint64() >= event.Raw.BlockNumber { + log.Infof( + "signal: %v is processable. occured in block %v, latestSynced is block %v", + common.Hash(event.Signal).Hex(), + event.Raw.BlockNumber, + header.Number.Uint64(), + ) + + return nil + } + + log.Infof( + "signal: %v waiting to be processable. occured in block %v, latestSynced is block %v", + common.Hash(event.Signal).Hex(), + event.Raw.BlockNumber, + header.Number.Uint64(), + ) + } + } +} diff --git a/packages/relayer/message/wait_header_synced_test.go b/packages/relayer/message/wait_header_synced_test.go new file mode 100644 index 00000000000..a8bc0f43abc --- /dev/null +++ b/packages/relayer/message/wait_header_synced_test.go @@ -0,0 +1,21 @@ +package message + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/assert" + "github.com/taikoxyz/taiko-mono/packages/relayer/contracts" +) + +func Test_waitHeaderSynced(t *testing.T) { + p := newTestProcessor(true) + + err := p.waitHeaderSynced(context.TODO(), &contracts.BridgeMessageSent{ + Raw: types.Log{ + BlockNumber: 1, + }, + }) + assert.Nil(t, err) +} diff --git a/packages/relayer/mock/eth_client.go b/packages/relayer/mock/eth_client.go index b7cca8a08e6..c8ec2e135ed 100644 --- a/packages/relayer/mock/eth_client.go +++ b/packages/relayer/mock/eth_client.go @@ -2,11 +2,13 @@ package mock import ( "context" + "errors" "math/big" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/taikoxyz/taiko-mono/packages/relayer" ) var ( @@ -61,3 +63,11 @@ func (c *EthClient) TransactionReceipt(ctx context.Context, txHash common.Hash) func (c *EthClient) BlockNumber(ctx context.Context) (uint64, error) { return uint64(BlockNum), nil } + +func (c *EthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + if hash == relayer.ZeroHash { + return nil, errors.New("cant find block") + } + + return Header, nil +}