Skip to content

Commit

Permalink
Merge pull request #2762 from halseth/reliable-payments-lookup-circui…
Browse files Browse the repository at this point in the history
…tmap

[reliable payments] persist htlcswitch pending payments
  • Loading branch information
halseth authored Jun 8, 2019
2 parents 52b7603 + dd88015 commit e45d4d7
Show file tree
Hide file tree
Showing 11 changed files with 1,014 additions and 110 deletions.
8 changes: 4 additions & 4 deletions htlcswitch/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ func TestChannelLinkMultiHopUnknownPaymentHash(t *testing.T) {
}

resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(
pid, newMockDeobfuscator(),
pid, htlc.PaymentHash, newMockDeobfuscator(),
)
if err != nil {
t.Fatalf("unable to get payment result: %v", err)
Expand Down Expand Up @@ -3898,7 +3898,7 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) {
}

resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(
pid, newMockDeobfuscator(),
pid, htlc.PaymentHash, newMockDeobfuscator(),
)
if err != nil {
t.Fatalf("unable to get payment result: %v", err)
Expand All @@ -3909,8 +3909,8 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) {
err = n.aliceServer.htlcSwitch.SendHTLC(
n.firstBobChannelLink.ShortChanID(), pid, htlc,
)
if err != ErrPaymentIDAlreadyExists {
t.Fatalf("ErrPaymentIDAlreadyExists should have been "+
if err != ErrDuplicateAdd {
t.Fatalf("ErrDuplicateAdd should have been "+
"received got: %v", err)
}

Expand Down
62 changes: 62 additions & 0 deletions htlcswitch/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
}
}

priv, _ := btcec.NewPrivateKey(btcec.S256())
pubkey := priv.PubKey()
cfg := Config{
SelfKey: pubkey,
DB: db,
SwitchPackager: channeldb.NewSwitchPackager(),
FwdingLog: &mockForwardingLog{
Expand Down Expand Up @@ -390,7 +393,11 @@ func (o *mockDeobfuscator) DecryptError(reason lnwire.OpaqueReason) (*Forwarding
return nil, err
}

priv, _ := btcec.NewPrivateKey(btcec.S256())
pubkey := priv.PubKey()

return &ForwardingError{
ErrorSource: pubkey,
FailureMessage: failure,
}, nil
}
Expand Down Expand Up @@ -909,3 +916,58 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
Spend: make(chan *chainntnfs.SpendDetail),
}, nil
}

type mockCircuitMap struct {
lookup chan *PaymentCircuit
}

var _ CircuitMap = (*mockCircuitMap)(nil)

func (m *mockCircuitMap) OpenCircuits(...Keystone) error {
return nil
}

func (m *mockCircuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
start uint64) error {
return nil
}

func (m *mockCircuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
return nil
}

func (m *mockCircuitMap) CommitCircuits(
circuit ...*PaymentCircuit) (*CircuitFwdActions, error) {

return nil, nil
}

func (m *mockCircuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit,
error) {
return nil, nil
}

func (m *mockCircuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit,
error) {
return nil, nil
}

func (m *mockCircuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
return <-m.lookup
}

func (m *mockCircuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit {
return nil
}

func (m *mockCircuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
return nil
}

func (m *mockCircuitMap) NumPending() int {
return 0
}

func (m *mockCircuitMap) NumOpen() int {
return 0
}
212 changes: 212 additions & 0 deletions htlcswitch/payment_result.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package htlcswitch

import (
"bytes"
"encoding/binary"
"errors"
"io"
"sync"

"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
)

var (

// networkResultStoreBucketKey is used for the root level bucket that
// stores the network result for each payment ID.
networkResultStoreBucketKey = []byte("network-result-store-bucket")

// ErrPaymentIDNotFound is an error returned if the given paymentID is
// not found.
ErrPaymentIDNotFound = errors.New("paymentID not found")
Expand Down Expand Up @@ -46,3 +58,203 @@ type networkResult struct {
// which the failure reason might not be included.
isResolution bool
}

// serializeNetworkResult serializes the networkResult.
func serializeNetworkResult(w io.Writer, n *networkResult) error {
if _, err := lnwire.WriteMessage(w, n.msg, 0); err != nil {
return err
}

return channeldb.WriteElements(w, n.unencrypted, n.isResolution)
}

// deserializeNetworkResult deserializes the networkResult.
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
var (
err error
)

n := &networkResult{}

n.msg, err = lnwire.ReadMessage(r, 0)
if err != nil {
return nil, err
}

if err := channeldb.ReadElements(r,
&n.unencrypted, &n.isResolution,
); err != nil {
return nil, err
}

return n, nil
}

// networkResultStore is a persistent store that stores any results of HTLCs in
// flight on the network. Since payment results are inherently asynchronous, it
// is used as a common access point for senders of HTLCs, to know when a result
// is back. The Switch will checkpoint any received result to the store, and
// the store will keep results and notify the callers about them.
type networkResultStore struct {
db *channeldb.DB

// results is a map from paymentIDs to channels where subscribers to
// payment results will be notified.
results map[uint64][]chan *networkResult
resultsMtx sync.Mutex

// paymentIDMtx is a multimutex used to make sure the database and
// result subscribers map is consistent for each payment ID in case of
// concurrent callers.
paymentIDMtx *multimutex.Mutex
}

func newNetworkResultStore(db *channeldb.DB) *networkResultStore {
return &networkResultStore{
db: db,
results: make(map[uint64][]chan *networkResult),
paymentIDMtx: multimutex.NewMutex(),
}
}

// storeResult stores the networkResult for the given paymentID, and
// notifies any subscribers.
func (store *networkResultStore) storeResult(paymentID uint64,
result *networkResult) error {

// We get a mutex for this payment ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.paymentIDMtx.Lock(paymentID)
defer store.paymentIDMtx.Unlock(paymentID)

// Serialize the payment result.
var b bytes.Buffer
if err := serializeNetworkResult(&b, result); err != nil {
return err
}

var paymentIDBytes [8]byte
binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID)

err := store.db.Batch(func(tx *bbolt.Tx) error {
networkResults, err := tx.CreateBucketIfNotExists(
networkResultStoreBucketKey,
)
if err != nil {
return err
}

return networkResults.Put(paymentIDBytes[:], b.Bytes())
})
if err != nil {
return err
}

// Now that the result is stored in the database, we can notify any
// active subscribers.
store.resultsMtx.Lock()
for _, res := range store.results[paymentID] {
res <- result
}
delete(store.results, paymentID)
store.resultsMtx.Unlock()

return nil
}

// subscribeResult is used to get the payment result for the given
// payment ID. It returns a channel on which the result will be delivered when
// ready.
func (store *networkResultStore) subscribeResult(paymentID uint64) (
<-chan *networkResult, error) {

// We get a mutex for this payment ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.paymentIDMtx.Lock(paymentID)
defer store.paymentIDMtx.Unlock(paymentID)

var (
result *networkResult
resultChan = make(chan *networkResult, 1)
)

err := store.db.View(func(tx *bbolt.Tx) error {
var err error
result, err = fetchResult(tx, paymentID)
switch {

// Result not yet available, we will notify once a result is
// available.
case err == ErrPaymentIDNotFound:
return nil

case err != nil:
return err

// The result was found, and will be returned immediately.
default:
return nil
}
})
if err != nil {
return nil, err
}

// If the result was found, we can send it on the result channel
// imemdiately.
if result != nil {
resultChan <- result
return resultChan, nil
}

// Otherwise we store the result channel for when the result is
// available.
store.resultsMtx.Lock()
store.results[paymentID] = append(
store.results[paymentID], resultChan,
)
store.resultsMtx.Unlock()

return resultChan, nil
}

// getResult attempts to immediately fetch the result for the given pid from
// the store. If no result is available, ErrPaymentIDNotFound is returned.
func (store *networkResultStore) getResult(pid uint64) (
*networkResult, error) {

var result *networkResult
err := store.db.View(func(tx *bbolt.Tx) error {
var err error
result, err = fetchResult(tx, pid)
return err
})
if err != nil {
return nil, err
}

return result, nil
}

func fetchResult(tx *bbolt.Tx, pid uint64) (*networkResult, error) {
var paymentIDBytes [8]byte
binary.BigEndian.PutUint64(paymentIDBytes[:], pid)

networkResults := tx.Bucket(networkResultStoreBucketKey)
if networkResults == nil {
return nil, ErrPaymentIDNotFound
}

// Check whether a result is already available.
resultBytes := networkResults.Get(paymentIDBytes[:])
if resultBytes == nil {
return nil, ErrPaymentIDNotFound
}

// Decode the result we found.
r := bytes.NewReader(resultBytes)

return deserializeNetworkResult(r)
}
Loading

0 comments on commit e45d4d7

Please sign in to comment.