Skip to content

Commit

Permalink
feat(relayer): header sync check before processing messages (#441)
Browse files Browse the repository at this point in the history
* header sync check before processing messages

* rm gas limit + warn logs

* get latest synced header after waiting

* lint

* use header by hash instead of block by hash
  • Loading branch information
cyberhorsey authored Dec 15, 2022
1 parent 8216cc1 commit e9fda8b
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 86 deletions.
3 changes: 2 additions & 1 deletion packages/relayer/.default.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ MYSQL_CONN_MAX_LIFETIME_IN_MS=
NUM_GOROUTINES=20
SUBSCRIPTION_BACKOFF_IN_SECONDS=3
CONFIRMATIONS_BEFORE_PROCESSING=15
CORS_ORIGINS=*
CORS_ORIGINS=*
HEADER_SYNC_INTERVAL_IN_SECONDS=60
6 changes: 3 additions & 3 deletions packages/relayer/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ linters:

linters-settings:
funlen:
lines: 123
statements: 50
lines: 130
statements: 52
gocognit:
min-complexity: 37
min-complexity: 40

issues:
exclude-rules:
Expand Down
38 changes: 23 additions & 15 deletions packages/relayer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ var (
"PROMETHEUS_HTTP_PORT",
}

defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
defaultHeaderSyncIntervalSeconds int = 60
)

func Run(
Expand Down Expand Up @@ -150,12 +151,17 @@ func makeIndexers(
var subscriptionBackoff time.Duration

subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS"))
if err != nil || numGoroutines <= 0 {
if err != nil || subscriptionBackoffInSeconds <= 0 {
subscriptionBackoff = defaultSubscriptionBackoff
} else {
subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second
}

headerSyncIntervalInSeconds, err := strconv.Atoi(os.Getenv("HEADER_SYNC_INTERVAL_IN_SECONDS"))
if err != nil || headerSyncIntervalInSeconds <= 0 {
headerSyncIntervalInSeconds = defaultHeaderSyncIntervalSeconds
}

confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING"))
if err != nil || confirmations <= 0 {
confirmations = defaultConfirmations
Expand Down Expand Up @@ -198,11 +204,12 @@ func makeIndexers(
DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")),
SrcTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
HeaderSyncIntervalInSeconds: int64(headerSyncIntervalInSeconds),
})
if err != nil {
log.Fatal(err)
Expand All @@ -225,11 +232,12 @@ func makeIndexers(
DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
HeaderSyncIntervalInSeconds: int64(headerSyncIntervalInSeconds),
})
if err != nil {
log.Fatal(err)
Expand Down
56 changes: 29 additions & 27 deletions packages/relayer/indexer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ type Service struct {
}

type NewServiceOpts struct {
EventRepo relayer.EventRepository
BlockRepo relayer.BlockRepository
EthClient *ethclient.Client
DestEthClient *ethclient.Client
RPCClient *rpc.Client
DestRPCClient *rpc.Client
ECDSAKey string
BridgeAddress common.Address
DestBridgeAddress common.Address
SrcTaikoAddress common.Address
DestTaikoAddress common.Address
BlockBatchSize uint64
NumGoroutines int
SubscriptionBackoff time.Duration
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
EventRepo relayer.EventRepository
BlockRepo relayer.BlockRepository
EthClient *ethclient.Client
DestEthClient *ethclient.Client
RPCClient *rpc.Client
DestRPCClient *rpc.Client
ECDSAKey string
BridgeAddress common.Address
DestBridgeAddress common.Address
SrcTaikoAddress common.Address
DestTaikoAddress common.Address
BlockBatchSize uint64
NumGoroutines int
SubscriptionBackoff time.Duration
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
HeaderSyncIntervalInSeconds int64
}

func NewService(opts NewServiceOpts) (*Service, error) {
Expand Down Expand Up @@ -144,17 +145,18 @@ func NewService(opts NewServiceOpts) (*Service, error) {
}

processor, err := message.NewProcessor(message.NewProcessorOpts{
Prover: prover,
ECDSAKey: privateKey,
RPCClient: opts.RPCClient,
DestETHClient: opts.DestEthClient,
DestBridge: destBridge,
EventRepo: opts.EventRepo,
DestHeaderSyncer: destHeaderSyncer,
RelayerAddress: relayerAddr,
Confirmations: opts.Confirmations,
SrcETHClient: opts.EthClient,
ProfitableOnly: opts.ProfitableOnly,
Prover: prover,
ECDSAKey: privateKey,
RPCClient: opts.RPCClient,
DestETHClient: opts.DestEthClient,
DestBridge: destBridge,
EventRepo: opts.EventRepo,
DestHeaderSyncer: destHeaderSyncer,
RelayerAddress: relayerAddr,
Confirmations: opts.Confirmations,
SrcETHClient: opts.EthClient,
ProfitableOnly: opts.ProfitableOnly,
HeaderSyncIntervalSeconds: opts.HeaderSyncIntervalInSeconds,
})
if err != nil {
return nil, errors.Wrap(err, "message.NewProcessor")
Expand Down
13 changes: 3 additions & 10 deletions packages/relayer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts"
"golang.org/x/sync/errgroup"
)

// subscribe subscribes to latest events
Expand All @@ -30,23 +29,17 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {

defer sub.Unsubscribe()

group, ctx := errgroup.WithContext(ctx)

group.SetLimit(svc.numGoroutines)

for {
select {
case err := <-sub.Err():
return errors.Wrap(err, "sub.Err()")
case event := <-sink:
group.Go(func() error {
go func() {
err := svc.handleEvent(ctx, chainID, event)
if err != nil {
log.Errorf("svc.handleEvent: %v", err)
log.Errorf("svc.subscribe, svc.handleEvent: %v", err)
}

return nil
})
}()
}
}
}
11 changes: 5 additions & 6 deletions packages/relayer/message/process_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,17 @@ func (p *Processor) ProcessMessage(
return errors.Wrap(err, "p.waitForConfirmations")
}

if err := p.waitHeaderSynced(ctx, event); err != nil {
return errors.Wrap(err, "p.waitHeaderSynced")
}

// get latest synced header since not every header is synced from L1 => L2,
// and later blocks still have the storage trie proof from previous blocks.
latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{})
if err != nil {
return errors.Wrap(err, "taiko.GetSyncedHeader")
}

// if header hasnt been synced, we are unable to process this message
if common.BytesToHash(latestSyncedHeader[:]).Hex() == relayer.ZeroHash.Hex() {
log.Infof("header not synced, bailing")
return nil
}

hashed := crypto.Keccak256(
event.Raw.Address.Bytes(), // L1 bridge address
event.Signal[:],
Expand All @@ -69,6 +67,7 @@ func (p *Processor) ProcessMessage(

// message will fail when we try to process it
if !received {
log.Warnf("signal %v not received on dest chain", common.Hash(event.Signal).Hex())
return errors.New("message not received")
}

Expand Down
30 changes: 17 additions & 13 deletions packages/relayer/message/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ethClient interface {
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BlockNumber(ctx context.Context) (uint64, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
}
type Processor struct {
eventRepo relayer.EventRepository
Expand All @@ -35,21 +36,23 @@ type Processor struct {
relayerAddr common.Address
confirmations uint64

profitableOnly relayer.ProfitableOnly
profitableOnly relayer.ProfitableOnly
headerSyncIntervalSeconds int64
}

type NewProcessorOpts struct {
Prover *proof.Prover
ECDSAKey *ecdsa.PrivateKey
RPCClient relayer.Caller
SrcETHClient ethClient
DestETHClient ethClient
DestBridge relayer.Bridge
EventRepo relayer.EventRepository
DestHeaderSyncer relayer.HeaderSyncer
RelayerAddress common.Address
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
Prover *proof.Prover
ECDSAKey *ecdsa.PrivateKey
RPCClient relayer.Caller
SrcETHClient ethClient
DestETHClient ethClient
DestBridge relayer.Bridge
EventRepo relayer.EventRepository
DestHeaderSyncer relayer.HeaderSyncer
RelayerAddress common.Address
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
HeaderSyncIntervalSeconds int64
}

func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
Expand Down Expand Up @@ -107,6 +110,7 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
relayerAddr: opts.RelayerAddress,
confirmations: opts.Confirmations,

profitableOnly: opts.ProfitableOnly,
profitableOnly: opts.ProfitableOnly,
headerSyncIntervalSeconds: opts.HeaderSyncIntervalSeconds,
}, nil
}
21 changes: 11 additions & 10 deletions packages/relayer/message/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ func newTestProcessor(profitableOnly relayer.ProfitableOnly) *Processor {
)

return &Processor{
eventRepo: &mock.EventRepository{},
destBridge: &mock.Bridge{},
srcEthClient: &mock.EthClient{},
destEthClient: &mock.EthClient{},
mu: &sync.Mutex{},
ecdsaKey: privateKey,
destHeaderSyncer: &mock.HeaderSyncer{},
prover: prover,
rpc: &mock.Caller{},
profitableOnly: profitableOnly,
eventRepo: &mock.EventRepository{},
destBridge: &mock.Bridge{},
srcEthClient: &mock.EthClient{},
destEthClient: &mock.EthClient{},
mu: &sync.Mutex{},
ecdsaKey: privateKey,
destHeaderSyncer: &mock.HeaderSyncer{},
prover: prover,
rpc: &mock.Caller{},
profitableOnly: profitableOnly,
headerSyncIntervalSeconds: 1,
}
}
func Test_NewProcessor(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion packages/relayer/message/wait_for_confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error {
// TODO: make timeout a config var
ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute)
ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute)

defer cancelFunc()

Expand Down
55 changes: 55 additions & 0 deletions packages/relayer/message/wait_header_synced.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package message

import (
"context"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts"
)

func (p *Processor) waitHeaderSynced(ctx context.Context, event *contracts.BridgeMessageSent) error {
ticker := time.NewTicker(time.Duration(p.headerSyncIntervalSeconds) * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// get latest synced header since not every header is synced from L1 => L2,
// and later blocks still have the storage trie proof from previous blocks.
latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{})
if err != nil {
return errors.Wrap(err, "p.destHeaderSyncer.GetLatestSyncedHeader")
}

header, err := p.srcEthClient.HeaderByHash(ctx, latestSyncedHeader)
if err != nil {
return errors.Wrap(err, "p.destHeaderSyncer.GetLatestSyncedHeader")
}

// header is caught up and processible
if header.Number.Uint64() >= event.Raw.BlockNumber {
log.Infof(
"signal: %v is processable. occured in block %v, latestSynced is block %v",
common.Hash(event.Signal).Hex(),
event.Raw.BlockNumber,
header.Number.Uint64(),
)

return nil
}

log.Infof(
"signal: %v waiting to be processable. occured in block %v, latestSynced is block %v",
common.Hash(event.Signal).Hex(),
event.Raw.BlockNumber,
header.Number.Uint64(),
)
}
}
}
21 changes: 21 additions & 0 deletions packages/relayer/message/wait_header_synced_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package message

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts"
)

func Test_waitHeaderSynced(t *testing.T) {
p := newTestProcessor(true)

err := p.waitHeaderSynced(context.TODO(), &contracts.BridgeMessageSent{
Raw: types.Log{
BlockNumber: 1,
},
})
assert.Nil(t, err)
}
Loading

0 comments on commit e9fda8b

Please sign in to comment.