Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: allow remote deletion of local payment attempt results #9289

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ run:
- neutrinorpc
- peersrpc
- signrpc
- switchrpc
- walletrpc
- watchtowerrpc
- kvdb_etcd
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ func DefaultConfig() Config {
Sweeper: lncfg.DefaultSweeperConfig(),
Htlcswitch: &lncfg.Htlcswitch{
MailboxDeliveryTimeout: htlcswitch.DefaultMailboxDeliveryTimeout,
RemoteTracking: false,
},
GRPC: &GRPCConfig{
ServerPingTime: defaultGrpcServerPingTime,
Expand Down
74 changes: 74 additions & 0 deletions htlcswitch/payment_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,77 @@ func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
return nil
}, func() {})
}

// fetchAttemptResults retrieves all results stored in the network result store,
// returning each result along with its associated attempt ID.
func (store *networkResultStore) fetchAttemptResults() (
map[uint64]*networkResult, error) {

results := make(map[uint64]*networkResult)

err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
networkResults := tx.ReadBucket(networkResultStoreBucketKey)
if networkResults == nil {
return ErrPaymentIDNotFound
}

return networkResults.ForEach(func(k, v []byte) error {
// Convert the key (attemptID) back to uint64.
attemptID := binary.BigEndian.Uint64(k)

// Deserialize the result stored in the value.
r := bytes.NewReader(v)
result, err := deserializeNetworkResult(r)
if err != nil {
return err
}

// Store the result with its associated attempt ID.
results[attemptID] = result

return nil
})
}, func() {})
if err != nil {
return nil, err
}

return results, nil
}

// deleteAttemptResult deletes the result given by the specified attempt ID.
func (store *networkResultStore) deleteAttemptResult(attemptID uint64) error {
// Acquire the mutex for this attempt ID.
store.attemptIDMtx.Lock(attemptID)
defer store.attemptIDMtx.Unlock(attemptID)

log.Debugf("Deleting result for attemptID=%v", attemptID)

return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
networkResults := tx.ReadWriteBucket(
networkResultStoreBucketKey,
)
if networkResults == nil {
return ErrPaymentIDNotFound
}

var attemptIDBytes [8]byte
binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)

// Check if the result exists before attempting deletion.
resultBytes := networkResults.Get(attemptIDBytes[:])
if resultBytes == nil {
return ErrPaymentIDNotFound
}

// Delete the entry for the given attempt ID.
if err := networkResults.Delete(attemptIDBytes[:]); err != nil {
return err
}

log.Infof("Successfully deleted result for attemptID=%v",
attemptID)

return nil
}, func() {})
}
24 changes: 24 additions & 0 deletions htlcswitch/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ type Config struct {
// a mailbox via AddPacket.
MailboxDeliveryTimeout time.Duration

// RemoteTracking determines whether HTLC attempts should be considered
// as tracked remotely. If true, no automated cleanup of the attempt
// results will occur and the switch will treat all attempts as if they
// are managed by a remote controller.
RemoteTracking bool

// MaxFeeExposure is the threshold in milli-satoshis after which we'll
// fail incoming or outgoing payments for a particular channel.
MaxFeeExposure lnwire.MilliSatoshi
Expand Down Expand Up @@ -534,9 +540,27 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
// preiodically to let the switch clean up payment results that we have
// handled.
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
if s.cfg.RemoteTracking {
log.Infof("Switch store automatic cleaning disabled.")
return nil
}

return s.networkResults.cleanStore(keepPids)
}

// FetchAttemptResults retrieves all results from the network result store.
func (s *Switch) FetchAttemptResults() (map[uint64]*networkResult, error) {
return s.networkResults.fetchAttemptResults()
}

// DeleteAttemptResult removes the given payment attempt result from the store
// of local payment attempt results. This allows for synchronization of state
// deletion between the creator of the attempt (router) and HTLC forwarder to
// prevent state from being cleaned up prematurely.
func (s *Switch) DeleteAttemptResult(attemptID uint64) error {
return s.networkResults.deleteAttemptResult(attemptID)
}

// SendHTLC is used by other subsystems which aren't belong to htlc switch
// package in order to send the htlc update. The attemptID used MUST be unique
// for this HTLC, and MUST be used only once, otherwise the switch might reject
Expand Down
4 changes: 4 additions & 0 deletions itest/list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,4 +706,8 @@ var allTestCases = []*lntest.TestCase{
Name: "debuglevel show",
TestFunc: testDebuglevelShow,
},
{
Name: "switch store rpc",
TestFunc: testFetchAttemptResults,
},
}
78 changes: 78 additions & 0 deletions itest/lnd_switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package itest
import (
"github.com/btcsuite/btcd/btcutil"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lnrpc/switchrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/node"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -504,3 +507,78 @@ func (s *scenarioFourNodes) assertAmoutPaid(ht *lntest.HarnessTest,
amt+(baseFee*num)*2, int64(0),
)
}

func testFetchAttemptResults(ht *lntest.HarnessTest) {
// Create a simple two-node context consisting of Alice and Bob.
alice, bob := ht.Alice, ht.Bob

// Connect nodes to ensure propagation of channels.
ht.EnsureConnected(alice, bob)

const chanAmt = btcutil.Amount(1000000)

// Now, open a channel with 100k satoshis between Alice and Bob with
// Alice being the sole funder of the channel. This will provide Alice
// with outbound liquidity she can use to complete payments.
chanPointAliceBob := ht.OpenChannel(
alice, bob, lntest.OpenChannelParams{Amt: chanAmt},
)
defer ht.CloseChannel(alice, chanPointAliceBob)

const (
numPayments = 3
paymentAmt = 10000
)

// Request an invoice from Bob so he is expecting payment.
payReqs, rHashes, _ := ht.CreatePayReqs(bob, paymentAmt, numPayments)

// Send a few payments using Alice's node to populate her Switch's
// local payment attempt result store with entries.
for i := range numPayments {
payClient := alice.RPC.SendPayment(
&routerrpc.SendPaymentRequest{
PaymentRequest: payReqs[i],
FeeLimitMsat: noFeeLimitMsat,
TimeoutSeconds: 60,
},
)

ht.AssertPaymentSucceedWithTimeout(payClient,
wait.DefaultTimeout)
}

// NOTE(calvin): We need to block until the payments have settled so
// that the results are available when queried below.
for i := range rHashes {
trackClient := alice.RPC.TrackPaymentV2(rHashes[i])
ht.AssertPaymentStatusFromStream(
trackClient, lnrpc.Payment_SUCCEEDED,
)
}

// We expect there to be a result for each payment!
req := &switchrpc.FetchAttemptResultsRequest{}
resp := alice.RPC.FetchAttemptResults(req)
require.Lenf(ht.T, resp.AttemptResults, numPayments, "expected %d "+
"results, instead got %d", numPayments,
len(resp.AttemptResults))

// Delete the attempt results from the switch store.
deleteReq := &switchrpc.DeleteAttemptResultRequest{
AttemptId: 1,
}
alice.RPC.DeleteAttemptResult(deleteReq)

// Fetch the results again, and confirm that the correct results have
// been removed.
req = &switchrpc.FetchAttemptResultsRequest{}
resp = alice.RPC.FetchAttemptResults(req)
require.Lenf(ht.T, resp.AttemptResults, numPayments-1, "expected %d "+
"results, instead got %d", numPayments,
len(resp.AttemptResults))
require.Equal(ht.T, uint64(2), resp.AttemptResults[0].AttemptId,
"unexpected attempt ID")
require.Equal(ht.T, uint64(3), resp.AttemptResults[1].AttemptId,
"unexpected attempt ID")
}
6 changes: 6 additions & 0 deletions lncfg/htlcswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ var (
//nolint:lll
type Htlcswitch struct {
MailboxDeliveryTimeout time.Duration `long:"mailboxdeliverytimeout" description:"The timeout value when delivering HTLCs to a channel link. Setting this value too small will result in local payment failures if large number of payments are sent over a short period."`

// RemoteTracking determines whether HTLC attempts should be considered
// as tracked remotely. If true, no automated cleanup of the attempt
// results will occur and the switch will treat all attempts as if they
// are managed by a remote controller.
RemoteTracking bool `long:"remotetracking" description:"If true, all HTLC attempts will be marked as remotely tracked, allowing for external control over when the attempts are marked for deletion."`
}

// Validate checks the values configured for htlcswitch.
Expand Down
34 changes: 34 additions & 0 deletions lnrpc/switchrpc/config_active.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build switchrpc
// +build switchrpc

package switchrpc

import (
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/macaroons"
)

// Config is the primary configuration struct for the switch RPC subserver.
// It contains all the items required for the server to carry out its duties.
// The fields with struct tags are meant to be parsed as normal configuration
// options, while if able to be populated, the latter fields MUST also be
// specified.
//
//nolint:lll
type Config struct {
// Switch is the main HTLC switch instance that backs this RPC server.
Switch *htlcswitch.Switch

// SwitchMacPath is the path for the router macaroon. If unspecified
// then we assume that the macaroon will be found under the network
// directory, named DefaultRouterMacFilename.
SwitchMacPath string `long:"routermacaroonpath" description:"Path to the router macaroon"`

// NetworkDir is the main network directory wherein the router rpc
// server will find the macaroon named DefaultRouterMacFilename.
NetworkDir string

// MacService is the main macaroon service that we'll use to handle
// authentication for the Router rpc server.
MacService *macaroons.Service
}
7 changes: 7 additions & 0 deletions lnrpc/switchrpc/config_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build !switchrpc
// +build !switchrpc

package switchrpc

// Config is empty for non-switchrpc builds.
type Config struct{}
62 changes: 62 additions & 0 deletions lnrpc/switchrpc/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//go:build switchrpc
// +build switchrpc

package switchrpc

import (
"fmt"

"github.com/lightningnetwork/lnd/lnrpc"
)

// createNewSubServer is a helper method that will create the new router sub
// server given the main config dispatcher method. If we're unable to find the
// config that is meant for us in the config dispatcher, then we'll exit with
// an error.
func createNewSubServer(configRegistry lnrpc.SubServerConfigDispatcher) (
*Server, lnrpc.MacaroonPerms, error) {

// We'll attempt to look up the config that we expect, according to our
// subServerName name. If we can't find this, then we'll exit with an
// error, as we're unable to properly initialize ourselves without this
// config.
subServerConf, ok := configRegistry.FetchConfig(subServerName)
if !ok {
return nil, nil, fmt.Errorf("unable to find config for "+
"subserver type %s", subServerName)
}

// Now that we've found an object mapping to our service name, we'll
// ensure that it's the type we need.
config, ok := subServerConf.(*Config)
if !ok {
return nil, nil, fmt.Errorf("wrong type of config for "+
"subserver %s, expected %T got %T", subServerName,
&Config{}, subServerConf)
}

// Before we try to make the new switch service instance, we'll perform
// some sanity checks on the arguments to ensure that they're usable.
if config.Switch == nil {
return nil, nil, fmt.Errorf("config must set Switch for " +
"SwitchRPC")
}

return New(config)
}

func init() {
subServer := &lnrpc.SubServerDriver{
SubServerName: subServerName,
NewGrpcHandler: func() lnrpc.GrpcHandler {
return &ServerShell{}
},
}

// If the build tag is active, then we'll register ourselves as a
// sub-RPC server within the global lnrpc package namespace.
if err := lnrpc.RegisterSubServer(subServer); err != nil {
panic(fmt.Sprintf("failed to register sub server driver '%s': "+
"%v", subServerName, err))
}
}
32 changes: 32 additions & 0 deletions lnrpc/switchrpc/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package switchrpc

import (
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
)

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// Subsystem defines the logging code for this subsystem.
const Subsystem = "SWTCHRPC"

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
Loading
Loading