diff --git a/core/chains/evm/log/broadcaster.go b/core/chains/evm/log/broadcaster.go index 160f711de3e..44d3d75442a 100644 --- a/core/chains/evm/log/broadcaster.go +++ b/core/chains/evm/log/broadcaster.go @@ -389,6 +389,14 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error) b.logger.Debug("Starting the event loop") for { + // Replay requests take priority. + select { + case blockNumber := <-b.replayChannel: + b.onReplayRequest(blockNumber) + return true, nil + default: + } + select { case rawLog := <-chRawLogs: b.logger.Debugw("Received a log", @@ -402,12 +410,8 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error) // The eth node connection was terminated so we need to backfill after resubscribing. lggr := b.logger // Do we have logs in the pool? - if min := b.logPool.heap.FindMin(); min != nil { - // They are are invalid, since we may have missed 'removed' logs. - b.logPool = newLogPool() - // Note: even if we crash right now, PendingMinBlock is preserved in the database and we will backfill the same. - blockNum := int64(min.(Uint64)) - b.backfillBlockNumber.SetValid(blockNum) + // They are are invalid, since we may have missed 'removed' logs. + if blockNum := b.invalidatePool(); blockNum > 0 { lggr = lggr.With("blockNumber", blockNum) } lggr.Debugw("Subscription terminated. Backfilling after resubscribing") @@ -417,11 +421,7 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error) needsResubscribe = b.onChangeSubscriberStatus() || needsResubscribe case blockNumber := <-b.replayChannel: - // NOTE: This ignores r.highestNumConfirmations, but it is - // generally assumed that this will only be performed rarely and - // manually by someone who knows what he is doing - b.backfillBlockNumber.SetValid(blockNumber) - b.logger.Debugw("Returning from the event loop to replay logs from specific block number", "blockNumber", blockNumber) + b.onReplayRequest(blockNumber) return true, nil case <-debounceResubscribe.C: @@ -444,6 +444,27 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error) } } +// onReplayRequest clears the pool and sets the block backfill number. +func (b *broadcaster) onReplayRequest(blockNumber int64) { + _ = b.invalidatePool() + // NOTE: This ignores r.highestNumConfirmations, but it is + // generally assumed that this will only be performed rarely and + // manually by someone who knows what he is doing + b.backfillBlockNumber.SetValid(blockNumber) + b.logger.Debugw("Returning from the event loop to replay logs from specific block number", "blockNumber", blockNumber) +} + +func (b *broadcaster) invalidatePool() int64 { + if min := b.logPool.heap.FindMin(); min != nil { + b.logPool = newLogPool() + // Note: even if we crash right now, PendingMinBlock is preserved in the database and we will backfill the same. + blockNum := int64(min.(Uint64)) + b.backfillBlockNumber.SetValid(blockNum) + return blockNum + } + return -1 +} + func (b *broadcaster) onNewLog(log types.Log) { b.maybeWarnOnLargeBlockNumberDifference(int64(log.BlockNumber)) diff --git a/core/chains/evm/log/eth_subscriber.go b/core/chains/evm/log/eth_subscriber.go index 443ed990718..fd813742550 100644 --- a/core/chains/evm/log/eth_subscriber.go +++ b/core/chains/evm/log/eth_subscriber.go @@ -128,9 +128,8 @@ func (sub *ethSubscriber) backfillLogs(fromBlockOverride null.Int64, addresses [ } return true } - if len(batchLogs) > 0 { - sub.logger.Infow(fmt.Sprintf("LogBroadcaster: Fetched a batch of %v logs from %v to %v%s", len(batchLogs), from, to, elapsedMessage), "len", len(batchLogs), "fromBlock", from, "toBlock", to, "remaining", int64(latestHeight)-to) - } + + sub.logger.Infow(fmt.Sprintf("LogBroadcaster: Fetched a batch of %v logs from %v to %v%s", len(batchLogs), from, to, elapsedMessage), "len", len(batchLogs), "fromBlock", from, "toBlock", to, "remaining", int64(latestHeight)-to) select { case <-sub.chStop: