Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc+htlcswitch: add htlc transformation capabilities to the interceptor #8633

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions htlcswitch/interceptable_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/record"
)

var (
Expand Down Expand Up @@ -105,6 +107,10 @@ const (

// FwdActionFail fails the intercepted packet back to the sender.
FwdActionFail

// FwdActionResumeModified forwards the intercepted packet to the switch
// with modifications.
FwdActionResumeModified
)

// FwdResolution defines the action to be taken on an intercepted packet.
Expand All @@ -119,6 +125,18 @@ type FwdResolution struct {
// FwdActionSettle.
Preimage lntypes.Preimage

// IncomingAmountMsat is the amount that is to be used for validating if
// Action is FwdActionResumeModified.
IncomingAmountMsat fn.Option[lnwire.MilliSatoshi]

// OutgoingAmountMsat is the amount that is to be used for forwarding if
// Action is FwdActionResumeModified.
OutgoingAmountMsat fn.Option[lnwire.MilliSatoshi]

// CustomRecords is the custom records that are to be used for
// forwarding if Action is FwdActionResumeModified.
CustomRecords fn.Option[record.CustomSet]

// FailureMessage is the encrypted failure message that is to be passed
// back to the sender if action is FwdActionFail.
FailureMessage []byte
Expand Down Expand Up @@ -363,6 +381,8 @@ func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) {
})
}

// resolve processes a HTLC given the resolution type specified by the
// intercepting client.
func (s *InterceptableSwitch) resolve(res *FwdResolution) error {
intercepted, err := s.heldHtlcSet.pop(res.Key)
if err != nil {
Expand All @@ -373,6 +393,12 @@ func (s *InterceptableSwitch) resolve(res *FwdResolution) error {
case FwdActionResume:
return intercepted.Resume()

case FwdActionResumeModified:
return intercepted.ResumeModified(
res.IncomingAmountMsat, res.OutgoingAmountMsat,
res.CustomRecords,
)

case FwdActionSettle:
return intercepted.Settle(res.Preimage)

Expand Down Expand Up @@ -615,6 +641,73 @@ func (f *interceptedForward) Resume() error {
return f.htlcSwitch.ForwardPackets(nil, f.packet)
}

// ResumeModified resumes the default behavior with field modifications.
func (f *interceptedForward) ResumeModified(
incomingAmountMsat fn.Option[lnwire.MilliSatoshi],
outgoingAmountMsat fn.Option[lnwire.MilliSatoshi],
customRecords fn.Option[record.CustomSet]) error {

// Set the incoming amount, if it is provided, on the packet.
incomingAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
f.packet.incomingAmount = amount
})

// Modify the wire message contained in the packet.
switch htlc := f.packet.htlc.(type) {
case *lnwire.UpdateAddHTLC:
outgoingAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
htlc.Amount = amount
})

//nolint:lll
err := fn.MapOptionZ(customRecords, func(records record.CustomSet) error {
if len(records) == 0 {
return nil
}

// Type cast and validate custom records.
htlc.CustomRecords = lnwire.CustomRecords(records)
err := htlc.CustomRecords.Validate()
if err != nil {
return fmt.Errorf("failed to validate custom "+
"records: %w", err)
}

return nil
})
if err != nil {
return fmt.Errorf("failed to encode custom records: %w",
err)
}

case *lnwire.UpdateFulfillHTLC:
//nolint:lll
err := fn.MapOptionZ(customRecords, func(records record.CustomSet) error {
if len(records) == 0 {
return nil
}

// Type cast and validate custom records.
htlc.CustomRecords = lnwire.CustomRecords(records)
err := htlc.CustomRecords.Validate()
if err != nil {
return fmt.Errorf("failed to validate custom "+
"records: %w", err)
}

return nil
})
if err != nil {
return fmt.Errorf("failed to encode custom records: %w",
err)
}
}

// Forward to the switch. A link quit channel isn't needed, because we
// are on a different thread now.
return f.htlcSwitch.ForwardPackets(nil, f.packet)
}

// Fail notifies the intention to Fail an existing hold forward with an
// encrypted failure reason.
func (f *interceptedForward) Fail(reason []byte) error {
Expand Down
9 changes: 8 additions & 1 deletion htlcswitch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
Expand Down Expand Up @@ -323,7 +324,7 @@ type InterceptableHtlcForwarder interface {
type ForwardInterceptor func(InterceptedPacket) error

// InterceptedPacket contains the relevant information for the interceptor about
// an htlc.
// an HTLC.
type InterceptedPacket struct {
// IncomingCircuit contains the incoming channel and htlc id of the
// packet.
Expand Down Expand Up @@ -375,6 +376,12 @@ type InterceptedForward interface {
// this htlc which usually means forward it.
Resume() error

// ResumeModified notifies the intention to resume an existing hold
// forward with modified fields.
ResumeModified(incomingAmountMsat,
outgoingAmountMsat fn.Option[lnwire.MilliSatoshi],
customRecords fn.Option[record.CustomSet]) error

// Settle notifies the intention to settle an existing hold
// forward with a given preimage.
Settle(lntypes.Preimage) error
Expand Down
1 change: 0 additions & 1 deletion htlcswitch/payment_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestNetworkResultSerialization(t *testing.T) {
ChanID: chanID,
ID: 2,
PaymentPreimage: preimage,
ExtraData: make([]byte, 0),
}

fail := &lnwire.UpdateFailHTLC{
Expand Down
10 changes: 10 additions & 0 deletions intercepted_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package lnd
import (
"errors"

"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/record"
)

var (
Expand Down Expand Up @@ -51,6 +53,14 @@ func (f *interceptedForward) Resume() error {
return ErrCannotResume
}

// ResumeModified notifies the intention to resume an existing hold forward with
// a modified htlc.
func (f *interceptedForward) ResumeModified(_, _ fn.Option[lnwire.MilliSatoshi],
_ fn.Option[record.CustomSet]) error {

return ErrCannotResume
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe I'm missing something, why are we defaulting to err here? is this some old/outdated impementation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code relates to the on-chain resolution flow. In the on-chain resolution flow, an intercepted forward can not be resumed. Therefore, it can't be resume modified either.

}

// Fail notifies the intention to fail an existing hold forward with an
// encrypted failure reason.
func (f *interceptedForward) Fail(_ []byte) error {
Expand Down
4 changes: 4 additions & 0 deletions itest/list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ var allTestCases = []*lntest.TestCase{
Name: "forward interceptor",
TestFunc: testForwardInterceptorBasic,
},
{
Name: "forward interceptor modified htlc",
ffranr marked this conversation as resolved.
Show resolved Hide resolved
TestFunc: testForwardInterceptorModifiedHtlc,
},
{
Name: "zero conf channel open",
TestFunc: testZeroConfChannelOpen,
Expand Down
135 changes: 135 additions & 0 deletions itest/lnd_forward_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package itest

import (
"context"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -344,6 +345,140 @@ func testForwardInterceptorBasic(ht *lntest.HarnessTest) {
ht.CloseChannel(bob, cpBC)
}

// testForwardInterceptorModifiedHtlc tests that the interceptor can modify the
// amount and custom records of an intercepted HTLC and resume it.
func testForwardInterceptorModifiedHtlc(ht *lntest.HarnessTest) {
// Initialize the test context with 3 connected nodes.
ts := newInterceptorTestScenario(ht)

alice, bob, carol := ts.alice, ts.bob, ts.carol

// Open and wait for channels.
const chanAmt = btcutil.Amount(300000)
p := lntest.OpenChannelParams{Amt: chanAmt}
reqs := []*lntest.OpenChannelRequest{
{Local: alice, Remote: bob, Param: p},
{Local: bob, Remote: carol, Param: p},
}
resp := ht.OpenMultiChannelsAsync(reqs)
cpAB, cpBC := resp[0], resp[1]

// Make sure Alice is aware of channel Bob=>Carol.
ht.AssertTopologyChannelOpen(alice, cpBC)

// Connect an interceptor to Bob's node.
bobInterceptor, cancelBobInterceptor := bob.RPC.HtlcInterceptor()

// Prepare the test cases.
invoiceValueAmtMsat := int64(1000)
req := &lnrpc.Invoice{ValueMsat: invoiceValueAmtMsat}
addResponse := carol.RPC.AddInvoice(req)
invoice := carol.RPC.LookupInvoice(addResponse.RHash)
tc := &interceptorTestCase{
amountMsat: invoiceValueAmtMsat,
invoice: invoice,
payAddr: invoice.PaymentAddr,
}

// We initiate a payment from Alice.
done := make(chan struct{})
go func() {
// Signal that all the payments have been sent.
defer close(done)

ts.sendPaymentAndAssertAction(tc)
}()

// We start the htlc interceptor with a simple implementation that saves
// all intercepted packets. These packets are held to simulate a
// pending payment.
packet := ht.ReceiveHtlcInterceptor(bobInterceptor)

// Resume the intercepted HTLC with a modified amount and custom
// records.
if packet.CustomRecords == nil {
packet.CustomRecords = make(map[uint64][]byte)
}
customRecords := packet.CustomRecords

// Add custom records entry.
crKey := uint64(65537)
crValue := []byte("custom-records-test-value")
customRecords[crKey] = crValue

action := routerrpc.ResolveHoldForwardAction_RESUME_MODIFIED
newOutgoingAmountMsat := packet.OutgoingAmountMsat + 4000

err := bobInterceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
IncomingCircuitKey: packet.IncomingCircuitKey,
OutgoingAmountMsat: newOutgoingAmountMsat,
CustomRecords: customRecords,
Action: action,
})
require.NoError(ht, err, "failed to send request")

// Check that the modified UpdateAddHTLC message fields were reported in
// Carol's log.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we used this trick to get the itests running

should we now consider reading these values the "healthy" way? i.e Carol also intercepts and acquires values from htlc message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ideally Carol should intercept as you suggest. But the log lookup should do for this PR, IMO. I'll see if I can put another PR so that Carol can intercept also. That way this test wont be bound to a log message.

targetLogPrefixStr := "Received UpdateAddHTLC("
targetOutgoingAmountMsatStr := fmt.Sprintf(
"amt=%d", newOutgoingAmountMsat,
)

// Formulate custom records target log string.
var asciiValues []string
for _, b := range crValue {
asciiValues = append(asciiValues, fmt.Sprintf("%d", b))
}

targetCustomRecordsStr := fmt.Sprintf(
"%d:[%s]", crKey, strings.Join(asciiValues, " "),
)

// logEntryCheck is a helper function that checks if the log entry
// contains the expected strings.
logEntryCheck := func(logEntry string) bool {
return strings.Contains(logEntry, targetLogPrefixStr) &&
strings.Contains(logEntry, targetCustomRecordsStr) &&
strings.Contains(logEntry, targetOutgoingAmountMsatStr)
}

// Wait for the log entry to appear in Carol's log.
require.Eventually(ht, func() bool {
ctx := context.Background()
dbgInfo, err := carol.RPC.LN.GetDebugInfo(
ffranr marked this conversation as resolved.
Show resolved Hide resolved
ctx, &lnrpc.GetDebugInfoRequest{},
)
require.NoError(ht, err, "failed to get Carol node debug info")

for _, logEntry := range dbgInfo.Log {
if logEntryCheck(logEntry) {
return true
}
}

return false
}, defaultTimeout, time.Second)

// Cancel the context, which will disconnect Bob's interceptor.
cancelBobInterceptor()

// Make sure all goroutines are finished.
select {
case <-done:
case <-time.After(defaultTimeout):
require.Fail(ht, "timeout waiting for sending payment")
}

// Assert that the payment was successful.
var preimage lntypes.Preimage
copy(preimage[:], invoice.RPreimage)
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_SUCCEEDED)

// Finally, close channels.
ht.CloseChannel(alice, cpAB)
ht.CloseChannel(bob, cpBC)
}

// interceptorTestScenario is a helper struct to hold the test context and
// provide the needed functionality.
type interceptorTestScenario struct {
Expand Down
Loading
Loading