Skip to content

Commit

Permalink
cleanup: simplify the concurrency configuration
Browse files Browse the repository at this point in the history
The previous code was confusing, the RPC-CONCURRENCY was configuring the
number of parallel blocks we could ingest, instead of the real jsonRPC
concurrency to the RPC node.

This (in a backwards compatible way) makes the variable configure the
jsonRPC max concurrency. The default value changed from: 25 to 80 to
account for this.

Number of concurrent workers to fetch blocks is now configured
internally to: RPC_CONCURRENCY / 4.
  • Loading branch information
msf committed Jul 29, 2024
1 parent 1c556be commit 675ed08
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 58 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,16 @@ Also, we mention some of the options here:
The `log` flag (environment variable `LOG`) controls the log level. Use `--log debug`/`LOG=debug` to emit more logs than the default `info` level. To emit less logs, use `warn`, or `error` (least).

### Tuning RPC concurrency
The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines)
to run concurrently to perform RPC node requests. Default is 25.
The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines) to run concurrently to perform RPC node requests. See `--help` for up to date default value.

### Tuning for throughput
Throughput depends on: latency & request rate between RPC <-> Node Indexer <--> DuneAPI and can be tuned via a combination of:
1. RPC_CONCURRENCY, higher values feed more blocks into the node indexer to process
1. MAX_BATCH_SIZE, higher values send more blocks per request to DuneAPI
1. BLOCK_SUBMIT_INTERVAL, the interval at which batched blocks are submitted to DuneAPI
See `--help` for up to date default values.



### RPC poll interval
The flag `--rpc-poll-interval` (environment variable `RPC_POLL_INTERVAL`) specifies the duration to wait before checking
Expand Down
26 changes: 14 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func main() {
EVMStack: cfg.RPCStack,
// real max request concurrency to RPC node
// each block requires multiple RPC requests
TotalRPCConcurrency: cfg.BlockConcurrency * 4,
TotalRPCConcurrency: cfg.RPCConcurrency,
})
if err != nil {
stdlog.Fatal(err)
Expand Down Expand Up @@ -145,17 +145,19 @@ func main() {
duneClient,
duneClientDLQ,
ingester.Config{
MaxConcurrentRequests: cfg.BlockConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency,
MaxBatchSize: cfg.MaxBatchSize,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
DLQOnly: cfg.DLQOnly,
// OpStack does 3 requests per block, ArbitrumNova is variable
// leave some room for other requests
MaxConcurrentBlocks: cfg.RPCConcurrency / 4,
DLQMaxConcurrentBlocks: cfg.DLQBlockConcurrency,
MaxBatchSize: cfg.MaxBatchSize,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
DLQOnly: cfg.DLQOnly,
},
progress,
dlqBlockNumbers,
Expand Down
13 changes: 6 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,12 @@ type Config struct {
DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
// kept the old cmdline arg names and env variables for backwards compatibility
BlockConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of maximum concurrent jsonRPC requests to the RPC node" default:"80"` // nolint:lll
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
22 changes: 11 additions & 11 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ const (
)

type Config struct {
MaxConcurrentRequests int
MaxConcurrentRequestsDLQ int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
MaxBatchSize int
MaxConcurrentBlocks int
DLQMaxConcurrentBlocks int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
MaxBatchSize int
}

type ingester struct {
Expand Down
16 changes: 8 additions & 8 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
registerIngesterMetrics(i)

if i.cfg.DLQOnly {
i.cfg.MaxConcurrentRequests = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0
i.cfg.MaxConcurrentBlocks = 0 // if running DLQ Only mode, ignore MaxConcurrentBlocks and set this to 0
} else {
if i.cfg.MaxConcurrentRequests <= 0 {
if i.cfg.MaxConcurrentBlocks <= 0 {
return errors.Errorf("MaxConcurrentRequests must be > 0")
}
}
if i.cfg.MaxConcurrentRequestsDLQ <= 0 {
if i.cfg.DLQMaxConcurrentBlocks <= 0 {
return errors.Errorf("MaxConcurrentRequestsDLQ must be > 0")
}

Expand All @@ -52,8 +52,8 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
blocks := make(chan models.RPCBlock, 2*maxBatchSize)
defer close(blocks)

// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxConcurrentRequests {
// Start MaxConcurrentBlocks goroutines to consume blocks concurrently
for range i.cfg.MaxConcurrentBlocks {
errGroup.Go(func() error {
return i.FetchBlockLoop(ctx, blockNumbers, blocks)
})
Expand All @@ -70,13 +70,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
blockNumbersDLQ := make(chan dlq.Item[int64])
defer close(blockNumbersDLQ)

blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.MaxConcurrentRequestsDLQ+1)
blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.DLQMaxConcurrentBlocks+1)
defer close(blocksDLQ)

errGroup.Go(func() error {
return i.SendBlocksDLQ(ctx, blocksDLQ)
})
for range i.cfg.MaxConcurrentRequestsDLQ {
for range i.cfg.DLQMaxConcurrentBlocks {
errGroup.Go(func() error {
return i.FetchBlockLoopDLQ(ctx, blockNumbersDLQ, blocksDLQ)
})
Expand All @@ -91,7 +91,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
"runForever", maxCount <= 0,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxConcurrency", i.cfg.MaxConcurrentRequests,
"maxConcurrency", i.cfg.MaxConcurrentBlocks,
)

// Produce block numbers in the main goroutine
Expand Down
36 changes: 18 additions & 18 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ func TestRunUntilCancel(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 2,
SkipFailedBlocks: false,
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 2,
SkipFailedBlocks: false,
},
nil, // progress
dlq.NewDLQ[int64](),
Expand Down Expand Up @@ -262,8 +262,8 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
MaxConcurrentRequests: 20,
MaxConcurrentRequestsDLQ: 2, // fetch blocks in multiple goroutines
MaxConcurrentBlocks: 20,
DLQMaxConcurrentBlocks: 2, // fetch blocks in multiple goroutines
// big enough compared to the time spent in block by number to ensure batching. We panic
// in the mocked Dune client if we don't get a batch of blocks (more than one block).
BlockSubmitInterval: 50 * time.Millisecond,
Expand Down Expand Up @@ -315,10 +315,10 @@ func TestRunRPCNodeFails(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 2,
BlockSubmitInterval: time.Millisecond,
SkipFailedBlocks: false,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 2,
BlockSubmitInterval: time.Millisecond,
SkipFailedBlocks: false,
},
nil, // progress
dlq.NewDLQ[int64](),
Expand All @@ -338,7 +338,7 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
nil,
nil,
ingester.Config{
MaxConcurrentRequests: 0,
MaxConcurrentBlocks: 0,
},
nil, // progress
dlq.NewDLQ[int64](),
Expand All @@ -357,8 +357,8 @@ func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) {
nil,
nil,
ingester.Config{
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 0,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 0,
},
nil, // progress
dlq.NewDLQ[int64](),
Expand Down Expand Up @@ -478,11 +478,11 @@ func TestRunWithDLQ(t *testing.T) {
duneapi,
duneapi,
ingester.Config{
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentRequests: 10,
MaxConcurrentRequestsDLQ: 1,
DLQOnly: false,
SkipFailedBlocks: true,
BlockSubmitInterval: time.Nanosecond,
MaxConcurrentBlocks: 10,
DLQMaxConcurrentBlocks: 1,
DLQOnly: false,
SkipFailedBlocks: true,
},
nil, // progress
dlqBlockNumbers,
Expand Down

0 comments on commit 675ed08

Please sign in to comment.