Skip to content

Commit

Permalink
feat(relayer): add numLatestBlocksStartWhenCrawling in crawler (#17599)
Browse files Browse the repository at this point in the history
Co-authored-by: jeff <113397187+cyberhorsey@users.noreply.github.com>
  • Loading branch information
xiaodino and cyberhorsey authored Jun 25, 2024
1 parent a114ce9 commit 0e031b3
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 66 deletions.
24 changes: 18 additions & 6 deletions packages/relayer/cmd/flags/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,23 @@ var (
Category: indexerCategory,
EnvVars: []string{"SRC_TAIKO_ADDRESS"},
}
NumLatestBlocksToIgnoreWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksToIgnoreWhenCrawling",
Usage: "Number of blocks to ignore when crawling chain, should be higher for L2-L1 indexing due to delay",
Value: 1000,
NumLatestBlocksEndWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksEndWhenCrawling",
Usage: `Number of blocks to ignore from the end when crawling chain,
should be higher for L2-L1 indexing due to delay
`,
Value: 300,
Category: indexerCategory,
EnvVars: []string{"NUM_LATEST_BLOCKS_END_WHEN_CRAWLING"},
}
NumLatestBlocksStartWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksStartWhenCrawling",
Usage: `Number of latest blocks to index from the start when crawling chain.
The default value is to cover past 7 days.
`,
Value: 50400,
Category: indexerCategory,
EnvVars: []string{"NUM_LATEST_BLOCKS_TO_IGNORE_WHEN_CRAWLING"},
EnvVars: []string{"NUM_LATEST_BLOCKS_START_WHEN_CRAWLING"},
}
EventName = &cli.StringFlag{
Name: "event",
Expand All @@ -101,7 +112,8 @@ var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{
SubscriptionBackoff,
SyncMode,
WatchMode,
NumLatestBlocksToIgnoreWhenCrawling,
NumLatestBlocksEndWhenCrawling,
NumLatestBlocksStartWhenCrawling,
EventName,
TargetBlockNumber,
})
86 changes: 44 additions & 42 deletions packages/relayer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,53 +36,55 @@ type Config struct {
QueueHost string
QueuePort uint64
// rpc configs
SrcRPCUrl string
DestRPCUrl string
ETHClientTimeout uint64
BlockBatchSize uint64
NumGoroutines uint64
SubscriptionBackoff uint64
SyncMode SyncMode
WatchMode WatchMode
NumLatestBlocksToIgnoreWhenCrawling uint64
EventName string
TargetBlockNumber *uint64
BackOffRetryInterval time.Duration
BackOffMaxRetries uint64
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
SrcRPCUrl string
DestRPCUrl string
ETHClientTimeout uint64
BlockBatchSize uint64
NumGoroutines uint64
SubscriptionBackoff uint64
SyncMode SyncMode
WatchMode WatchMode
NumLatestBlocksEndWhenCrawling uint64
NumLatestBlocksStartWhenCrawling uint64
EventName string
TargetBlockNumber *uint64
BackOffRetryInterval time.Duration
BackOffMaxRetries uint64
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
}

// NewConfigFromCliContext creates a new config instance from command line flags.
func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
return &Config{
SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)),
SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)),
SrcSignalServiceAddress: common.HexToAddress(c.String(flags.SrcSignalServiceAddress.Name)),
DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)),
DatabaseUsername: c.String(flags.DatabaseUsername.Name),
DatabasePassword: c.String(flags.DatabasePassword.Name),
DatabaseName: c.String(flags.DatabaseName.Name),
DatabaseHost: c.String(flags.DatabaseHost.Name),
DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name),
DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name),
DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name),
QueueUsername: c.String(flags.QueueUsername.Name),
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
WatchMode: WatchMode(c.String(flags.WatchMode.Name)),
SyncMode: SyncMode(c.String(flags.SyncMode.Name)),
ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name),
NumLatestBlocksToIgnoreWhenCrawling: c.Uint64(flags.NumLatestBlocksToIgnoreWhenCrawling.Name),
EventName: c.String(flags.EventName.Name),
BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name),
SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)),
SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)),
SrcSignalServiceAddress: common.HexToAddress(c.String(flags.SrcSignalServiceAddress.Name)),
DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)),
DatabaseUsername: c.String(flags.DatabaseUsername.Name),
DatabasePassword: c.String(flags.DatabasePassword.Name),
DatabaseName: c.String(flags.DatabaseName.Name),
DatabaseHost: c.String(flags.DatabaseHost.Name),
DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name),
DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name),
DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name),
QueueUsername: c.String(flags.QueueUsername.Name),
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
WatchMode: WatchMode(c.String(flags.WatchMode.Name)),
SyncMode: SyncMode(c.String(flags.SyncMode.Name)),
ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name),
NumLatestBlocksEndWhenCrawling: c.Uint64(flags.NumLatestBlocksEndWhenCrawling.Name),
NumLatestBlocksStartWhenCrawling: c.Uint64(flags.NumLatestBlocksStartWhenCrawling.Name),
EventName: c.String(flags.EventName.Name),
BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name),
TargetBlockNumber: func() *uint64 {
if c.IsSet(flags.TargetBlockNumber.Name) {
value := c.Uint64(flags.TargetBlockNumber.Name)
Expand Down
8 changes: 5 additions & 3 deletions packages/relayer/indexer/handle_chain_data_synced_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexer
import (
"context"
"encoding/json"
"math/big"

"log/slog"

Expand All @@ -16,7 +15,6 @@ import (
// handleChainDataSyncedEvent handles an individual ChainDataSynced event
func (i *Indexer) handleChainDataSyncedEvent(
ctx context.Context,
chainID *big.Int,
event *signalservice.SignalServiceChainDataSynced,
waitForConfirmations bool,
) error {
Expand Down Expand Up @@ -74,7 +72,11 @@ func (i *Indexer) handleChainDataSyncedEvent(
return errors.Wrap(err, "i.eventRepo.Save")
}

slog.Info("chainDataSynced event saved")
slog.Info("chainDataSynced event saved",
"srcChainId", i.srcChainId,
"destChainId", i.destChainId,
"SyncedChainID", event.ChainId,
)

relayer.ChainDataSyncedEventsIndexed.Inc()

Expand Down
33 changes: 22 additions & 11 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ type Indexer struct {

wg *sync.WaitGroup

numLatestBlocksToIgnoreWhenCrawling uint64
numLatestBlocksEndWhenCrawling uint64
numLatestBlocksStartWhenCrawling uint64

targetBlockNumber *uint64

Expand Down Expand Up @@ -233,7 +234,8 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {

i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second

i.numLatestBlocksToIgnoreWhenCrawling = cfg.NumLatestBlocksToIgnoreWhenCrawling
i.numLatestBlocksEndWhenCrawling = cfg.NumLatestBlocksEndWhenCrawling
i.numLatestBlocksStartWhenCrawling = cfg.NumLatestBlocksStartWhenCrawling

i.targetBlockNumber = cfg.TargetBlockNumber

Expand Down Expand Up @@ -274,7 +276,7 @@ func (i *Indexer) Start() error {
}

// set the initial processing block, which will vary by sync mode.
if err := i.setInitialIndexingBlockByMode(i.ctx, i.syncMode, i.srcChainId); err != nil {
if err := i.setInitialIndexingBlockByMode(i.syncMode, i.srcChainId); err != nil {
return errors.Wrap(err, "i.setInitialIndexingBlockByMode")
}

Expand All @@ -298,10 +300,6 @@ func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) {

var d time.Duration = 10 * time.Second

if i.watchMode == CrawlPastBlocks {
d = 10 * time.Minute
}

t := time.NewTicker(d)

defer t.Stop()
Expand Down Expand Up @@ -333,6 +331,15 @@ func (i *Indexer) filter(ctx context.Context) error {
// ignore latest N blocks if we are crawling past blocks, they are probably in queue already
// and are not "missed", have just not been processed.
if i.watchMode == CrawlPastBlocks {
if i.numLatestBlocksEndWhenCrawling > i.numLatestBlocksStartWhenCrawling {
slog.Error("Invalid configuration",
"numLatestBlocksEndWhenCrawling", i.numLatestBlocksEndWhenCrawling,
"numLatestBlocksStartWhenCrawling", i.numLatestBlocksStartWhenCrawling,
)

return errors.New("numLatestBlocksStartWhenCrawling must be greater than numLatestBlocksEndWhenCrawling")
}

// if targetBlockNumber is not nil, we are just going to process a singular block.
if i.targetBlockNumber != nil {
slog.Info("targetBlockNumber is set", "targetBlockNumber", *i.targetBlockNumber)
Expand All @@ -342,15 +349,19 @@ func (i *Indexer) filter(ctx context.Context) error {
endBlockID = i.latestIndexedBlockNumber + 1
} else {
// set the initial processing block back to either 0 or the genesis block again.
if err := i.setInitialIndexingBlockByMode(i.ctx, i.syncMode, i.srcChainId); err != nil {
if err := i.setInitialIndexingBlockByMode(i.syncMode, i.srcChainId); err != nil {
return errors.Wrap(err, "i.setInitialIndexingBlockByMode")
}

if endBlockID > i.numLatestBlocksToIgnoreWhenCrawling {
if i.latestIndexedBlockNumber < endBlockID-i.numLatestBlocksStartWhenCrawling {
i.latestIndexedBlockNumber = endBlockID - i.numLatestBlocksStartWhenCrawling
}

if endBlockID > i.numLatestBlocksEndWhenCrawling {
// otherwise, we need to set the endBlockID as the greater of the two:
// either the endBlockID minus the number of latest blocks to ignore,
// or endBlockID.
endBlockID -= i.numLatestBlocksToIgnoreWhenCrawling
endBlockID -= i.numLatestBlocksEndWhenCrawling
}
}
}
Expand Down Expand Up @@ -594,7 +605,7 @@ func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context,
event := chainDataSyncedEvents.Event

group.Go(func() error {
err := i.handleChainDataSyncedEvent(ctx, i.srcChainId, event, true)
err := i.handleChainDataSyncedEvent(ctx, event, true)
if err != nil {
relayer.MessageStatusChangedEventsIndexingErrors.Inc()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package indexer

import (
"context"
"math/big"

"github.com/pkg/errors"
Expand All @@ -11,7 +10,6 @@ import (
// setInitialIndexingBlockByMode takes in a SyncMode and determines how we should
// start our indexing
func (i *Indexer) setInitialIndexingBlockByMode(
ctx context.Context,
mode SyncMode,
chainID *big.Int,
) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package indexer

import (
"context"
"math/big"
"testing"

Expand Down Expand Up @@ -51,7 +50,6 @@ func Test_setInitialIndexingBlockByMode(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
svc, _ := newTestService(tt.mode, FilterAndSubscribe)
err := svc.setInitialIndexingBlockByMode(
context.Background(),
tt.mode,
tt.chainID,
)
Expand Down

0 comments on commit 0e031b3

Please sign in to comment.