Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check all transactions in monitor check pending with timeout #4867

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 21 additions & 27 deletions pkg/transaction/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type transactionMonitor struct {
}

type transactionWatch struct {
start time.Time
receiptC chan types.Receipt // channel to which the receipt will be written once available
errC chan error // error channel (primarily for cancelled transactions)
}
Expand Down Expand Up @@ -91,6 +92,7 @@ func (tm *transactionMonitor) WatchTransaction(txHash common.Hash, nonce uint64)
}

tm.watchesByNonce[nonce][txHash] = append(tm.watchesByNonce[nonce][txHash], transactionWatch{
start: time.Now(),
receiptC: receiptC,
errC: errC,
})
Expand Down Expand Up @@ -169,44 +171,36 @@ func (tm *transactionMonitor) watchPending() {
}
}

// potentiallyConfirmedTxWatches returns all watches with nonce less than what was specified
func (tm *transactionMonitor) potentiallyConfirmedTxWatches(nonce uint64) (watches map[uint64]map[common.Hash][]transactionWatch) {
func (tm *transactionMonitor) hasWatches() bool {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.watchesByNonce) > 0
}

potentiallyConfirmedTxWatches := make(map[uint64]map[common.Hash][]transactionWatch)
for n, watches := range tm.watchesByNonce {
if n < nonce {
potentiallyConfirmedTxWatches[n] = watches
func watchStart(watches []transactionWatch) time.Time {
if len(watches) == 0 {
return time.Time{}
}
start := watches[0].start
for _, w := range watches[1:] {
if w.start.Before(start) {
start = w.start
}
}

return potentiallyConfirmedTxWatches
}

func (tm *transactionMonitor) hasWatches() bool {
tm.lock.Lock()
defer tm.lock.Unlock()
return len(tm.watchesByNonce) > 0
return start
}

// check pending checks the given block (number) for confirmed or cancelled transactions
func (tm *transactionMonitor) checkPending(block uint64) error {
nonce, err := tm.backend.NonceAt(tm.ctx, tm.sender, new(big.Int).SetUint64(block))
if err != nil {
return err
}

// transactions with a nonce lower or equal to what is found on-chain are either confirmed or (at least temporarily) cancelled
potentiallyConfirmedTxWatches := tm.potentiallyConfirmedTxWatches(nonce)

confirmedNonces := make(map[uint64]*types.Receipt)
var cancelledNonces []uint64
for nonceGroup, watchMap := range potentiallyConfirmedTxWatches {
for txHash := range watchMap {
for nonceGroup, watchMap := range tm.watchesByNonce {
for txHash, watches := range watchMap {
receipt, err := tm.backend.TransactionReceipt(tm.ctx, txHash)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
// wait for a few blocks to be mined before considering a transaction not existing
transactionWatchNotFoundTimeout := 5 * tm.pollingInterval
if errors.Is(err, ethereum.NotFound) && watchStart(watches).Before(time.Now().Add(transactionWatchNotFoundTimeout)) {
// if both err and receipt are nil, there is no receipt
// the reason why we consider this only potentially cancelled is to catch cases where after a reorg the original transaction wins
continue
Expand All @@ -220,7 +214,7 @@ func (tm *transactionMonitor) checkPending(block uint64) error {
}
}

for nonceGroup := range potentiallyConfirmedTxWatches {
for nonceGroup := range tm.watchesByNonce {
if _, ok := confirmedNonces[nonceGroup]; ok {
continue
}
Expand All @@ -240,7 +234,7 @@ func (tm *transactionMonitor) checkPending(block uint64) error {
defer tm.lock.Unlock()

for nonce, receipt := range confirmedNonces {
for txHash, watches := range potentiallyConfirmedTxWatches[nonce] {
for txHash, watches := range tm.watchesByNonce[nonce] {
if receipt.TxHash == txHash {
for _, watch := range watches {
select {
Expand Down
Loading