Skip to content

Commit

Permalink
Implement boosted catch-up sync with cfx_getLogs (#256)
Browse files Browse the repository at this point in the history
- Optimize catch-up using `cfx_getLogs` when only event logs are required.
- Add coordinator for task orchestration and dynamic sizing.
- Use memory-based backpressure to manage resource usage.
- Periodically persist epoch data to reduce memory consumption.
  • Loading branch information
wanliqun authored Jan 14, 2025
1 parent 9deb666 commit 45f82e7
Show file tree
Hide file tree
Showing 11 changed files with 963 additions and 77 deletions.
125 changes: 75 additions & 50 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,58 +98,83 @@ eth:
# # Percentiles of average txn gas price mapped to three levels of urgency (`low`, `medium` and `high`).
# percentiles: [1, 50, 99]

# Blockchain sync configurations
sync:
# Core space sync configurations
#
# Pub/Sub configurations
sub:
# Channel size to buffer notified epoch response
buffer: 1000
# # Whether to use `epoch_getEpochReceipts` to batch get receipts
# useBatch: false
# # The epoch number from which to sync core space
# fromEpoch: 0
# # Maximum number of epochs to batch sync once
# maxEpochs: 10
# Blacklisted contract address(es) whose event logs will be ignored until some specific
# epoch height, with 0 meaning always.
blackListAddrs: >
[
{"address": "cfx:acav5v98np8t3m66uw7x61yer1ja1jm0dpzj1zyzxv", "epoch": 0}
]
# # Fast catch-up sync configuration
# catchup:
# # Pool of fullnodes for catching up. There will be 1 goroutine per fullnode, or
# # catch-up will be disabled if no fullnode is provided.
# cfxPool: [http://test.confluxrpc.com]
# # Threshold for number of db rows per batch persistence
# dbRowsThreshold: 2500
# # Max number of db rows collected before persistence to restrict memory usage
# maxDbRows: 7500
# # Capacity of channel per worker to buffer queried epoch data
# workerChanSize: 5
# # Blockchain sync configurations
# sync:
# # Core space sync configurations
# #
# # Pub/Sub configurations
# sub:
# # Channel size to buffer notified epoch response
# buffer: 1000
# # Whether to use `epoch_getEpochReceipts` to batch get receipts
# useBatch: false
# # The epoch number from which to sync core space
# fromEpoch: 0
# # Maximum number of epochs to batch sync once
# maxEpochs: 10
# # Blacklisted contract address(es) whose event logs will be ignored until some specific
# # epoch height, with 0 meaning always.
# blackListAddrs: >
# [
# {"address": "cfx:acav5v98np8t3m66uw7x61yer1ja1jm0dpzj1zyzxv", "epoch": 0}
# ]
# # Fast catch-up sync configuration
# catchup:
# # Pool of fullnodes for catching up. There will be 1 goroutine per fullnode, or
# # catch-up will be disabled if no fullnode is provided.
# cfxPool: [http://test.confluxrpc.com]
# # Threshold for number of db rows per batch persistence
# dbRowsThreshold: 2500
# # Max number of db rows collected before persistence to restrict memory usage
# maxDbRows: 7500
# # Capacity of channel per worker to buffer queried epoch data
# workerChanSize: 5
# # Whether to enable benchmark.
# benchmark: false
# # Boost mode configuration
# boost:
# # Task queue size used to schedule tasks
# taskQueueSize: 1000
# # Task result queue size used to collect results
# resultQueueSize: 1000
# # Size of buffer for storage write
# writeBufferSize: 3000
# # Default task size and bounds
# defaultTaskSize: 100
# minTaskSize: 1
# maxTaskSize: 1000
# # Task size adjustment ratios
# incrementRatio: 0.2
# decrementRatio: 0.5
# # Maximum sample size for dynamic new task size estimation
# maxSampleSize: 20
# # Max allowed memory usage (in bytes), no limit if 0
# memoryThreshold: 0
# # Memory check interval
# memoryCheckInterval: 20s
# # Force persistence interval
# forcePersistenceInterval: 45s

# # EVM space sync configurations
# eth:
# # The block number from which to sync evm space, better use the evm space hardfork point:
# # for mainnet it is 36935000, for testnet it is 61465000
# fromBlock: 61465000
# # Maximum number of blocks to batch sync ETH data once
# maxBlocks: 10
# # EVM space sync configurations
# eth:
# # The block number from which to sync evm space, better use the evm space hardfork point:
# # for mainnet it is 36935000, for testnet it is 61465000
# fromBlock: 61465000
# # Maximum number of blocks to batch sync ETH data once
# maxBlocks: 10

# # HA leader/follower election.
# election:
# # Enable/disable leader election
# enabled: false
# # The leader identity
# id: "leader"
# # The duration of the leader term
# lease: 1m
# # the amount of time to wait between retries of becoming the leader
# retry: 5s
# # the time interval at which the leader will try to renew its term
# renew: 15s
# # HA leader/follower election.
# election:
# # Enable/disable leader election
# enabled: false
# # The leader identity
# id: "leader"
# # The duration of the leader term
# lease: 1m
# # the amount of time to wait between retries of becoming the leader
# retry: 5s
# # the time interval at which the leader will try to renew its term
# renew: 15s

# # Metrics configurations
# metrics:
Expand Down
2 changes: 1 addition & 1 deletion store/epoch_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func queryEpochData(cfx sdk.ClientOperator, epochNumber uint64, useBatch bool) (
}

func validateBlock(block *types.Block, epochNumber uint64, hash types.Hash) error {
if block.GasUsed == nil { // block is not executed yet?
if epochNumber != 0 && block.GasUsed == nil { // block is not executed yet?
return errors.WithMessage(errBlockValidationFailed, "gas used is nil")
}

Expand Down
7 changes: 1 addition & 6 deletions store/mysql/store_log_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/confura/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -84,11 +83,7 @@ func (ls *AddressIndexedLogStore) convertToPartitionedLogs(

receipt, ok := data.Receipts[tx.Hash]
if !ok {
// should never occur, just to ensure code robust
logrus.WithFields(logrus.Fields{
"epoch": data.Number,
"tx": tx.Hash,
}).Error("Cannot find transaction receipt in epoch data")
// This could happen if there are no event logs for this transaction.
continue
}

Expand Down
35 changes: 22 additions & 13 deletions sync/catchup/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type benchmarker struct {
}

func newBenchmarker() *benchmarker {
if !gmetrics.Enabled {
logrus.Warn("Geth metrics are not enabled, which will prevent performance metrics from being collected.")
}
return &benchmarker{
persistDbRowsMetrics: gmetrics.NewHistogram(gmetrics.NewExpDecaySample(1024, 0.015)),
persistEpochsMetrics: gmetrics.NewHistogram(gmetrics.NewExpDecaySample(1024, 0.015)),
Expand Down Expand Up @@ -55,7 +58,7 @@ func (b *benchmarker) report(start, end uint64) {
}()

totalDuration := b.endTime.Sub(b.startTime)
totalEpochs := end - start
totalEpochs := end - start + 1

totalPersistDuration := atomic.LoadInt64((*int64)(&b.totalPersistDuration))
totalPersistDbRows := atomic.LoadInt64(&b.totalPersistDbRows)
Expand Down Expand Up @@ -96,40 +99,45 @@ func (b *benchmarker) report(start, end uint64) {
fmt.Printf(" max batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Max())
fmt.Printf(" min batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Min())
fmt.Printf("mean batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Mean())
fmt.Printf(" p99 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(99))
fmt.Printf(" p75 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(75))
fmt.Printf(" p99 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(0.99))
fmt.Printf(" p75 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(0.75))
fmt.Printf(" p50 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(0.50))

fmt.Println("// --------- batch persisted epochs -----------")
fmt.Printf(" total epochs: %v\n", b.persistEpochsMetrics.Snapshot().Sum())
fmt.Printf(" max batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Max())
fmt.Printf(" min batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Min())
fmt.Printf("mean batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Mean())
fmt.Printf(" p99 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(99))
fmt.Printf(" p75 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(75))
fmt.Printf(" p99 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(0.99))
fmt.Printf(" p75 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(0.75))
fmt.Printf(" p50 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(0.50))

fmt.Println("// ------ batch persisted db durations --------")
fmt.Printf("total duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Sum())/1e6)
fmt.Printf(" max duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Max())/1e6)
fmt.Printf(" min duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Min())/1e6)
fmt.Printf(" mean duration: %.2f(ms)\n", b.persistTimer.Snapshot().Mean()/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(75))/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(0.99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(0.75))/1e6)
fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(0.50))/1e6)

fmt.Println("// ------ avg persist duration/db row ---------")
fmt.Printf("total duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Sum())/1e6)
fmt.Printf(" max duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Max())/1e6)
fmt.Printf(" min duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Min())/1e6)
fmt.Printf(" mean duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Mean())/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(75))/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(0.99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(0.75))/1e6)
fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(0.50))/1e6)

fmt.Println("// ------ avg persist duration/epoch ----------")
fmt.Printf("total duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Sum())/1e6)
fmt.Printf(" max duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Max())/1e6)
fmt.Printf(" min duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Min())/1e6)
fmt.Printf(" mean duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Mean())/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(75))/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(0.99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(0.75))/1e6)
fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(0.50))/1e6)

fmt.Println("// --------- batch persisted db tps -----------")
fmt.Printf("mean tps: %v\n", b.persistTimer.Snapshot().RateMean())
Expand All @@ -142,8 +150,9 @@ func (b *benchmarker) report(start, end uint64) {
fmt.Printf(" max duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Max())/1e6)
fmt.Printf(" min duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Min()/1e6))
fmt.Printf(" mean duration: %.2f(ms)\n", b.fetchPerEpochTimer.Snapshot().Mean()/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(75))/1e6)
fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(0.99))/1e6)
fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(0.75))/1e6)
fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(0.50))/1e6)

fmt.Println("// ------------- epoch fetch tps --------------")
fmt.Printf("mean tps: %v\n", b.fetchPerEpochTimer.Snapshot().RateMean())
Expand Down
Loading

0 comments on commit 45f82e7

Please sign in to comment.