Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions core/chains/evm/log/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,14 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)

b.logger.Debug("Starting the event loop")
for {
// Replay requests take priority.
select {
case blockNumber := <-b.replayChannel:
b.onReplayRequest(blockNumber)
return true, nil
default:
}

select {
case rawLog := <-chRawLogs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we get something on the replayChannel, but there is already a message waiting on the chRawLogs channel (from before the replay). Would that cause issues once the replay starts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC the replay should only be delayed by one log in the worst case (or one other action here in general, e.g. head), then it would loop again and necessarily select the replay in the priority select.

b.logger.Debugw("Received a log",
Expand All @@ -402,12 +410,8 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)
// The eth node connection was terminated so we need to backfill after resubscribing.
lggr := b.logger
// Do we have logs in the pool?
if min := b.logPool.heap.FindMin(); min != nil {
// They are invalid, since we may have missed 'removed' logs.
b.logPool = newLogPool()
// Note: even if we crash right now, PendingMinBlock is preserved in the database and we will backfill the same.
blockNum := int64(min.(Uint64))
b.backfillBlockNumber.SetValid(blockNum)
// They are invalid, since we may have missed 'removed' logs.
if blockNum := b.invalidatePool(); blockNum > 0 {
lggr = lggr.With("blockNumber", blockNum)
}
lggr.Debugw("Subscription terminated. Backfilling after resubscribing")
Expand All @@ -417,11 +421,7 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)
needsResubscribe = b.onChangeSubscriberStatus() || needsResubscribe

case blockNumber := <-b.replayChannel:
// NOTE: This ignores r.highestNumConfirmations, but it is
// generally assumed that this will only be performed rarely and
// manually by someone who knows what he is doing
b.backfillBlockNumber.SetValid(blockNumber)
b.logger.Debugw("Returning from the event loop to replay logs from specific block number", "blockNumber", blockNumber)
b.onReplayRequest(blockNumber)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have it above, do we still need it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because we only check above, but then sit and wait here. We still need to yield to replay also.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yeah good point

return true, nil

case <-debounceResubscribe.C:
Expand All @@ -444,6 +444,27 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)
}
}

// onReplayRequest clears the pool and sets the block backfill number.
// onReplayRequest clears the pool and sets the block backfill number.
// It is called both from the priority select at the top of the event loop
// and from the main select, so a pending replay request is honored even
// while the loop is blocked waiting for other events.
func (b *broadcaster) onReplayRequest(blockNumber int64) {
	// Drop any pooled logs; the returned pool-min block number is ignored
	// because the backfill point is overridden with blockNumber just below.
	_ = b.invalidatePool()
	// NOTE: This ignores r.highestNumConfirmations, but it is
	// generally assumed that this will only be performed rarely and
	// manually by someone who knows what he is doing
	b.backfillBlockNumber.SetValid(blockNumber)
	b.logger.Debugw("Returning from the event loop to replay logs from specific block number", "blockNumber", blockNumber)
}

// invalidatePool discards all pooled logs (they may be stale, e.g. missed
// 'removed' logs) and marks the pool's lowest block number as the point to
// backfill from. It returns that block number, or -1 when the pool was
// already empty and nothing was invalidated.
// NOTE(review): the -1 sentinel means callers checking `> 0` would treat a
// pool whose minimum is block 0 as empty — presumably unreachable in
// practice; confirm against callers.
func (b *broadcaster) invalidatePool() int64 {
	lowest := b.logPool.heap.FindMin()
	if lowest == nil {
		return -1
	}
	b.logPool = newLogPool()
	// Note: even if we crash right now, PendingMinBlock is preserved in the database and we will backfill the same.
	blockNum := int64(lowest.(Uint64))
	b.backfillBlockNumber.SetValid(blockNum)
	return blockNum
}

func (b *broadcaster) onNewLog(log types.Log) {
b.maybeWarnOnLargeBlockNumberDifference(int64(log.BlockNumber))

Expand Down
5 changes: 2 additions & 3 deletions core/chains/evm/log/eth_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ func (sub *ethSubscriber) backfillLogs(fromBlockOverride null.Int64, addresses [
}
return true
}
if len(batchLogs) > 0 {
sub.logger.Infow(fmt.Sprintf("LogBroadcaster: Fetched a batch of %v logs from %v to %v%s", len(batchLogs), from, to, elapsedMessage), "len", len(batchLogs), "fromBlock", from, "toBlock", to, "remaining", int64(latestHeight)-to)
}

sub.logger.Infow(fmt.Sprintf("LogBroadcaster: Fetched a batch of %v logs from %v to %v%s", len(batchLogs), from, to, elapsedMessage), "len", len(batchLogs), "fromBlock", from, "toBlock", to, "remaining", int64(latestHeight)-to)

select {
case <-sub.chStop:
Expand Down