Skip to content

Commit

Permalink
routing: wait for result collectors.
Browse files Browse the repository at this point in the history
Before failing a payment we make sure all results for payment
attempts are collected and update the payment in the db accordingly.
This also signals the correct terminal condition to the trackpayment
subscribers.
  • Loading branch information
ziggie1984 committed Nov 3, 2024
1 parent 00af7bc commit d5f5a97
Showing 1 changed file with 39 additions and 4 deletions.
43 changes: 39 additions & 4 deletions routing/payment_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -54,6 +55,14 @@ type paymentLifecycle struct {
// returned from the htlcswitch.
switchResults lnutils.SyncMap[*channeldb.HTLCAttempt,
*htlcswitch.PaymentResult]

// activeCollectors tracks the number of active result collectors.
// So that we can resolve all acitve collectors and correctly fail
// a payment so that the payment only exits when all result collectors
// received their results.
//
// NOTE: To be used atomically.
activeCollectors int32
}

// newPaymentLifecycle initiates a new payment lifecycle and returns it.
Expand Down Expand Up @@ -398,6 +407,26 @@ func (p *paymentLifecycle) requestRoute(

// stop signals any active shard goroutine to exit.
func (p *paymentLifecycle) stop() {
// Process any remaining results that might have come in while we were
// shutting down.
for atomic.LoadInt32(&p.activeCollectors) > 0 {
select {
case <-p.resultCollected:
// Process any lingering results before exiting the
// payment lifecycle.
err := p.processSwitchResults()
if err != nil {
log.Errorf("Error processing final results "+
"for payment %v during shutdown: %v",
p.identifier, err)
}

case <-p.router.quit:
log.Infof("ChanRouter shutting down while collecting "+
"lingering result for payment=%v",
p.identifier)
}
}
close(p.quit)
}

Expand All @@ -422,7 +451,14 @@ func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
log.Debugf("Collecting result for attempt %v in payment %v",
attempt.AttemptID, p.identifier)

// Increment the active collectors counter
atomic.AddInt32(&p.activeCollectors, 1)

go func() {
// Make sure we decrease the counter if this result collector
// exits.
defer atomic.AddInt32(&p.activeCollectors, -1)

result, err := p.collectResult(attempt)
if err != nil {
log.Errorf("Error collecting result for attempt %v in "+
Expand All @@ -443,11 +479,10 @@ func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
// Send the signal or quit.
case p.resultCollected <- struct{}{}:

case <-p.quit:
log.Debugf("Lifecycle exiting while collecting "+
"result for payment %v", p.identifier)

case <-p.router.quit:
log.Debugf("ChanRouter shutting down while collecting "+
"result for payment=%v attemptID=%v",
p.identifier, attempt.AttemptID)
}
}()
}
Expand Down

0 comments on commit d5f5a97

Please sign in to comment.