diff --git a/config/config.yml b/config/config.yml index f52d894..16543c8 100644 --- a/config/config.yml +++ b/config/config.yml @@ -98,58 +98,83 @@ eth: # # Percentiles of average txn gas price mapped to three levels of urgency (`low`, `medium` and `high`). # percentiles: [1, 50, 99] -# Blockchain sync configurations -sync: - # Core space sync configurations - # - # Pub/Sub configurations - sub: - # Channel size to buffer notified epoch response - buffer: 1000 - # # Whether to use `epoch_getEpochReceipts` to batch get receipts - # useBatch: false - # # The epoch number from which to sync core space - # fromEpoch: 0 - # # Maximum number of epochs to batch sync once - # maxEpochs: 10 - # Blacklisted contract address(es) whose event logs will be ignored until some specific - # epoch height, with 0 means always. - blackListAddrs: > - [ - {"address": "cfx:acav5v98np8t3m66uw7x61yer1ja1jm0dpzj1zyzxv", "epoch": 0} - ] - # # Fast cache-up sync configuration - # catchup: - # # Pool of fullnodes for catching up. There will be 1 goroutine per fullnode or - # # the catch up will be disabled if none fullnode provided. - # cfxPool: [http://test.confluxrpc.com] - # # Threshold for number of db rows per batch persistence - # dbRowsThreshold: 2500 - # # Max number of db rows collected before persistence to restrict memory usage - # maxDbRows: 7500 - # # Capacity of channel per worker to buffer queried epoch data - # workerChanSize: 5 +# # Blockchain sync configurations +# sync: +# # Core space sync configurations +# # +# # Pub/Sub configurations +# sub: +# # Channel size to buffer notified epoch response +# buffer: 1000 +# # Whether to use `epoch_getEpochReceipts` to batch get receipts +# useBatch: false +# # The epoch number from which to sync core space +# fromEpoch: 0 +# # Maximum number of epochs to batch sync once +# maxEpochs: 10 +# # Blacklisted contract address(es) whose event logs will be ignored until some specific +# # epoch height, with 0 means always. +# blackListAddrs: > +# [ +# {"address": "cfx:acav5v98np8t3m66uw7x61yer1ja1jm0dpzj1zyzxv", "epoch": 0} +# ] +# # Fast cache-up sync configuration +# catchup: +# # Pool of fullnodes for catching up. There will be 1 goroutine per fullnode or +# # the catch up will be disabled if none fullnode provided. +# cfxPool: [http://test.confluxrpc.com] +# # Threshold for number of db rows per batch persistence +# dbRowsThreshold: 2500 +# # Max number of db rows collected before persistence to restrict memory usage +# maxDbRows: 7500 +# # Capacity of channel per worker to buffer queried epoch data +# workerChanSize: 5 +# # Whether to enable benchmark. +# benchmark: false +# # Boost mode configuration +# boost: +# # Task queue sizes to schedule tasks +# taskQueueSize: 1000 +# # Task result queue sizes to collect results +# resultQueueSize: 1000 +# # Size of buffer for storage write +# writeBufferSize: 3000 +# # Default task size and bounds +# defaultTaskSize: 100 +# minTaskSize: 1 +# maxTaskSize: 1000 +# # Task size adjustment ratios +# incrementRatio: 0.2 +# decrementRatio: 0.5 +# # Maximum sample size for dynamic new task size estimation +# maxSampleSize: 20 +# # Max allowed memory usage (in bytes), no limit if 0 +# memoryThreshold: 0 +# # Memory check interval +# memoryCheckInterval: 20s +# # Force persistence interval +# forcePersistenceInterval: 45s - # # EVM space sync configurations - # eth: - # # The block number from which to sync evm space, better use the evm space hardfork point: - # # for mainnet it is 36935000, for testnet it is 61465000 - # fromBlock: 61465000 - # # Maximum number of blocks to batch sync ETH data once - # maxBlocks: 10 +# # EVM space sync configurations +# eth: +# # The block number from which to sync evm space, better use the evm space hardfork point: +# # for mainnet it is 36935000, for testnet it is 61465000 +# fromBlock: 61465000 +# # Maximum number of blocks to batch sync ETH data once +# maxBlocks: 10 - # # HA leader/follower election. - # election: - # # Enable/disable leader election - # enabled: false - # # The leader identity - # id: "leader" - # # The duration of the leader term - # lease: 1m - # # the amount of time to wait between retries of becoming the leader - # retry: 5s - # # the time interval at which the leader will try to renew its term - # renew: 15s +# # HA leader/follower election. +# election: +# # Enable/disable leader election +# enabled: false +# # The leader identity +# id: "leader" +# # The duration of the leader term +# lease: 1m +# # the amount of time to wait between retries of becoming the leader +# retry: 5s +# # the time interval at which the leader will try to renew its term +# renew: 15s # # Metrics configurations # metrics: diff --git a/store/epoch_data.go b/store/epoch_data.go index 2a1cf84..9af9e48 100644 --- a/store/epoch_data.go +++ b/store/epoch_data.go @@ -267,7 +267,7 @@ func queryEpochData(cfx sdk.ClientOperator, epochNumber uint64, useBatch bool) ( } func validateBlock(block *types.Block, epochNumber uint64, hash types.Hash) error { - if block.GasUsed == nil { // block is not executed yet? + if epochNumber != 0 && block.GasUsed == nil { // block is not executed yet? return errors.WithMessage(errBlockValidationFailed, "gas used is nil") } diff --git a/store/mysql/store_log_addr.go b/store/mysql/store_log_addr.go index f4febaf..d9eebf8 100644 --- a/store/mysql/store_log_addr.go +++ b/store/mysql/store_log_addr.go @@ -8,7 +8,6 @@ import ( "github.com/Conflux-Chain/confura/store" "github.com/Conflux-Chain/confura/util" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "gorm.io/gorm" ) @@ -84,11 +83,7 @@ func (ls *AddressIndexedLogStore) convertToPartitionedLogs( receipt, ok := data.Receipts[tx.Hash] if !ok { - // should never occur, just to ensure code robust - logrus.WithFields(logrus.Fields{ - "epoch": data.Number, - "tx": tx.Hash, - }).Error("Cannot find transaction receipt in epoch data") + // This could happen if there are no event logs for this transaction. continue } diff --git a/sync/catchup/benchmark.go b/sync/catchup/benchmark.go index 4777117..e72ae13 100644 --- a/sync/catchup/benchmark.go +++ b/sync/catchup/benchmark.go @@ -28,6 +28,9 @@ type benchmarker struct { } func newBenchmarker() *benchmarker { + if !gmetrics.Enabled { + logrus.Warn("Geth metrics are not enabled, which will prevent performance metrics from being collected.") + } return &benchmarker{ persistDbRowsMetrics: gmetrics.NewHistogram(gmetrics.NewExpDecaySample(1024, 0.015)), persistEpochsMetrics: gmetrics.NewHistogram(gmetrics.NewExpDecaySample(1024, 0.015)), @@ -55,7 +58,7 @@ func (b *benchmarker) report(start, end uint64) { }() totalDuration := b.endTime.Sub(b.startTime) - totalEpochs := end - start + totalEpochs := end - start + 1 totalPersistDuration := atomic.LoadInt64((*int64)(&b.totalPersistDuration)) totalPersistDbRows := atomic.LoadInt64(&b.totalPersistDbRows) @@ -96,40 +99,45 @@ func (b *benchmarker) report(start, end uint64) { fmt.Printf(" max batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Max()) fmt.Printf(" min batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Min()) fmt.Printf("mean batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Mean()) - fmt.Printf(" p99 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(99)) - fmt.Printf(" p75 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(75)) + fmt.Printf(" p99 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(0.99)) + fmt.Printf(" p75 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(0.75)) + fmt.Printf(" p50 batch db rows: %v\n", b.persistDbRowsMetrics.Snapshot().Percentile(0.50)) fmt.Println("// --------- batch persisted epochs -----------") fmt.Printf(" total epochs: %v\n", b.persistEpochsMetrics.Snapshot().Sum()) fmt.Printf(" max batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Max()) fmt.Printf(" min batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Min()) fmt.Printf("mean batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Mean()) - fmt.Printf(" p99 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(99)) - fmt.Printf(" p75 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(75)) + fmt.Printf(" p99 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(0.99)) + fmt.Printf(" p75 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(0.75)) + fmt.Printf(" p50 batch epochs: %v\n", b.persistEpochsMetrics.Snapshot().Percentile(0.50)) fmt.Println("// ------ batch persisted db durations --------") fmt.Printf("total duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Sum())/1e6) fmt.Printf(" max duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Max())/1e6) fmt.Printf(" min duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Min())/1e6) fmt.Printf(" mean duration: %.2f(ms)\n", b.persistTimer.Snapshot().Mean()/1e6) - fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(99))/1e6) - fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(75))/1e6) + fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(0.99))/1e6) + fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(0.75))/1e6) + fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.persistTimer.Snapshot().Percentile(0.50))/1e6) fmt.Println("// ------ avg persist duration/db row ---------") fmt.Printf("total duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Sum())/1e6) fmt.Printf(" max duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Max())/1e6) fmt.Printf(" min duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Min())/1e6) fmt.Printf(" mean duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Mean())/1e6) - fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(99))/1e6) - fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(75))/1e6) + fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(0.99))/1e6) + fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(0.75))/1e6) + fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerDbRowMetrics.Snapshot().Percentile(0.50))/1e6) fmt.Println("// ------ avg persist duration/epoch ----------") fmt.Printf("total duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Sum())/1e6) fmt.Printf(" max duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Max())/1e6) fmt.Printf(" min duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Min())/1e6) fmt.Printf(" mean duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Mean())/1e6) - fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(99))/1e6) - fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(75))/1e6) + fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(0.99))/1e6) + fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(0.75))/1e6) + fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.avgPersistDurationPerEpochMetrics.Snapshot().Percentile(0.50))/1e6) fmt.Println("// --------- batch persisted db tps -----------") fmt.Printf("mean tps: %v\n", b.persistTimer.Snapshot().RateMean()) @@ -142,8 +150,9 @@ func (b *benchmarker) report(start, end uint64) { fmt.Printf(" max duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Max())/1e6) fmt.Printf(" min duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Min()/1e6)) fmt.Printf(" mean duration: %.2f(ms)\n", b.fetchPerEpochTimer.Snapshot().Mean()/1e6) - fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(99))/1e6) - fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(75))/1e6) + fmt.Printf(" p99 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(0.99))/1e6) + fmt.Printf(" p75 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(0.75))/1e6) + fmt.Printf(" p50 duration: %.2f(ms)\n", float64(b.fetchPerEpochTimer.Snapshot().Percentile(0.50))/1e6) fmt.Println("// ------------- epoch fetch tps --------------") fmt.Printf("mean tps: %v\n", b.fetchPerEpochTimer.Snapshot().RateMean()) diff --git a/sync/catchup/boost.go b/sync/catchup/boost.go new file mode 100644 index 0000000..6d1afc3 --- /dev/null +++ b/sync/catchup/boost.go @@ -0,0 +1,689 @@ +package catchup + +import ( + "container/heap" + "context" + "math" + "runtime" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/Conflux-Chain/confura/store" + "github.com/Conflux-Chain/confura/types" + "github.com/Conflux-Chain/confura/util/metrics" + cfxTypes "github.com/Conflux-Chain/go-conflux-sdk/types" + "github.com/Conflux-Chain/go-conflux-util/health" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + // Min priority queue capacity for shrink + minPqShrinkCapacity = 100 +) + +var ( + // Memory health configuration + memoryHealthCfg = health.CounterConfig{ + Threshold: 3, Remind: 3, + } +) + +// syncTask represents a range of epochs synchronization task +type syncTask struct { + types.RangeUint64 +} + +// newSyncTask creates a new sync task with the given epoch range +func newSyncTask(start, end uint64) syncTask { + return syncTask{RangeUint64: types.RangeUint64{From: start, To: end}} +} + +// syncTaskItem is a heap item +type syncTaskItem struct { + syncTask + index int +} + +// syncTaskPriorityQueue implements heap.Interface and is a min-heap. +type syncTaskPriorityQueue []*syncTaskItem + +func (pq syncTaskPriorityQueue) Len() int { return len(pq) } + +func (pq syncTaskPriorityQueue) Less(i, j int) bool { + return pq[i].From < pq[j].From +} + +func (pq syncTaskPriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *syncTaskPriorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*syncTaskItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *syncTaskPriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + if n == 0 { + return nil + } + + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + + // Check if we need to shrink the underlying array to reduce memory usage + if oldCap := cap(*pq); oldCap > minPqShrinkCapacity && len(*pq) < cap(*pq)/4 { + newCap := 2 * len(*pq) + newPq := make(syncTaskPriorityQueue, len(*pq), newCap) + copy(newPq, *pq) + *pq = newPq + } + return item +} + +// syncTaskResult holds the result of a completed syncTask. +type syncTaskResult struct { + task syncTask + err error + epochData []*store.EpochData +} + +// coordinator orchestrates the synchronization process by: +// - Managing pending tasks and recall tasks +// - Adjusting task sizes dynamically +// - Handling backpressure based on memory usage +// - Collecting and ordering results for final persistence +type coordinator struct { + // Configuration + boostConfig + + // List of workers + workers []*boostWorker + + // Full epoch range for synchronization + fullEpochRange types.RangeUint64 + + // Task queues + pendingTaskQueue chan syncTask + + // Recall task priority queue + recallTaskPq syncTaskPriorityQueue + + // Task result queue + taskResultQueue chan syncTaskResult + + // Synchronization state + nextAssignEpoch uint64 + + // Result pipeline + nextWriteEpoch uint64 + epochDataStore map[uint64]*store.EpochData + epochResultChan chan<- *store.EpochData + + // Backpressure control + backpressureControl *atomic.Value +} + +func newCoordinator(cfg boostConfig, workers []*boostWorker, fullRange types.RangeUint64, resultChan chan<- *store.EpochData) *coordinator { + backpressureControl := new(atomic.Value) + backpressureControl.Store(make(chan struct{})) + return &coordinator{ + boostConfig: cfg, + workers: workers, + fullEpochRange: fullRange, + nextAssignEpoch: fullRange.From, + nextWriteEpoch: fullRange.From, + epochResultChan: resultChan, + epochDataStore: make(map[uint64]*store.EpochData), + pendingTaskQueue: make(chan syncTask, cfg.TaskQueueSize), + taskResultQueue: make(chan syncTaskResult, cfg.ResultQueueSize), + backpressureControl: backpressureControl, + } +} + +// backpressureChan returns the backpressure control channel +func (c *coordinator) backpressureChan() chan struct{} { + return c.backpressureControl.Load().(chan struct{}) +} + +func (c *coordinator) run(ctx context.Context, wg *sync.WaitGroup) { + var innerWg sync.WaitGroup + defer wg.Done() + + // Start boost workers to process assigned tasks + for _, w := range c.workers { + innerWg.Add(1) + go c.boostWorkerLoop(ctx, &innerWg, w) + } + + // Start the result dispatch loop + innerWg.Add(1) + go c.dispatchLoop(ctx, &innerWg) + + // Seeds the pending tasks queue with initial workload for workers. + c.assignTasks(ctx, c.DefaultTaskSize) + + innerWg.Wait() +} + +// boostWorkerLoop continuously processes tasks assigned to the boostWorker. +func (c *coordinator) boostWorkerLoop(ctx context.Context, wg *sync.WaitGroup, w *boostWorker) { + logrus.WithField("worker", w.name).Info("Catch-up boost worker started") + defer logrus.WithField("worker", w.name).Info("Catch-up boost worker stopped") + + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-c.backpressureChan(): + time.Sleep(time.Second) + continue + default: + select { + case <-ctx.Done(): + return + case <-c.backpressureChan(): + time.Sleep(time.Second) + continue + case task := <-c.pendingTaskQueue: + epochData, err := w.queryEpochData(task.From, task.To) + if logrus.IsLevelEnabled(logrus.DebugLevel) { + logrus.WithFields(logrus.Fields{ + "worker": w.name, + "task": task, + "numEpochData": len(epochData), + "numPendingTasks": len(c.pendingTaskQueue), + }).WithError(err).Debug("Catch-up boost worker processed task") + } + c.taskResultQueue <- syncTaskResult{ + task: task, + epochData: epochData, + err: err, + } + } + } + } +} + +// dispatchLoop collects results from workers, adjusts task sizes, and dispatches new tasks. +func (c *coordinator) dispatchLoop(ctx context.Context, wg *sync.WaitGroup) { + logrus.Info("Catch-up boost coordinator dispatch loop started") + defer logrus.Info("Catch-up boost coordinator dispatch loop stopped") + + var resultHistory []syncTaskResult + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-c.backpressureChan(): + time.Sleep(time.Second) + continue + default: + select { + case <-ctx.Done(): + return + case <-c.backpressureChan(): + time.Sleep(time.Second) + continue + case result := <-c.taskResultQueue: + // Collect a batch of results + taskResults := []syncTaskResult{result} + for i := 0; i < len(c.taskResultQueue); i++ { + taskResults = append(taskResults, <-c.taskResultQueue) + } + // Sort the task result + sort.Slice(taskResults, func(i, j int) bool { + r0, r1 := taskResults[i], taskResults[j] + return r0.task.From < r1.task.From + }) + // Process the batch of results + for _, r := range taskResults { + if r.err != nil { + // Recall the task by splitting and re-assigning + heap.Push(&c.recallTaskPq, &syncTaskItem{syncTask: r.task}) + resultHistory = append(resultHistory, r) + continue + } + + // Collect epoch data + if err := c.collectEpochData(ctx, r.epochData); err != nil { + return + } + resultHistory = append(resultHistory, r) + r.epochData = nil // free memory + } + // Sort the task result history + sort.Slice(resultHistory, func(i, j int) bool { + r0, r1 := resultHistory[i], resultHistory[j] + if r0.task.From == r1.task.From { + return r0.task.To > r1.task.To + } + return r0.task.From < r1.task.From + }) + // Retain the recent result history for estimation + if len(resultHistory) > c.MaxSampleSize { + resultHistory = resultHistory[len(resultHistory)-c.MaxSampleSize:] + } + // Estimate next task size and assign new tasks + nextTaskSize := c.estimateTaskSize(resultHistory) + if err := c.assignTasks(ctx, nextTaskSize); err != nil { + return + } + } + } + } +} + +// estimateTaskSize dynamically adjusts task size based on recent history. +func (c *coordinator) estimateTaskSize(results []syncTaskResult) uint64 { + if len(results) == 0 { + return c.DefaultTaskSize + } + + var totalEstSize, totalWeight float64 + for i, r := range results { + weight := math.Pow(2, float64(1+i-len(results))) + taskSize := float64(r.task.To - r.task.From + 1) + var estSize float64 + if r.err == nil { + estSize = taskSize * (1 + c.IncrementRatio) * weight + } else { + estSize = taskSize * (1 - c.DecrementRatio) * weight + } + totalEstSize += estSize + totalWeight += weight + } + + newTaskSize := uint64(math.Ceil(totalEstSize / totalWeight)) + newTaskSize = min(max(c.MinTaskSize, newTaskSize), c.MaxTaskSize) + return newTaskSize +} + +// collectEpochData accumulates epoch data in memory until contiguous and flushes them in order. +func (c *coordinator) collectEpochData(ctx context.Context, result []*store.EpochData) error { + // Cache store epoch data + for _, data := range result { + c.epochDataStore[data.Number] = data + } + + // Flush any stored epochs that are now contiguous + for { + data, ok := c.epochDataStore[c.nextWriteEpoch] + if !ok { + break + } + + if len(c.epochResultChan) >= c.WriteBufferSize { + logrus.WithFields(logrus.Fields{ + "nextWriteEpoch": c.nextWriteEpoch, + "numCacheEpochs": len(c.epochDataStore), + }).Info("Catch-up boost sync write buffer is full") + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.epochResultChan <- data: + delete(c.epochDataStore, data.Number) + c.nextWriteEpoch++ + } + } + return nil +} + +// assignTasks schedules tasks for workers, handling recall tasks first if any. +func (c *coordinator) assignTasks(ctx context.Context, taskSize uint64) error { + for numPendingTasks := len(c.pendingTaskQueue); numPendingTasks < len(c.workers); numPendingTasks++ { + // Handle recall tasks by splitting them into sub-tasks if possible + if len(c.recallTaskPq) > 0 { + recallTask := heap.Pop(&c.recallTaskPq).(*syncTaskItem).syncTask + midEpoch := (recallTask.From + recallTask.To) / 2 + if err := c.addPendingTask(ctx, recallTask.From, midEpoch); err != nil { + return err + } + if midEpoch+1 <= recallTask.To { + numPendingTasks++ + if err := c.addPendingTask(ctx, midEpoch+1, recallTask.To); err != nil { + return err + } + } + continue + } + + // The full epoch range has already been assigned + if c.nextAssignEpoch > c.fullEpochRange.To { + break + } + + end := min(c.nextAssignEpoch+taskSize-1, c.fullEpochRange.To) + if err := c.addPendingTask(ctx, c.nextAssignEpoch, end); err != nil { + return err + } + c.nextAssignEpoch = end + 1 + } + return nil +} + +func (c *coordinator) addPendingTask(ctx context.Context, start, end uint64) error { + task := newSyncTask(start, end) + if len(c.pendingTaskQueue) >= c.TaskQueueSize { + logrus.WithFields(logrus.Fields{ + "toAddTask": task, + "nextAssignEpoch": c.nextAssignEpoch, + }).Info("Catch-up boost pending task queue is full") + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.pendingTaskQueue <- task: + return nil + } +} + +// enableBackpressure toggles backpressure by closing or resetting the control channel. +func (c *coordinator) enableBackpressure(enabled bool) { + if enabled { + close(c.backpressureChan()) + } else { + c.backpressureControl.Store(make(chan struct{})) + } +} + +type boostSyncer struct { + *Syncer + + // List of boost workers + workers []*boostWorker + + // Result channel + resultChan chan *store.EpochData +} + +func newBoostSyncer(s *Syncer) *boostSyncer { + workers := make([]*boostWorker, len(s.workers)) + for i, w := range s.workers { + workers[i] = &boostWorker{w} + } + return &boostSyncer{ + Syncer: s, + workers: workers, + resultChan: make(chan *store.EpochData, s.boostConf.WriteBufferSize), + } +} + +func (s *boostSyncer) doSync(ctx context.Context, bmarker *benchmarker, start, end uint64) { + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + + // Start coordinator + fullEpochRange := types.RangeUint64{From: start, To: end} + coord := newCoordinator(s.boostConf, s.workers, fullEpochRange, s.resultChan) + + wg.Add(1) + go coord.run(ctx, &wg) + + // Start memory monitor + if s.boostConf.MemoryThreshold > 0 { + go s.memoryMonitorLoop(ctx, coord) + } + + // Start persisting results + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + err := s.fetchAndPersistResults(ctx, start, end, bmarker) + if err != nil && !errors.Is(err, context.Canceled) { + if errors.Is(err, store.ErrLeaderRenewal) { + logrus.WithFields(logrus.Fields{ + "start": start, + "end": end, + "leaderIdentity": s.elm.Identity(), + }).Info("Catch-up boost syncer failed to renew leadership on fetching result") + } else { + logrus.WithFields(logrus.Fields{ + "start": start, + "end": end, + }).WithError(err).Error("Catch-up boost syncer failed to fetch result") + } + } + }() + + wg.Wait() +} + +// memoryMonitorLoop checks memory periodically and applies backpressure when memory is high. +func (s *boostSyncer) memoryMonitorLoop(ctx context.Context, c *coordinator) { + ticker := time.NewTicker(s.boostConf.MemoryCheckInterval) + defer ticker.Stop() + + // Counter to track memory health status + var healthStatus health.Counter + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + logger := logrus.WithFields(logrus.Fields{ + "memory": memStats.Alloc, + "threshold": s.boostConf.MemoryThreshold, + }) + + // Backpressure control according to memory usage + if memStats.Alloc < s.boostConf.MemoryThreshold { + // Memory usage is below the threshold, try to lift backpressure. + if recovered, _ := healthStatus.OnSuccess(memoryHealthCfg); recovered { + logger.Warn("Catch-up boost sync memory usage has recovered below threshold") + c.enableBackpressure(false) + } + } else { + // Memory usage exceeds the threshold, check for health degradation. + unhealthy, unrecovered, _ := healthStatus.OnFailure(memoryHealthCfg) + if unhealthy { + logger.Warn("Catch-up boost sync memory usage exceeded threshold") + c.enableBackpressure(true) + } else if unrecovered { + logger.Warn("Catch-up boost sync memory usage remains above threshold") + } + } + } + } +} + +// fetchAndPersistResults retrieves completed epoch data and persists them into the database. +func (s *boostSyncer) fetchAndPersistResults(ctx context.Context, start, end uint64, bmarker *benchmarker) error { + forceInterval := s.boostConf.ForcePersistenceInterval + timer := time.NewTimer(forceInterval) + defer timer.Stop() + + var state persistState + for eno := start; eno <= end; { + forcePersist := false + startTime := time.Now() + + select { + case <-ctx.Done(): + return ctx.Err() + case epochData := <-s.resultChan: + // collect epoch data + if epochData.Number != eno { + return errors.Errorf("unexpected epoch collected, expected %v got %v", eno, epochData.Number) + } + if bmarker != nil { + bmarker.metricFetchPerEpochDuration(startTime) + } + eno++ + s.monitor.Update(eno) + + epochDbRows, storeDbRows := state.update(epochData) + if logrus.IsLevelEnabled(logrus.DebugLevel) { + logrus.WithFields(logrus.Fields{ + "resultBufLen": len(s.resultChan), + "epochNo": epochData.Number, + "epochDbRows": epochDbRows, + "storeDbRows": storeDbRows, + "state.insertDbRows": state.insertDbRows, + "state.totalDbRows": state.totalDbRows, + }).Debug("Catch-up boost syncer collected new epoch data") + } + case <-timer.C: + // Force persist if timer expires + forcePersist = true + } + + // Batch insert into db if `forcePersist` is true or enough db rows collected, also use total db rows here to + // check if we need to persist to restrict memory usage. + if forcePersist || state.totalDbRows >= s.maxDbRows || state.insertDbRows >= s.minBatchDbRows { + if err := s.persist(ctx, &state, bmarker); err != nil { + return err + } + + state.reset() + timer.Reset(forceInterval) + } + } + + // Persist any remaining data + return s.persist(ctx, &state, bmarker) +} + +type boostWorker struct { + *worker +} + +// queryEpochData fetches blocks and logs for a given epoch range to construct a minimal `EpochData` +// using `cfx_getLogs` for best peformance. +func (w *boostWorker) queryEpochData(fromEpoch, toEpoch uint64) (res []*store.EpochData, err error) { + startTime := time.Now() + defer func() { + metrics.Registry.Sync.BoostQueryEpochData("cfx").UpdateSince(startTime) + metrics.Registry.Sync.BoostQueryEpochDataAvailability("cfx").Mark(err == nil) + if err == nil { + metrics.Registry.Sync.BoostQueryEpochRange().Update(int64(toEpoch - fromEpoch + 1)) + } + }() + + // Retrieve event logs within the specified epoch range + logFilter := cfxTypes.LogFilter{ + FromEpoch: cfxTypes.NewEpochNumberUint64(fromEpoch), + ToEpoch: cfxTypes.NewEpochNumberUint64(toEpoch), + } + logs, err := w.cfx.GetLogs(logFilter) + if err != nil { + return nil, errors.WithMessage(err, "failed to get event logs") + } + + var logCursor int + for epochNum := fromEpoch; epochNum <= toEpoch; epochNum++ { + // Initialize epoch data for the current epoch + epochData := &store.EpochData{ + Number: epochNum, + Receipts: make(map[cfxTypes.Hash]*cfxTypes.TransactionReceipt), + } + + var blockHashes []cfxTypes.Hash + blockHashes, err = w.cfx.GetBlocksByEpoch(cfxTypes.NewEpochNumberUint64(epochNum)) + if err != nil { + return nil, errors.WithMessagef(err, "failed to get blocks by epoch %v", epochNum) + } + if len(blockHashes) == 0 { + err = errors.Errorf("invalid epoch data (must have at least one block)") + return nil, err + } + + // Cache to store blocks fetched by their hash to avoid repeated network calls + blockCache := make(map[cfxTypes.Hash]*cfxTypes.Block) + + // Get the first and last block of the epoch + for _, bh := range []cfxTypes.Hash{blockHashes[0], blockHashes[len(blockHashes)-1]} { + if _, ok := blockCache[bh]; ok { + continue + } + + var block *cfxTypes.Block + block, err = w.cfx.GetBlockByHash(bh) + if err != nil { + return nil, errors.WithMessagef(err, "failed to get block by hash %v", bh) + } + if block == nil { + err = errors.Errorf("block %v not found", bh) + return nil, err + } + blockCache[bh] = block + } + + // Process logs that belong to the current epoch + for ; logCursor < len(logs); logCursor++ { + if logs[logCursor].EpochNumber.ToInt().Uint64() != epochNum { + // Move to next epoch data construction if current log doesn't belong here + break + } + + // Retrieve or fetch the block associated with the current log + blockHash := logs[logCursor].BlockHash + if _, ok := blockCache[*blockHash]; !ok { + var block *cfxTypes.Block + block, err = w.cfx.GetBlockByHash(*blockHash) + if err != nil { + return nil, errors.WithMessagef(err, "failed to get block by hash %v", *blockHash) + } + if block == nil { + err = errors.Errorf("block %v not found", *blockHash) + return nil, err + } + blockCache[*blockHash] = block + } + + // Retrieve or initialize the transaction receipt associated with the current log + txnHash := logs[logCursor].TransactionHash + txnReceipt, ok := epochData.Receipts[*txnHash] + if !ok { + txnReceipt = &cfxTypes.TransactionReceipt{ + EpochNumber: (*hexutil.Uint64)(&epochNum), + BlockHash: *blockHash, + TransactionHash: *txnHash, + } + + epochData.Receipts[*txnHash] = txnReceipt + } + + // Append the current log to the transaction receipt's logs + txnReceipt.Logs = append(txnReceipt.Logs, logs[logCursor]) + } + + // Append all necessary blocks for the epoch + for _, bh := range blockHashes { + if block, ok := blockCache[bh]; ok { + epochData.Blocks = append(epochData.Blocks, block) + } + } + + // Append the constructed epoch data to the result list + res = append(res, epochData) + } + + if logCursor != len(logs) { + err = errors.Errorf("failed to process all logs: processed %v, total %v", logCursor, len(logs)) + return nil, err + } + + return res, nil +} diff --git a/sync/catchup/boost_test.go b/sync/catchup/boost_test.go new file mode 100644 index 0000000..8b435bc --- /dev/null +++ b/sync/catchup/boost_test.go @@ -0,0 +1,72 @@ +package catchup + +import ( + "container/heap" + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestSyncTaskPriorityQueue tests the priority queue with shrink strategy and normal operations +func TestSyncTaskPriorityQueue(t *testing.T) { + pq := &syncTaskPriorityQueue{} + + heap.Push(pq, &syncTaskItem{syncTask: newSyncTask(5, 10)}) + heap.Push(pq, &syncTaskItem{syncTask: newSyncTask(3, 7)}) + heap.Push(pq, &syncTaskItem{syncTask: newSyncTask(8, 15)}) + + // Assert the priority queue has the correct length after pushing 3 tasks + assert.Equal(t, 3, pq.Len()) + + // Ensure the items are ordered correctly (min-heap) + // The item with From=3 should be at the front + item := heap.Pop(pq).(*syncTaskItem) + assert.Equal(t, uint64(3), item.From) + + // The item with From=5 should now be at the front + item = heap.Pop(pq).(*syncTaskItem) + assert.Equal(t, uint64(5), item.From) + + // The item with From=8 should now be at the front + item = heap.Pop(pq).(*syncTaskItem) + assert.Equal(t, uint64(8), item.From) + + // Assert that the queue is empty after popping all tasks + assert.Equal(t, 0, pq.Len()) + + // Ensure priority queue shrinks when the size drops below threshold + // We need to fill the queue with more than `minPqShrinkCapacity` elements to trigger the shrink + + // Push `minPqShrinkCapacity+10` items into the priority queue + for i := 0; i < minPqShrinkCapacity+10; i++ { + heap.Push(pq, &syncTaskItem{syncTask: newSyncTask(uint64(i), uint64(i+1))}) + } + + // Assert that the queue's length is `minPqShrinkCapacity+10` + assert.Equal(t, minPqShrinkCapacity+10, pq.Len()) + + // Get the current capacity of the priority queue + oldCap := cap(*pq) + + // Pop enough items so that the queue should shrink + for i := 0; i < minPqShrinkCapacity+1; i++ { + heap.Pop(pq) + } + + // After popping `minPqShrinkCapacity+1` items, the length should be 9 + assert.Equal(t, 9, pq.Len()) + + // Ensure that the underlying array has shrunk. + // The doubling strategy should make the capacity after shrinking equal to about 2*len(*pq) + assert.True(t, oldCap > cap(*pq), "Priority queue should have shrunk") + + // Verify that the elements are still intact after shrink + // Check that the remaining elements in the queue are ordered and correct + for i := minPqShrinkCapacity + 1; i < minPqShrinkCapacity+10; i++ { + item := heap.Pop(pq).(*syncTaskItem) + assert.Equal(t, uint64(i), item.From) + } + + // Assert that the queue is empty after all items are popped + assert.Equal(t, 0, pq.Len()) +} diff --git a/sync/catchup/config.go b/sync/catchup/config.go index 1d5ef64..524eb3c 100644 --- a/sync/catchup/config.go +++ b/sync/catchup/config.go @@ -1,5 +1,7 @@ package catchup +import "time" + type config struct { // list of Conflux fullnodes to accelerate catching up until the latest stable epoch CfxPool []string @@ -9,4 +11,33 @@ type config struct { MaxDbRows int `default:"7500"` // capacity of channel per worker to buffer queried epoch data WorkerChanSize int `default:"5"` + // benchmark mode + Benchmark bool + // boost mode + Boost boostConfig +} + +// boostConfig holds the configuration parameters for boost catch-up mode. +type boostConfig struct { + // task queue sizes to schedule tasks + TaskQueueSize int `default:"1000"` + // task result queue sizes to collect results + ResultQueueSize int `default:"1000"` + // size of buffer for storage write + WriteBufferSize int `default:"3000"` + // default task size and bounds + DefaultTaskSize uint64 `default:"100"` + MinTaskSize uint64 `default:"1"` + MaxTaskSize uint64 `default:"1000"` + // task size adjustment ratios + IncrementRatio float64 `default:"0.2"` + DecrementRatio float64 `default:"0.5"` + // maximum sample size for dynamic new task size estimation + MaxSampleSize int `default:"20"` + // max allowed memory usage (in bytes) to trigger backpressure + MemoryThreshold uint64 + // Memory check interval + MemoryCheckInterval time.Duration `default:"20s"` + // Force persistence interval + ForcePersistenceInterval time.Duration `default:"45s"` } diff --git a/sync/catchup/syncer.go b/sync/catchup/syncer.go index 293845c..15349f8 100644 --- a/sync/catchup/syncer.go +++ b/sync/catchup/syncer.go @@ -37,6 +37,10 @@ type Syncer struct { elm election.LeaderManager // sync monitor monitor *monitor.Monitor + // epoch to start sync + epochFrom uint64 + // configuration for boost mode + boostConf boostConfig } // functional options for syncer @@ -66,10 +70,18 @@ func WithBenchmark(benchmark bool) SyncOption { } } +func WithBoostConfig(config boostConfig) SyncOption { + return func(s *Syncer) { + s.boostConf = config + } +} + func MustNewSyncer( cfxClients []*sdk.Client, db *mysql.MysqlStore, elm election.LeaderManager, + monitor *monitor.Monitor, + epochFrom uint64, opts ...SyncOption) *Syncer { var conf config viperutil.MustUnmarshalKey("sync.catchup", &conf) @@ -86,15 +98,28 @@ func MustNewSyncer( WithMaxDbRows(conf.MaxDbRows), WithMinBatchDbRows(conf.DbRowsThreshold), WithWorkers(workers), + WithBenchmark(conf.Benchmark), + WithBoostConfig(conf.Boost), ) - return newSyncer(cfxClients, db, elm, append(newOpts, opts...)...) + return newSyncer(cfxClients, db, elm, monitor, epochFrom, append(newOpts, opts...)...) } func newSyncer( - cfxClients []*sdk.Client, db *mysql.MysqlStore, - elm election.LeaderManager, opts ...SyncOption) *Syncer { - syncer := &Syncer{elm: elm, db: db, cfxs: cfxClients, minBatchDbRows: 1500} + cfxClients []*sdk.Client, + db *mysql.MysqlStore, + elm election.LeaderManager, + monitor *monitor.Monitor, + epochFrom uint64, + opts ...SyncOption) *Syncer { + syncer := &Syncer{ + elm: elm, + db: db, + cfxs: cfxClients, + monitor: monitor, + epochFrom: epochFrom, + minBatchDbRows: 1500, + } for _, opt := range opts { opt(syncer) } @@ -147,6 +172,24 @@ func (s *Syncer) syncOnce(ctx context.Context, start, end uint64) { }() } + // Boost sync performance if all chain data types are disabled except event logs by using `getLogs` to synchronize + // blockchain data across wide epoch range, or using `epoch-by-epoch` sync mode if any of them are enabled. + if disabler := store.StoreConfig(); !disabler.IsChainLogDisabled() && + disabler.IsChainBlockDisabled() && disabler.IsChainTxnDisabled() && disabler.IsChainReceiptDisabled() { + logrus.WithFields(logrus.Fields{ + "start": start, "end": end, + }).Info("Catch-up syncer using boosted sync mode with getLogs optimization") + newBoostSyncer(s).doSync(ctx, bmarker, start, end) + return + } + + logrus.WithFields(logrus.Fields{ + "start": start, "end": end, + }).Info("Catch-up syncer using standard epoch-by-epoch sync mode") + s.doSync(ctx, bmarker, start, end) +} + +func (s *Syncer) doSync(ctx context.Context, bmarker *benchmarker, start, end uint64) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(ctx) @@ -292,7 +335,7 @@ func (s *Syncer) nextSyncRange() (uint64, uint64, error) { if ok { start++ } else { - start = 0 + start = s.epochFrom } var retErr error diff --git a/sync/election/leader_manager.go b/sync/election/leader_manager.go index b5fa261..acfe7da 100644 --- a/sync/election/leader_manager.go +++ b/sync/election/leader_manager.go @@ -363,9 +363,17 @@ type noopLeaderManager struct { func (l *noopLeaderManager) Identity() string { return "noop" } func (l *noopLeaderManager) Extend(ctx context.Context) error { return nil } -func (l *noopLeaderManager) Await(ctx context.Context) bool { return true } func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ } +func (l *noopLeaderManager) Await(ctx context.Context) bool { + select { + case <-ctx.Done(): + return false + default: + return true + } +} + func (l *noopLeaderManager) Stop() error { if v := l.cancel.Load(); v != nil { if cancelFn, ok := v.(context.CancelFunc); ok { diff --git a/sync/sync_db.go b/sync/sync_db.go index 3d92b7e..2f87583 100644 --- a/sync/sync_db.go +++ b/sync/sync_db.go @@ -163,7 +163,9 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) { // fast catch-up until the latest stable epoch // (maximum between the latest finalized and checkpoint epoch) func (syncer *DatabaseSyncer) fastCatchup(ctx context.Context) { - catchUpSyncer := catchup.MustNewSyncer(syncer.cfxs, syncer.db, syncer.elm) + catchUpSyncer := catchup.MustNewSyncer( + syncer.cfxs, syncer.db, syncer.elm, syncer.monitor, syncer.epochFrom, + ) defer catchUpSyncer.Close() catchUpSyncer.Sync(ctx) diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index c1f7c20..d7dcabe 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -153,10 +153,22 @@ func (*SyncMetrics) QueryEpochData(space string) metrics.Timer { return metricUtil.GetOrRegisterTimer("infura/sync/%v/fullnode", space) } +func (*SyncMetrics) BoostQueryEpochData(space string) metrics.Timer { + return metricUtil.GetOrRegisterTimer("infura/sync/boost/%v/fullnode", space) +} + +func (*SyncMetrics) BoostQueryEpochRange() metrics.Histogram { + return metricUtil.GetOrRegisterHistogram("infura/sync/boost/epoch/range") +} + func (*SyncMetrics) QueryEpochDataAvailability(space string) metricUtil.Percentage { return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/sync/%v/fullnode/availability", space) } +func (*SyncMetrics) BoostQueryEpochDataAvailability(space string) metricUtil.Percentage { + return metricUtil.GetOrRegisterTimeWindowPercentageDefault(0, "infura/sync/boost/%v/fullnode/availability", space) +} + // Store metrics type StoreMetrics struct{}