From 49e417eee62dbb4851c73906fdd32df750e93b03 Mon Sep 17 00:00:00 2001 From: eugene Date: Fri, 15 Jan 2021 15:16:29 -0500 Subject: [PATCH 01/12] htlcswitch+peer: pass in BestHeight to ChannelLinkConfig --- htlcswitch/link.go | 5 ++++- htlcswitch/link_test.go | 2 ++ htlcswitch/test_utils.go | 1 + peer/brontide.go | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c133c46e8d..d8534fbd38 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -134,6 +134,9 @@ type ChannelLinkConfig struct { // TODO(conner): remove after refactoring htlcswitch testing framework. Switch *Switch + // BestHeight returns the best known height. + BestHeight func() uint32 + // ForwardPackets attempts to forward the batch of htlcs through the // switch. The function returns and error in case it fails to send one or // more packets. The link's quit signal should be provided to allow @@ -2674,7 +2677,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, continue } - heightNow := l.cfg.Switch.BestHeight() + heightNow := l.cfg.BestHeight() pld, err := chanIterator.HopPayload() if err != nil { diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 6a0603d7fd..cf62a77a4f 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1951,6 +1951,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( FwrdingPolicy: globalPolicy, Peer: alicePeer, Switch: aliceSwitch, + BestHeight: aliceSwitch.BestHeight, Circuits: aliceSwitch.CircuitModifier(), ForwardPackets: aliceSwitch.ForwardPackets, DecodeHopIterators: decoder.DecodeHopIterators, @@ -4454,6 +4455,7 @@ func (h *persistentLinkHarness) restartLink( FwrdingPolicy: globalPolicy, Peer: alicePeer, Switch: aliceSwitch, + BestHeight: aliceSwitch.BestHeight, Circuits: aliceSwitch.CircuitModifier(), ForwardPackets: aliceSwitch.ForwardPackets, DecodeHopIterators: decoder.DecodeHopIterators, diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index cfdadd969e..d33daff8f4 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1119,6 +1119,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, link := NewChannelLink( ChannelLinkConfig{ Switch: server.htlcSwitch, + BestHeight: server.htlcSwitch.BestHeight, FwrdingPolicy: h.globalPolicy, Peer: peer, Circuits: server.htlcSwitch.CircuitModifier(), diff --git a/peer/brontide.go b/peer/brontide.go index 8d52cdbe39..b54eac4be7 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -813,7 +813,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate, HodlMask: p.cfg.Hodl.Mask(), Registry: p.cfg.Invoices, - Switch: p.cfg.Switch, + BestHeight: p.cfg.Switch.BestHeight, Circuits: p.cfg.Switch.CircuitModifier(), ForwardPackets: p.cfg.InterceptSwitch.ForwardPackets, FwrdingPolicy: *forwardingPolicy, From e5bf2343347f3bae59ac8d31f14b7e8f4747c2d1 Mon Sep 17 00:00:00 2001 From: eugene Date: Fri, 15 Jan 2021 15:50:47 -0500 Subject: [PATCH 02/12] peer: introduce and use MessageLink interface in place of ChannelLink --- peer/brontide.go | 7 +++---- peer/interfaces.go | 10 ++++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index b54eac4be7..eca593e4a3 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1139,10 +1139,9 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { } // waitUntilLinkActive waits until the target link is active and returns a -// ChannelLink to pass messages to. It accomplishes this by subscribing to +// MessageLink to pass messages to. It accomplishes this by subscribing to // an ActiveLinkEvent which is emitted by the link when it first starts up. -func waitUntilLinkActive(p *Brontide, - cid lnwire.ChannelID) htlcswitch.ChannelLink { +func waitUntilLinkActive(p *Brontide, cid lnwire.ChannelID) MessageLink { // Subscribe to receive channel events. // @@ -1211,7 +1210,7 @@ func waitUntilLinkActive(p *Brontide, // lookups. func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream { - var chanLink htlcswitch.ChannelLink + var chanLink MessageLink apply := func(msg lnwire.Message) { // This check is fine because if the link no longer exists, it will diff --git a/peer/interfaces.go b/peer/interfaces.go index 23c1194c7f..2390da61e0 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -54,3 +54,13 @@ type MessageConn interface { // ReadNextBody reads the next body. ReadNextBody([]byte) ([]byte, error) } + +// MessageLink is an interface that contains some functionality from a +// htlcswitch.ChannelLink. +type MessageLink interface { + // ChanID returns the ChannelID of the MessageLink. + ChanID() lnwire.ChannelID + + // HandleChannelUpdate passes lnwire.Message to the MessageLink. + HandleChannelUpdate(lnwire.Message) +} From d79a55c26ef3ec116750f3b2babfd247c1610ee9 Mon Sep 17 00:00:00 2001 From: eugene Date: Fri, 15 Jan 2021 15:51:51 -0500 Subject: [PATCH 03/12] lnd+peer: use MessageSwitch in place of *htlcswitch.Switch in peer --- peer/brontide.go | 69 ++++++++++++++++++++++++++++++++++++---------- peer/interfaces.go | 24 ++++++++++++++++ peer/test_utils.go | 2 +- server.go | 2 +- 4 files changed, 80 insertions(+), 17 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index eca593e4a3..2f35aebdb2 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -121,6 +121,56 @@ type TimestampedError struct { Timestamp time.Time } +// ChannelSwitch is an implementation of the MessageSwitch interface that wraps +// htlcswitch.Switch. +type ChannelSwitch struct { + *htlcswitch.Switch +} + +// NewChannelSwitch initializes a ChannelSwitch given a raw *htlcswitch.Switch +// pointer. +func NewChannelSwitch(innerSwitch *htlcswitch.Switch) *ChannelSwitch { + return &ChannelSwitch{innerSwitch} +} + +// BestHeight returns innerSwitch's BestHeight. +func (s *ChannelSwitch) BestHeight() uint32 { + return s.Switch.BestHeight() +} + +// CircuitModifier returns the innerSwitch's CircuitModifier. +func (s *ChannelSwitch) CircuitModifier() htlcswitch.CircuitModifier { + return s.Switch.CircuitModifier() +} + +// GetLink retrieves a MessageLink given a ChannelID from the inner Switch. +func (s *ChannelSwitch) GetLink(cid lnwire.ChannelID) (MessageLink, error) { + return s.Switch.GetLink(cid) +} + +// InitLink initializes a ChannelLink in the Switch. +func (s *ChannelSwitch) InitLink(linkCfg htlcswitch.ChannelLinkConfig, + lnChan *lnwallet.LightningChannel) error { + + link := htlcswitch.NewChannelLink(linkCfg, lnChan) + + // Before adding our new link, purge the Switch of any pending or live + // links going by the same channel id. If one is found, we'll shut it + // down to ensure that the mailboxes are only ever under the control of + // one link. + s.RemoveLink(link.ChanID()) + + // With the ChannelLink created, we'll now notify the Switch so this + // channel can be used to dispatch local payments and also passively + // forward payments. + return s.AddLink(link) +} + +// RemoveLink removes a ChannelLink from the Switch. +func (s *ChannelSwitch) RemoveLink(cid lnwire.ChannelID) { + s.Switch.RemoveLink(cid) +} + // Config defines configuration fields that are necessary for a peer object // to function. type Config struct { @@ -175,9 +225,9 @@ type Config struct { // ReadPool is the task pool that manages reuse of read buffers. ReadPool *pool.Read - // Switch is a pointer to the htlcswitch. It is used to setup, get, and - // tear-down ChannelLinks. - Switch *htlcswitch.Switch + // Switch is an implementation of MessageSwitch. It is used to setup, + // get, and tear-down MessageLinks. + Switch MessageSwitch // InterceptSwitch is a pointer to the InterceptableSwitch, a wrapper around // the regular Switch. We only export it here to pass ForwardPackets to the @@ -841,18 +891,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, HtlcNotifier: p.cfg.HtlcNotifier, } - link := htlcswitch.NewChannelLink(linkCfg, lnChan) - - // Before adding our new link, purge the switch of any pending or live - // links going by the same channel id. If one is found, we'll shut it - // down to ensure that the mailboxes are only ever under the control of - // one link. - p.cfg.Switch.RemoveLink(link.ChanID()) - - // With the channel link created, we'll now notify the htlc switch so - // this channel can be used to dispatch local payments and also - // passively forward payments. - return p.cfg.Switch.AddLink(link) + return p.cfg.Switch.InitLink(linkCfg, lnChan) } // maybeSendNodeAnn sends our node announcement to the remote peer if at least diff --git a/peer/interfaces.go b/peer/interfaces.go index 2390da61e0..9057e4fb58 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -4,6 +4,8 @@ import ( "net" "time" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -64,3 +66,25 @@ type MessageLink interface { // HandleChannelUpdate passes lnwire.Message to the MessageLink. HandleChannelUpdate(lnwire.Message) } + +// MessageSwitch is an interface that manages setup, retrieval, and shutdown of +// MessageLink implementations. +type MessageSwitch interface { + // BestHeight returns the best height known to the MessageSwitch. + BestHeight() uint32 + + // CircuitModifier returns a reference to a CircuitModifier. + CircuitModifier() htlcswitch.CircuitModifier + + // GetLink retrieves a MessageLink given a ChannelID. + GetLink(lnwire.ChannelID) (MessageLink, error) + + // InitLink creates a link given a ChannelLinkConfig and + // LightningChannel. + InitLink(htlcswitch.ChannelLinkConfig, + *lnwallet.LightningChannel) error + + // RemoveLink removes a MessageLink from the MessageSwitch given a + // ChannelID. + RemoveLink(lnwire.ChannelID) +} diff --git a/peer/test_utils.go b/peer/test_utils.go index e2534c1cd8..edda5c4ab8 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -371,7 +371,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, PubKeyBytes: pubKey, ErrorBuffer: errBuffer, ChainIO: chainIO, - Switch: htlcSwitch, + Switch: NewChannelSwitch(htlcSwitch), ChanActiveTimeout: chanActiveTimeout, InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch), diff --git a/server.go b/server.go index bc2d75c006..394380d5f8 100644 --- a/server.go +++ b/server.go @@ -3237,7 +3237,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, ErrorBuffer: errBuffer, WritePool: s.writePool, ReadPool: s.readPool, - Switch: s.htlcSwitch, + Switch: peer.NewChannelSwitch(s.htlcSwitch), InterceptSwitch: s.interceptableSwitch, ChannelDB: s.remoteChanDB, ChannelGraph: s.localChanDB.ChannelGraph(), From 6ca89469378624d347e42484cf00412a946fb66b Mon Sep 17 00:00:00 2001 From: eugene Date: Tue, 19 Jan 2021 11:31:54 -0500 Subject: [PATCH 04/12] peer: mockMessageLink, mockMessageSwitch in test_utils.go --- peer/test_utils.go | 115 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 88 insertions(+), 27 deletions(-) diff --git a/peer/test_utils.go b/peer/test_utils.go index edda5c4ab8..60dfa8b46b 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -29,7 +29,6 @@ import ( "github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/shachain" - "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" ) @@ -307,29 +306,6 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, }, } - _, currentHeight, err := chainIO.GetBestBlock() - if err != nil { - return nil, nil, nil, err - } - - htlcSwitch, err := htlcswitch.New(htlcswitch.Config{ - DB: dbAlice, - SwitchPackager: channeldb.NewSwitchPackager(), - Notifier: notifier, - FwdEventTicker: ticker.New( - htlcswitch.DefaultFwdEventInterval), - LogEventTicker: ticker.New( - htlcswitch.DefaultLogInterval), - AckEventTicker: ticker.New( - htlcswitch.DefaultAckInterval), - }, uint32(currentHeight)) - if err != nil { - return nil, nil, nil, err - } - if err = htlcSwitch.Start(); err != nil { - return nil, nil, nil, err - } - nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner) const chanActiveTimeout = time.Minute @@ -342,7 +318,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, Graph: dbAlice.ChannelGraph(), MessageSigner: nodeSignerAlice, OurPubKey: aliceKeyPub, - IsChannelActive: htlcSwitch.HasActiveLink, + IsChannelActive: nil, ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil }, }) if err != nil { @@ -371,10 +347,10 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, PubKeyBytes: pubKey, ErrorBuffer: errBuffer, ChainIO: chainIO, - Switch: NewChannelSwitch(htlcSwitch), + Switch: newMockMessageSwitch(), ChanActiveTimeout: chanActiveTimeout, - InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch), + InterceptSwitch: htlcswitch.NewInterceptableSwitch(nil), ChannelDB: dbAlice, FeeEstimator: estimator, @@ -395,6 +371,91 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, return alicePeer, channelBob, cleanUpFunc, nil } +type mockMessageLink struct { + cid lnwire.ChannelID + cfg htlcswitch.ChannelLinkConfig + channel *lnwallet.LightningChannel +} + +// newMockMessageLink makes a mockMessageLink from a ChannelID. +func newMockMessageLink(linkCfg htlcswitch.ChannelLinkConfig, + lnChan *lnwallet.LightningChannel, + cid lnwire.ChannelID) *mockMessageLink { + + messageLink := &mockMessageLink{ + cid: cid, + cfg: linkCfg, + channel: lnChan, + } + + return messageLink +} + +// ChanID returns the mockMessageLink's ChannelID. +func (l *mockMessageLink) ChanID() lnwire.ChannelID { + return l.cid +} + +// HandleChannelUpdate currently does nothing. +func (l *mockMessageLink) HandleChannelUpdate(msg lnwire.Message) {} + +// start begins the message-processing part of the mockMessageLink. +func (l *mockMessageLink) start() { + l.cfg.NotifyActiveLink(l.channel.State().FundingOutpoint) +} + +type mockMessageSwitch struct { + linkIndex map[lnwire.ChannelID]*mockMessageLink +} + +// newMockMessageSwitch creates a new *mockMessageSwitch. +func newMockMessageSwitch() *mockMessageSwitch { + messageSwitch := &mockMessageSwitch{ + linkIndex: make(map[lnwire.ChannelID]*mockMessageLink), + } + + return messageSwitch +} + +// BestHeight returns 0 since it is unused in testing. +func (s *mockMessageSwitch) BestHeight() uint32 { + return 0 +} + +// CircuitModifier returns nil since it is unused in testing. +func (s *mockMessageSwitch) CircuitModifier() htlcswitch.CircuitModifier { + return nil +} + +// GetLink retrieves a *mockMessageLink from linkIndex given a ChannelID. +func (s *mockMessageSwitch) GetLink(cid lnwire.ChannelID) (MessageLink, + error) { + + messageLink := s.linkIndex[cid] + return messageLink, nil +} + +// InitLink starts a *mockMessageLink and adds it to linkIndex. +func (s *mockMessageSwitch) InitLink(linkCfg htlcswitch.ChannelLinkConfig, + lnChan *lnwallet.LightningChannel) error { + + cid := lnwire.NewChanIDFromOutPoint(&lnChan.State().FundingOutpoint) + messageLink := newMockMessageLink(linkCfg, lnChan, cid) + messageLink.start() + s.linkIndex[cid] = messageLink + return nil +} + +// RemoveLink stops and removes a *mockMessageLink given its ChannelID. +func (s *mockMessageSwitch) RemoveLink(cid lnwire.ChannelID) { + messageLink := s.linkIndex[cid] + if messageLink == nil { + return + } + + delete(s.linkIndex, cid) +} + type mockMessageConn struct { t *testing.T From af6a358289eab80c38cdacdc0ce3bfd4f38decb6 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 20 Jan 2021 15:31:55 -0500 Subject: [PATCH 05/12] peer: add ChannelGraph, StatusManager interfaces and mocks This removes the need to create a netann.ChanStatusManager in tests. --- peer/brontide.go | 8 +++---- peer/interfaces.go | 21 +++++++++++++++++++ peer/test_utils.go | 52 ++++++++++++++++++++++++++-------------------- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 2f35aebdb2..348bc19472 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -237,9 +237,9 @@ type Config struct { // ChannelDB is used to fetch opened channels, and closed channels. ChannelDB *channeldb.DB - // ChannelGraph is a pointer to the channel graph which is used to - // query information about the set of known active channels. - ChannelGraph *channeldb.ChannelGraph + // ChannelGraph is an implementation of the ChannelGraph interface and + // is used to query information about the set of known active channels. + ChannelGraph ChannelGraph // ChainArb is used to subscribe to channel events, update contract signals, // and force close channels. @@ -251,7 +251,7 @@ type Config struct { // ChanStatusMgr is used to set or un-set the disabled bit in channel // updates. - ChanStatusMgr *netann.ChanStatusManager + ChanStatusMgr StatusManager // ChainIO is used to retrieve the best block. ChainIO lnwallet.BlockChainIO diff --git a/peer/interfaces.go b/peer/interfaces.go index 9057e4fb58..f007a97dd7 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -4,6 +4,8 @@ import ( "net" "time" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -88,3 +90,22 @@ type MessageSwitch interface { // ChannelID. RemoveLink(lnwire.ChannelID) } + +// ChannelGraph is an interface that abstracts the network graph. +type ChannelGraph interface { + // FetchChannelEdgesByOutpoint queries for channel information given an + // outpoint. + FetchChannelEdgesByOutpoint(*wire.OutPoint) ( + *channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy, + *channeldb.ChannelEdgePolicy, error) +} + +// StatusManager is an interface that abstracts the subsystem that deals with +// enabling and disabling of a channel via ChannelUpdate's disabled bit. +type StatusManager interface { + // RequestEnable attempts to enable a channel. + RequestEnable(wire.OutPoint, bool) error + + // RequestDisable attempts to disable a channel. + RequestDisable(wire.OutPoint, bool) error +} diff --git a/peer/test_utils.go b/peer/test_utils.go index 60dfa8b46b..a31a33eedf 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -26,7 +26,6 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/shachain" "github.com/stretchr/testify/require" @@ -60,7 +59,6 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes( btcec.S256(), channels.AlicesPrivKey, ) - aliceKeySigner := &keychain.PrivKeyDigestSigner{PrivKey: aliceKeyPriv} bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes( btcec.S256(), channels.BobsPrivKey, ) @@ -306,28 +304,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, }, } - nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner) - const chanActiveTimeout = time.Minute - chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{ - ChanStatusSampleInterval: 30 * time.Second, - ChanEnableTimeout: chanActiveTimeout, - ChanDisableTimeout: 2 * time.Minute, - DB: dbAlice, - Graph: dbAlice.ChannelGraph(), - MessageSigner: nodeSignerAlice, - OurPubKey: aliceKeyPub, - IsChannelActive: nil, - ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil }, - }) - if err != nil { - return nil, nil, nil, err - } - if err = chanStatusMgr.Start(); err != nil { - return nil, nil, nil, err - } - errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize) if err != nil { return nil, nil, nil, err @@ -356,7 +334,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, FeeEstimator: estimator, Wallet: wallet, ChainNotifier: notifier, - ChanStatusMgr: chanStatusMgr, + ChanStatusMgr: newMockStatusMgr(), DisconnectPeer: func(b *btcec.PublicKey) error { return nil }, } @@ -456,6 +434,34 @@ func (s *mockMessageSwitch) RemoveLink(cid lnwire.ChannelID) { delete(s.linkIndex, cid) } +type mockChannelGraph struct{} + +// newMockChannelGraph returns an instance of *mockChannelGraph. +func newMockChannelGraph() *mockChannelGraph { return &mockChannelGraph{} } + +// FetchChannelEdgesByOutpoint currently returns nil for all values. +func (g *mockChannelGraph) FetchChannelEdgesByOutpoint(_ *wire.OutPoint) ( + *channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy, + *channeldb.ChannelEdgePolicy, error) { + + return nil, nil, nil, nil +} + +type mockStatusMgr struct{} + +// newMockStatusMgr returns an instance of *mockStatusMgr. +func newMockStatusMgr() *mockStatusMgr { return &mockStatusMgr{} } + +// RequestEnable returns nil. +func (s *mockStatusMgr) RequestEnable(_ wire.OutPoint, _ bool) error { + return nil +} + +// RequestDisable returns nil. +func (s *mockStatusMgr) RequestDisable(_ wire.OutPoint, _ bool) error { + return nil +} + type mockMessageConn struct { t *testing.T From 5274bac224ce09a00cc050da69d3e43f274580c1 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 20 Jan 2021 15:50:11 -0500 Subject: [PATCH 06/12] peer: add ChainArbitrator interface and mocks --- peer/brontide.go | 6 +++--- peer/interfaces.go | 18 ++++++++++++++++++ peer/test_utils.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 348bc19472..94d90d3c51 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -241,9 +241,9 @@ type Config struct { // is used to query information about the set of known active channels. ChannelGraph ChannelGraph - // ChainArb is used to subscribe to channel events, update contract signals, - // and force close channels. - ChainArb *contractcourt.ChainArbitrator + // ChainArb is used to subscribe to channel events, update contract + // signals, and force close channels. + ChainArb ChainArbitrator // AuthGossiper is needed so that the Brontide impl can register with the // gossiper and process remote channel announcements. diff --git a/peer/interfaces.go b/peer/interfaces.go index f007a97dd7..d48749c7b6 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -6,6 +6,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -109,3 +110,20 @@ type StatusManager interface { // RequestDisable attempts to disable a channel. RequestDisable(wire.OutPoint, bool) error } + +// ChainArbitrator is an interface that abstracts the subsystem that manages +// on-chain handling related to our channels. +type ChainArbitrator interface { + // SubscribeChannelEvents subscribes to the set of on-chain events for + // a channel. + SubscribeChannelEvents(wire.OutPoint) ( + *contractcourt.ChainEventSubscription, error) + + // UpdateContractSignals updates the contract signals that updates to + // the channel will be sent over. + UpdateContractSignals(wire.OutPoint, + *contractcourt.ContractSignals) error + + // ForceCloseContract attempts to force close the channel. + ForceCloseContract(wire.OutPoint) (*wire.MsgTx, error) +} diff --git a/peer/test_utils.go b/peer/test_utils.go index a31a33eedf..b016b322af 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -4,6 +4,7 @@ import ( "bytes" crand "crypto/rand" "encoding/binary" + "fmt" "io" "io/ioutil" "math/rand" @@ -18,6 +19,7 @@ import ( "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" @@ -462,6 +464,32 @@ func (s *mockStatusMgr) RequestDisable(_ wire.OutPoint, _ bool) error { return nil } +type mockChainArb struct{} + +// newMockChainArb returns an instance of *mockChainArb. +func newMockChainArb() *mockChainArb { return &mockChainArb{} } + +// SubscribeChannelEvents returns nil values. +func (c *mockChainArb) SubscribeChannelEvents(_ wire.OutPoint) ( + *contractcourt.ChainEventSubscription, error) { + + return nil, nil +} + +// UpdateContractSignals returns nil. +func (c *mockChainArb) UpdateContractSignals(_ wire.OutPoint, + _ *contractcourt.ContractSignals) error { + + return nil +} + +// ForceCloseContract currently returns an error. +func (c *mockChainArb) ForceCloseContract(_ wire.OutPoint) (*wire.MsgTx, + error) { + + return nil, fmt.Errorf("could not force close channel") +} + type mockMessageConn struct { t *testing.T From 1a831e18ffdc5b07d28883edb049b6fc3a246404 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 20 Jan 2021 16:11:51 -0500 Subject: [PATCH 07/12] peer: add Sphinx interface and mocks --- peer/brontide.go | 7 +++---- peer/interfaces.go | 14 ++++++++++++++ peer/test_utils.go | 21 +++++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 94d90d3c51..72b7bc2e62 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -27,7 +27,6 @@ import ( "github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hodl" - "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnpeer" @@ -278,9 +277,9 @@ type Config struct { // the Brontide. RoutingPolicy htlcswitch.ForwardingPolicy - // Sphinx is used when setting up ChannelLinks so they can decode sphinx - // onion blobs. - Sphinx *hop.OnionProcessor + // Sphinx is used when setting up ChannelLinks so they can decode + // sphinx onion blobs. + Sphinx Sphinx // WitnessBeacon is used when setting up ChannelLinks so they can add any // preimages that they learn. diff --git a/peer/interfaces.go b/peer/interfaces.go index d48749c7b6..59a79f1d42 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -4,10 +4,12 @@ import ( "net" "time" + "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -127,3 +129,15 @@ type ChainArbitrator interface { // ForceCloseContract attempts to force close the channel. ForceCloseContract(wire.OutPoint) (*wire.MsgTx, error) } + +// Sphinx is an interface that abstracts the decryption of onion blobs. +type Sphinx interface { + // DecodeHopIterators batch decodes HTLC onion blobs. + DecodeHopIterators([]byte, []hop.DecodeHopIteratorRequest) ( + []hop.DecodeHopIteratorResponse, error) + + // ExtractErrorEncrypter creates an ErrorEncrypter instance using a + // derived shared secret. + ExtractErrorEncrypter(*btcec.PublicKey) (hop.ErrorEncrypter, + lnwire.FailCode) +} diff --git a/peer/test_utils.go b/peer/test_utils.go index b016b322af..c01cb89bae 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -21,6 +21,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lntest/channels" @@ -490,6 +491,26 @@ func (c *mockChainArb) ForceCloseContract(_ wire.OutPoint) (*wire.MsgTx, return nil, fmt.Errorf("could not force close channel") } +type mockSphinx struct{} + +// newMockSphinx returns an instance of *mockSphinx. +func newMockSphinx() *mockSphinx { return &mockSphinx{} } + +// DecodeHopIterators returns nil values. +func (s *mockSphinx) DecodeHopIterators(_ []byte, + _ []hop.DecodeHopIteratorRequest) ([]hop.DecodeHopIteratorResponse, + error) { + + return nil, nil +} + +// ExtractErrorEncrypter returns nil values. +func (s *mockSphinx) ExtractErrorEncrypter(_ *btcec.PublicKey) ( + hop.ErrorEncrypter, lnwire.FailCode) { + + return nil, 0 +} + type mockMessageConn struct { t *testing.T From a60c6003d6a0b6ddec2b6839209119fa1e7e473a Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 20 Jan 2021 16:41:51 -0500 Subject: [PATCH 08/12] peer: add Gossiper interface and mocks --- peer/brontide.go | 7 +++---- peer/interfaces.go | 13 +++++++++++++ peer/test_utils.go | 16 ++++++++++++++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 72b7bc2e62..dc04df0222 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -22,7 +22,6 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" - "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/feature" "github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/htlcswitch" @@ -244,9 +243,9 @@ type Config struct { // signals, and force close channels. ChainArb ChainArbitrator - // AuthGossiper is needed so that the Brontide impl can register with the - // gossiper and process remote channel announcements. - AuthGossiper *discovery.AuthenticatedGossiper + // AuthGossiper is needed so that the Brontide impl can register with + // the gossiper and process remote channel announcements. + AuthGossiper Gossiper // ChanStatusMgr is used to set or un-set the disabled bit in channel // updates. diff --git a/peer/interfaces.go b/peer/interfaces.go index 59a79f1d42..ccc016ed9d 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hop" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -141,3 +142,15 @@ type Sphinx interface { ExtractErrorEncrypter(*btcec.PublicKey) (hop.ErrorEncrypter, lnwire.FailCode) } + +// Gossiper is an interface that abstracts the subsystem that handles the +// gossiping protocol. +type Gossiper interface { + // InitSyncState initializes a gossip syncer for a given peer that + // understands channel range queries. + InitSyncState(lnpeer.Peer) + + // ProcessRemoteAnnouncement processes a remote message intended for + // the Gossiper. + ProcessRemoteAnnouncement(lnwire.Message, lnpeer.Peer) chan error +} diff --git a/peer/test_utils.go b/peer/test_utils.go index c01cb89bae..701d80070b 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -24,6 +24,7 @@ import ( "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lntest/channels" "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lnwallet" @@ -511,6 +512,21 @@ func (s *mockSphinx) ExtractErrorEncrypter(_ *btcec.PublicKey) ( return nil, 0 } +type mockGossiper struct{} + +// newMockGossiper returns an instance of *mockGossiper. +func newMockGossiper() *mockGossiper { return &mockGossiper{} } + +// InitSyncState currently does nothing. +func (g *mockGossiper) InitSyncState(_ lnpeer.Peer) {} + +// ProcessRemoteAnnouncement currently does nothing. +func (g *mockGossiper) ProcessRemoteAnnouncement(_ lnwire.Message, + _ lnpeer.Peer) chan error { + + return make(chan error) +} + type mockMessageConn struct { t *testing.T From d18560a544e99c0813c2e3692473dd197bc46091 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 20 Jan 2021 16:53:29 -0500 Subject: [PATCH 09/12] peer+funding: move and rename Controller interface to Funding, add mocks --- funding/interfaces.go | 21 --------------------- peer/brontide.go | 5 ++--- peer/interfaces.go | 12 ++++++++++++ peer/test_utils.go | 13 +++++++++++++ 4 files changed, 27 insertions(+), 24 deletions(-) delete mode 100644 funding/interfaces.go diff --git a/funding/interfaces.go b/funding/interfaces.go deleted file mode 100644 index 8d37840a49..0000000000 --- a/funding/interfaces.go +++ /dev/null @@ -1,21 +0,0 @@ -package funding - -import ( - "github.com/lightningnetwork/lnd/lnpeer" - "github.com/lightningnetwork/lnd/lnwire" -) - -// Controller is an interface with basic funding flow functions. -// It describes the basic functionality of a funding manager. -// It should at a minimum process a subset of lnwire messages that -// are denoted as funding messages. -type Controller interface { - // ProcessFundingMsg processes a funding message represented by the - // lnwire.Message parameter along with the Peer object representing a - // connection to the counterparty. - ProcessFundingMsg(lnwire.Message, lnpeer.Peer) - - // IsPendingChannel returns whether a particular 32-byte identifier - // represents a pending channel in the Controller implementation. - IsPendingChannel([32]byte, lnpeer.Peer) bool -} diff --git a/peer/brontide.go b/peer/brontide.go index dc04df0222..6b5a7e93ee 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -23,7 +23,6 @@ import ( "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/feature" - "github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/input" @@ -321,8 +320,8 @@ type Config struct { FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) - // FundingManager is an implementation of the funding.Controller interface. - FundingManager funding.Controller + // FundingManager handles state relating to the funding process. + FundingManager Funding // Hodl is used when creating ChannelLinks to specify HodlFlags as // breakpoints in dev builds. diff --git a/peer/interfaces.go b/peer/interfaces.go index ccc016ed9d..6a9f41ebe1 100644 --- a/peer/interfaces.go +++ b/peer/interfaces.go @@ -154,3 +154,15 @@ type Gossiper interface { // the Gossiper. ProcessRemoteAnnouncement(lnwire.Message, lnpeer.Peer) chan error } + +// Funding is an interface that abstracts the funding process. +type Funding interface { + // ProcessFundingMsg processes a funding message represented by the + // lnwire.Message parameter along with the Peer object representing a + // connection to the counterparty. + ProcessFundingMsg(lnwire.Message, lnpeer.Peer) + + // IsPendingChannel returns whether a particular 32-byte identifier + // represents a pending channel in the Funding implementation. + IsPendingChannel([32]byte, lnpeer.Peer) bool +} diff --git a/peer/test_utils.go b/peer/test_utils.go index 701d80070b..7104cb3629 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -527,6 +527,19 @@ func (g *mockGossiper) ProcessRemoteAnnouncement(_ lnwire.Message, return make(chan error) } +type mockFunding struct{} + +// newMockFunding returns an instance of *mockFunding. +func newMockFunding() *mockFunding { return &mockFunding{} } + +// ProcessFundingMsg currently does nothing. +func (f *mockFunding) ProcessFundingMsg(_ lnwire.Message, _ lnpeer.Peer) {} + +// IsPendingChannel currently returns false. +func (f *mockFunding) IsPendingChannel(_ [32]byte, _ lnpeer.Peer) bool { + return false +} + type mockMessageConn struct { t *testing.T From c0134473e2b4570f66adf529dcd7b17fdee6e30e Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 21 Jan 2021 11:02:19 -0500 Subject: [PATCH 10/12] peer: use wtmock.MockPeer to mock connection --- peer/brontide_test.go | 18 ++++++++---- peer/test_utils.go | 62 +++++++++++++-------------------------- watchtower/wtmock/peer.go | 44 +++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/peer/brontide_test.go b/peer/brontide_test.go index 2141a945f9..8816f0ef61 100644 --- a/peer/brontide_test.go +++ b/peer/brontide_test.go @@ -16,6 +16,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/pool" + "github.com/lightningnetwork/lnd/watchtower/wtmock" "github.com/stretchr/testify/require" ) @@ -910,6 +911,7 @@ func TestStaticRemoteDowngrade(t *testing.T) { expectedInit: &lnwire.Init{ GlobalFeatures: rawLegacy, Features: rawFeatureOptional, + ExtraData: make([]byte, 0), }, }, { @@ -919,6 +921,7 @@ func TestStaticRemoteDowngrade(t *testing.T) { expectedInit: &lnwire.Init{ GlobalFeatures: rawLegacy, Features: rawFeatureOptional, + ExtraData: make([]byte, 0), }, }, { @@ -928,6 +931,7 @@ func TestStaticRemoteDowngrade(t *testing.T) { expectedInit: &lnwire.Init{ GlobalFeatures: rawLegacy, Features: rawFeatureRequired, + ExtraData: make([]byte, 0), }, }, @@ -942,6 +946,7 @@ func TestStaticRemoteDowngrade(t *testing.T) { expectedInit: &lnwire.Init{ GlobalFeatures: legacyCombinedOptional, Features: rawFeatureOptional, + ExtraData: make([]byte, 0), }, }, } @@ -960,7 +965,7 @@ func TestStaticRemoteDowngrade(t *testing.T) { ) require.NoError(t, writePool.Start()) - mockConn := newMockConn(t, 1) + mockConn := wtmock.NewMockPeer(nil, nil, nil, 1) p := Brontide{ cfg: Config{ @@ -971,14 +976,15 @@ func TestStaticRemoteDowngrade(t *testing.T) { }, } - var b bytes.Buffer - _, err := lnwire.WriteMessage(&b, test.expectedInit, 0) - require.NoError(t, err) - // Send our init message, assert that we write our expected message // and shutdown our write pool. require.NoError(t, p.sendInitMsg(test.legacy)) - mockConn.assertWrite(b.Bytes()) + + // Check that mockConn sent out the expected message. + msg, err := getMessage(mockConn) + require.NoError(t, err) + require.Equal(t, test.expectedInit, msg) + require.NoError(t, writePool.Stop()) }) } diff --git a/peer/test_utils.go b/peer/test_utils.go index 7104cb3629..e88c33bcae 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -10,7 +10,6 @@ import ( "math/rand" "net" "os" - "testing" "time" "github.com/btcsuite/btcd/btcec" @@ -32,7 +31,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/shachain" - "github.com/stretchr/testify/require" + "github.com/lightningnetwork/lnd/watchtower/wtmock" ) const ( @@ -540,55 +539,34 @@ func (f *mockFunding) IsPendingChannel(_ [32]byte, _ lnpeer.Peer) bool { return false } -type mockMessageConn struct { - t *testing.T - - // MessageConn embeds our interface so that the mock does not need to - // implement every function. The mock will panic if an unspecified function - // is called. - MessageConn - - // writtenMessages is a channel that our mock pushes written messages into. - writtenMessages chan []byte -} - -func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn { - return &mockMessageConn{ - t: t, - writtenMessages: make(chan []byte, expectedMessages), +// pushMessage pushes an lnwire.Message to the MockPeer. +func pushMessage(p *wtmock.MockPeer, msg lnwire.Message) error { + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err } -} -// SetWriteDeadline mocks setting write deadline for our conn. -func (m *mockMessageConn) SetWriteDeadline(time.Time) error { - return nil -} - -// Flush mocks a message conn flush. -func (m *mockMessageConn) Flush() (int, error) { - return 0, nil -} - -// WriteMessage mocks sending of a message on our connection. It will push -// the bytes sent into the mock's writtenMessages channel. -func (m *mockMessageConn) WriteMessage(msg []byte) error { select { - case m.writtenMessages <- msg: + case p.IncomingMsgs <- b.Bytes(): + return nil case <-time.After(timeout): - m.t.Fatalf("timeout sending message: %v", msg) + return fmt.Errorf("failed pushing message: %v", msg) } - - return nil } -// assertWrite asserts that our mock as had WriteMessage called with the byte -// slice we expect. -func (m *mockMessageConn) assertWrite(expected []byte) { +// getMessage retrieves a message from the MockPeer. +func getMessage(p *wtmock.MockPeer) (lnwire.Message, error) { select { - case actual := <-m.writtenMessages: - require.Equal(m.t, expected, actual) + case msgBytes := <-p.OutgoingMsgs: + r := bytes.NewReader(msgBytes) + msg, err := lnwire.ReadMessage(r, 0) + if err != nil { + return nil, err + } + + return msg, nil case <-time.After(timeout): - m.t.Fatalf("timeout waiting for write: %v", expected) + return nil, fmt.Errorf("timeout waiting to retrieve message") } } diff --git a/watchtower/wtmock/peer.go b/watchtower/wtmock/peer.go index fc1ff9af11..8a0b8d531b 100644 --- a/watchtower/wtmock/peer.go +++ b/watchtower/wtmock/peer.go @@ -9,6 +9,10 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtserver" ) +const ( + timeout = 5 * time.Second +) + // MockPeer emulates a single endpoint of brontide transport. type MockPeer struct { remotePub *btcec.PublicKey @@ -175,6 +179,46 @@ func (p *MockPeer) SetDeadline(t time.Time) error { panic("not implemented") } +// WriteMessage mocks sending the message. +func (p *MockPeer) WriteMessage(msg []byte) error { + // Since we may be using a write pool, we copy the message + // over before sending it through. + msgLen := len(msg) + msgCopy := make([]byte, msgLen) + if msgLen != copy(msgCopy, msg) { + return fmt.Errorf("failed to copy message: %v", msg) + } + + select { + case p.OutgoingMsgs <- msgCopy: + case <-time.After(timeout): + return fmt.Errorf("timeout sending message: %v", msg) + } + + return nil +} + +// Flush mocks flushing the message down the wire. +func (p *MockPeer) Flush() (int, error) { + return 0, nil +} + +// ReadNextHeader mocks reading the next header. +func (p *MockPeer) ReadNextHeader() (uint32, error) { + return 0, nil +} + +// ReadNextBody mocks reading the next body. +func (p *MockPeer) ReadNextBody(_ []byte) ([]byte, error) { + select { + case msg := <-p.IncomingMsgs: + return msg, nil + + case <-time.After(timeout): + return nil, fmt.Errorf("timeout reading message") + } +} + // Compile-time constraint ensuring the MockPeer implements the wserver.Peer // interface. var _ wtserver.Peer = (*MockPeer)(nil) From 17f253b47eba8a1fa548a8aa990baa590e6f21ed Mon Sep 17 00:00:00 2001 From: eugene Date: Fri, 22 Jan 2021 15:33:02 -0500 Subject: [PATCH 11/12] lntest/mock: extend mockWalletController with P2WPKH Otherwise, the Peer will send Shutdown messages with incorrect DeliveryAddress. --- lntest/mock/walletcontroller.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/lntest/mock/walletcontroller.go b/lntest/mock/walletcontroller.go index 50eddd1cee..2e4fcf4d60 100644 --- a/lntest/mock/walletcontroller.go +++ b/lntest/mock/walletcontroller.go @@ -63,10 +63,20 @@ func (w *WalletController) ConfirmedBalance(confs int32, func (w *WalletController) NewAddress(addrType lnwallet.AddressType, change bool, _ string) (btcutil.Address, error) { - addr, _ := btcutil.NewAddressPubKey( - w.RootKey.PubKey().SerializeCompressed(), &chaincfg.MainNetParams, - ) - return addr, nil + key := w.RootKey.PubKey().SerializeCompressed() + + if addrType == lnwallet.WitnessPubKey { + witProgram := btcutil.Hash160(key) + + addr, err := btcutil.NewAddressWitnessPubKeyHash( + witProgram, &chaincfg.MainNetParams, + ) + + return addr, err + } + + addr, err := btcutil.NewAddressPubKey(key, &chaincfg.MainNetParams) + return addr, err } // LastUnusedAddress currently returns dummy values. From 8390f7f7c09158884f7a9c698c93a1d1529e4070 Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 4 Feb 2021 16:20:56 -0500 Subject: [PATCH 12/12] peer: add testContext and use *Brontide for testing This commit removes the references to internal Brontide members and instead creates *Brontide and only accesses public members. This allows for proper testing of the Brontide API and allows us to send messages as a normal peer would. This also paves the way for moving the peer tests to the peer_test package instead of residing in the peer package. --- peer/brontide_test.go | 980 ++++++++++++++++++++---------------------- peer/test_utils.go | 512 ++++++++++++++-------- 2 files changed, 785 insertions(+), 707 deletions(-) diff --git a/peer/brontide_test.go b/peer/brontide_test.go index 8816f0ef61..22e31fd9fe 100644 --- a/peer/brontide_test.go +++ b/peer/brontide_test.go @@ -7,16 +7,12 @@ import ( "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/txscript" - "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" - "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/pool" - "github.com/lightningnetwork/lnd/watchtower/wtmock" "github.com/stretchr/testify/require" ) @@ -33,102 +29,98 @@ var ( func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { t.Parallel() - notifier := &mock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), - } - broadcastTxChan := make(chan *wire.MsgTx) + ctx := createTestContext(t) + defer ctx.cleanup() - alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, - ) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer cleanUp() + aliceConn := ctx.aliceConn - chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) + // Send the Init message to Alice. + err := pushMessage(aliceConn, ctx.bobInit) + require.NoError(t, err) - // We send a shutdown request to Alice. She will now be the responding - // node in this shutdown procedure. We first expect Alice to answer - // this shutdown request with a Shutdown message. - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: lnwire.NewShutdown(chanID, dummyDeliveryScript), - } + // Alice's Peer struct is created, representing a connection to Bob. + alicePeer := NewBrontide(*ctx.aliceCfg) + require.NoError(t, alicePeer.Start()) - var msg lnwire.Message - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive shutdown message") - } + // Cleanup Alice via Disconnect. + defer func() { + alicePeer.Disconnect(errDummy) + alicePeer.WaitForDisconnect(make(chan struct{})) + }() - shutdownMsg, ok := msg.(*lnwire.Shutdown) - if !ok { - t.Fatalf("expected Shutdown message, got %T", msg) - } + // Alice should reply with an Init message, we discard it. + msg, err := getMessage(aliceConn) + require.NoError(t, err) + require.IsType(t, &lnwire.Init{}, msg) - respDeliveryScript := shutdownMsg.Address + // Create a test channel between Alice and Bob. + aliceChan, bobLnChan, err := createTestChannels(noUpdate, + ctx.alicePriv, ctx.bobPriv, ctx.alicePub, ctx.bobPub, + ctx.aliceDb, ctx.bobDb, ctx.aliceCfg.FeeEstimator, + ctx.bobSigner, ctx.bobPool, false) + require.NoError(t, err) - // Alice will then send a ClosingSigned message, indicating her proposed - // closing transaction fee. Alice sends the ClosingSigned message as she is - // the initiator of the channel. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive ClosingSigned message") - } + err = alicePeer.AddNewChannel(aliceChan, make(chan struct{})) + require.NoError(t, err) - respClosingSigned, ok := msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + chanID := lnwire.NewChanIDFromOutPoint(bobLnChan.ChannelPoint()) + + // We send a shutdown request to Alice. She will now be the responding + // node in this shutdown procedure. We first expect Alice to answer + // this shutdown request with a Shutdown message. + bobShutdownMsg := lnwire.NewShutdown(chanID, dummyDeliveryScript) + err = pushMessage(aliceConn, bobShutdownMsg) + require.NoError(t, err) + + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceShutdownMsg, ok := msg.(*lnwire.Shutdown) + require.True(t, ok) + + aliceDeliveryScript := aliceShutdownMsg.Address + + // Alice will then send a ClosingSigned message, indicating her + // proposed closing transaction fee. Alice sends the ClosingSigned + // message as she is the initiator of the channel. + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok := msg.(*lnwire.ClosingSigned) + require.True(t, ok) // We accept the fee, and send a ClosingSigned with the same fee back, // so she knows we agreed. - aliceFee := respClosingSigned.FeeSatoshis - bobSig, _, _, err := bobChan.CreateCloseProposal( - aliceFee, dummyDeliveryScript, respDeliveryScript, + aliceFee := aliceClosingSigned.FeeSatoshis + bobSig, _, _, err := bobLnChan.CreateCloseProposal( + aliceFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err := lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("error parsing signature: %v", err) - } - closingSigned := lnwire.NewClosingSigned(chanID, aliceFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } + require.NoError(t, err) + + bobClosingSigned := lnwire.NewClosingSigned( + chanID, aliceFee, parsedSig, + ) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) - // Alice should now see that we agreed on the fee, and should broadcast the - // closing transaction. + // Alice should now see that we agreed on the fee, and should broadcast + // the closing transaction. select { - case <-broadcastTxChan: + case <-ctx.publTx: case <-time.After(timeout): t.Fatalf("closing tx not broadcast") } - // Need to pull the remaining message off of Alice's outgoing queue. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive ClosingSigned message") - } - if _, ok := msg.(*lnwire.ClosingSigned); !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + // Alice will respond with a ClosingSigned. + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) + require.True(t, ok) + require.Equal(t, aliceClosingSigned.FeeSatoshis, aliceFee) // Alice should be waiting in a goroutine for a confirmation. - notifier.ConfChan <- &chainntnfs.TxConfirmation{} + ctx.notifier.ConfChan <- &chainntnfs.TxConfirmation{} } // TestPeerChannelClosureAcceptFeeInitiator tests the shutdown initiator's @@ -136,120 +128,108 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { t.Parallel() - notifier := &mock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), - } - broadcastTxChan := make(chan *wire.MsgTx) + ctx := createTestContext(t) + defer ctx.cleanup() - alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, - ) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer cleanUp() + aliceConn := ctx.aliceConn + + // Send the Init message to Alice. + err := pushMessage(aliceConn, ctx.bobInit) + require.NoError(t, err) + + // Alice's Peer struct is created, representing a connection to Bob. + alicePeer := NewBrontide(*ctx.aliceCfg) + require.NoError(t, alicePeer.Start()) + + // Cleanup Alice via Disconnect. + defer func() { + alicePeer.Disconnect(errDummy) + alicePeer.WaitForDisconnect(make(chan struct{})) + }() + + // Alice should reply with an Init message, we discard it. + msg, err := getMessage(aliceConn) + require.NoError(t, err) + require.IsType(t, &lnwire.Init{}, msg) + + // Create a test channel between Alice and Bob. + aliceChan, bobLnChan, err := createTestChannels(noUpdate, + ctx.alicePriv, ctx.bobPriv, ctx.alicePub, ctx.bobPub, + ctx.aliceDb, ctx.bobDb, ctx.aliceCfg.FeeEstimator, + ctx.bobSigner, ctx.bobPool, false) + require.NoError(t, err) + + err = alicePeer.AddNewChannel(aliceChan, make(chan struct{})) + require.NoError(t, err) // We make Alice send a shutdown request. - updateChan := make(chan interface{}, 1) - errChan := make(chan error, 1) closeCommand := &htlcswitch.ChanClose{ CloseType: htlcswitch.CloseRegular, - ChanPoint: bobChan.ChannelPoint(), - Updates: updateChan, + ChanPoint: bobLnChan.ChannelPoint(), + Updates: make(chan interface{}, 1), TargetFeePerKw: 12500, - Err: errChan, + Err: make(chan error, 1), } - alicePeer.localCloseChanReqs <- closeCommand - // We can now pull a Shutdown message off of Alice's outgoingQueue. - var msg lnwire.Message - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive shutdown request") - } + alicePeer.HandleLocalCloseChanReqs(closeCommand) - shutdownMsg, ok := msg.(*lnwire.Shutdown) - if !ok { - t.Fatalf("expected Shutdown message, got %T", msg) - } + // Alice should send a Shutdown message to Bob. + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceShutdownMsg, ok := msg.(*lnwire.Shutdown) + require.True(t, ok) - aliceDeliveryScript := shutdownMsg.Address + aliceDeliveryScript := aliceShutdownMsg.Address // Bob will respond with his own Shutdown message. - chanID := shutdownMsg.ChannelID - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: lnwire.NewShutdown(chanID, - dummyDeliveryScript), - } + chanID := aliceShutdownMsg.ChannelID + bobShutdownMsg := lnwire.NewShutdown(chanID, dummyDeliveryScript) + err = pushMessage(aliceConn, bobShutdownMsg) + require.NoError(t, err) // Alice will reply with a ClosingSigned here. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } - closingSignedMsg, ok := msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected to receive closing signed message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok := msg.(*lnwire.ClosingSigned) + require.True(t, ok) // Bob should reply with the exact same fee in his next ClosingSigned // message. - bobFee := closingSignedMsg.FeeSatoshis - bobSig, _, _, err := bobChan.CreateCloseProposal( - bobFee, dummyDeliveryScript, aliceDeliveryScript, + aliceFee := aliceClosingSigned.FeeSatoshis + bobSig, _, _, err := bobLnChan.CreateCloseProposal( + aliceFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("unable to create close proposal: %v", err) - } + require.NoError(t, err) + parsedSig, err := lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("unable to parse signature: %v", err) - } + require.NoError(t, err) - closingSigned := lnwire.NewClosingSigned(shutdownMsg.ChannelID, - bobFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } + bobClosingSigned := lnwire.NewClosingSigned( + chanID, aliceFee, parsedSig, + ) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) - // Alice should accept Bob's fee, broadcast the cooperative close tx, and - // send a ClosingSigned message back to Bob. + // Alice should accept Bob's fee, broadcast the cooperative close tx, + // and send a ClosingSigned message back to Bob. // Alice should now broadcast the closing transaction. select { - case <-broadcastTxChan: + case <-ctx.publTx: case <-time.After(timeout): t.Fatalf("closing tx not broadcast") } // Alice should respond with the ClosingSigned they both agreed upon. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } - - closingSignedMsg, ok = msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } - - if closingSignedMsg.FeeSatoshis != bobFee { - t.Fatalf("expected ClosingSigned fee to be %v, instead got %v", - bobFee, closingSignedMsg.FeeSatoshis) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) + require.True(t, ok) + require.Equal(t, aliceClosingSigned.FeeSatoshis, aliceFee) - // Alice should be waiting on a single confirmation for the coop close tx. - notifier.ConfChan <- &chainntnfs.TxConfirmation{} + // Alice should be waiting on a single confirmation for the coop close + // tx. + ctx.notifier.ConfChan <- &chainntnfs.TxConfirmation{} } // TestPeerChannelClosureFeeNegotiationsResponder tests the shutdown @@ -258,192 +238,158 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { t.Parallel() - notifier := &mock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), - } - broadcastTxChan := make(chan *wire.MsgTx) + ctx := createTestContext(t) + defer ctx.cleanup() - alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, - ) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer cleanUp() + aliceConn := ctx.aliceConn - chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) + // Send the Init message to Alice. + err := pushMessage(aliceConn, ctx.bobInit) + require.NoError(t, err) - // Bob sends a shutdown request to Alice. She will now be the responding - // node in this shutdown procedure. We first expect Alice to answer this - // Shutdown request with a Shutdown message. - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: lnwire.NewShutdown(chanID, - dummyDeliveryScript), - } + // Alice's Peer struct is created, representing a connection to Bob. + alicePeer := NewBrontide(*ctx.aliceCfg) + require.NoError(t, alicePeer.Start()) - var msg lnwire.Message - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive shutdown message") - } + // Cleanup Alice via Disconnect. + defer func() { + alicePeer.Disconnect(errDummy) + alicePeer.WaitForDisconnect(make(chan struct{})) + }() - shutdownMsg, ok := msg.(*lnwire.Shutdown) - if !ok { - t.Fatalf("expected Shutdown message, got %T", msg) - } + // Alice should reply with an Init message, we discard it. + msg, err := getMessage(aliceConn) + require.NoError(t, err) + require.IsType(t, &lnwire.Init{}, msg) + + // Create a test channel between Alice and Bob. + aliceChan, bobLnChan, err := createTestChannels(noUpdate, + ctx.alicePriv, ctx.bobPriv, ctx.alicePub, ctx.bobPub, + ctx.aliceDb, ctx.bobDb, ctx.aliceCfg.FeeEstimator, + ctx.bobSigner, ctx.bobPool, false) + require.NoError(t, err) + + err = alicePeer.AddNewChannel(aliceChan, make(chan struct{})) + require.NoError(t, err) + + chanID := lnwire.NewChanIDFromOutPoint(bobLnChan.ChannelPoint()) + + // Bob sends a shutdown request to Alice. She will now be the + // responding node in this shutdown procedure. We first expect Alice to + // answer this Shutdown request with a Shutdown message. + bobShutdownMsg := lnwire.NewShutdown(chanID, dummyDeliveryScript) + err = pushMessage(aliceConn, bobShutdownMsg) + require.NoError(t, err) + + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceShutdownMsg, ok := msg.(*lnwire.Shutdown) + require.True(t, ok) - aliceDeliveryScript := shutdownMsg.Address + aliceDeliveryScript := aliceShutdownMsg.Address // As Alice is the channel initiator, she will send her ClosingSigned // message. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } - + msg, err = getMessage(aliceConn) + require.NoError(t, err) aliceClosingSigned, ok := msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + require.True(t, ok) // Bob doesn't agree with the fee and will send one back that's 2.5x. preferredRespFee := aliceClosingSigned.FeeSatoshis increasedFee := btcutil.Amount(float64(preferredRespFee) * 2.5) - bobSig, _, _, err := bobChan.CreateCloseProposal( + bobSig, _, _, err := bobLnChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err := lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("error parsing signature: %v", err) - } - closingSigned := lnwire.NewClosingSigned(chanID, increasedFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } - - // Alice will now see the new fee we propose, but with current settings it - // won't accept it immediately as it differs too much by its ideal fee. We - // should get a new proposal back, which should have the average fee rate - // proposed. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } + require.NoError(t, err) + bobClosingSigned := lnwire.NewClosingSigned( + chanID, increasedFee, parsedSig, + ) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) + + // Alice will now see the new fee we propose, but with current settings + // it won't accept it immediately as it differs too much by its ideal + // fee. We should get a new proposal back, which should have the + // average fee rate proposed. + msg, err = getMessage(aliceConn) + require.NoError(t, err) aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + require.True(t, ok) - // The fee sent by Alice should be less than the fee Bob just sent as Alice - // should attempt to compromise. + // The fee sent by Alice should be less than the fee Bob just sent as + // Alice should attempt to compromise. aliceFee := aliceClosingSigned.FeeSatoshis - if aliceFee > increasedFee { - t.Fatalf("new fee should be less than our fee: new=%v, "+ - "prior=%v", aliceFee, increasedFee) - } + require.LessOrEqual(t, int64(aliceFee), int64(increasedFee)) + lastFeeResponder := aliceFee // We try negotiating a 2.1x fee, which should also be rejected. increasedFee = btcutil.Amount(float64(preferredRespFee) * 2.1) - bobSig, _, _, err = bobChan.CreateCloseProposal( + bobSig, _, _, err = bobLnChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err = lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("error parsing signature: %v", err) - } - closingSigned = lnwire.NewClosingSigned(chanID, increasedFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } - - // Bob's latest proposal still won't be accepted and Alice should send over - // a new ClosingSigned message. It should be the average of what Bob and - // Alice each proposed last time. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } + require.NoError(t, err) + bobClosingSigned = lnwire.NewClosingSigned( + chanID, increasedFee, parsedSig, + ) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) + + // Bob's latest proposal still won't be accepted and Alice should send + // over a new ClosingSigned message. It should be the average of what + // Bob and Alice each proposed last time. + msg, err = getMessage(aliceConn) + require.NoError(t, err) aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + require.True(t, ok) // Alice should inch towards Bob's fee, in order to compromise. // Additionally, this fee should be less than the fee Bob sent before. aliceFee = aliceClosingSigned.FeeSatoshis - if aliceFee < lastFeeResponder { - t.Fatalf("new fee should be greater than prior: new=%v, "+ - "prior=%v", aliceFee, lastFeeResponder) - } - if aliceFee > increasedFee { - t.Fatalf("new fee should be less than Bob's fee: new=%v, "+ - "prior=%v", aliceFee, increasedFee) - } + require.GreaterOrEqual(t, int64(aliceFee), int64(lastFeeResponder)) + require.LessOrEqual(t, int64(aliceFee), int64(increasedFee)) - // Finally, Bob will accept the fee by echoing back the same fee that Alice - // just sent over. - bobSig, _, _, err = bobChan.CreateCloseProposal( + // Finally, Bob will accept the fee by echoing back the same fee that + // Alice just sent over. + bobSig, _, _, err = bobLnChan.CreateCloseProposal( aliceFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err = lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("error parsing signature: %v", err) - } - closingSigned = lnwire.NewClosingSigned(chanID, aliceFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } + require.NoError(t, err) - // Alice will now see that Bob agreed on the fee, and broadcast the coop - // close transaction. + bobClosingSigned = lnwire.NewClosingSigned(chanID, aliceFee, parsedSig) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) + + // Alice will now see that Bob agreed on the fee, and broadcast the + // coop close transaction. select { - case <-broadcastTxChan: + case <-ctx.publTx: case <-time.After(timeout): t.Fatalf("closing tx not broadcast") } // Alice should respond with the ClosingSigned they both agreed upon. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } - if _, ok := msg.(*lnwire.ClosingSigned); !ok { - t.Fatalf("expected to receive closing signed message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) + require.True(t, ok) + require.Equal(t, aliceClosingSigned.FeeSatoshis, aliceFee) - // Alice should be waiting on a single confirmation for the coop close tx. - notifier.ConfChan <- &chainntnfs.TxConfirmation{} + // Alice should be waiting on a single confirmation for the coop close + // tx. + ctx.notifier.ConfChan <- &chainntnfs.TxConfirmation{} } // TestPeerChannelClosureFeeNegotiationsInitiator tests the shutdown @@ -452,117 +398,102 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { t.Parallel() - notifier := &mock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), - } - broadcastTxChan := make(chan *wire.MsgTx) + ctx := createTestContext(t) + defer ctx.cleanup() - alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, noUpdate, - ) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer cleanUp() + aliceConn := ctx.aliceConn + + // Send the Init message to Alice. + err := pushMessage(aliceConn, ctx.bobInit) + require.NoError(t, err) + + // Alice's Peer struct is created, representing a connection to Bob. + alicePeer := NewBrontide(*ctx.aliceCfg) + require.NoError(t, alicePeer.Start()) + + // Cleanup Alice via Disconnect. + defer func() { + alicePeer.Disconnect(errDummy) + alicePeer.WaitForDisconnect(make(chan struct{})) + }() + + // Alice should reply with an Init message, we discard it. + msg, err := getMessage(aliceConn) + require.NoError(t, err) + require.IsType(t, &lnwire.Init{}, msg) + + // Create a test channel between Alice and Bob. + aliceChan, bobLnChan, err := createTestChannels(noUpdate, + ctx.alicePriv, ctx.bobPriv, ctx.alicePub, ctx.bobPub, + ctx.aliceDb, ctx.bobDb, ctx.aliceCfg.FeeEstimator, + ctx.bobSigner, ctx.bobPool, false) + require.NoError(t, err) + + err = alicePeer.AddNewChannel(aliceChan, make(chan struct{})) + require.NoError(t, err) - // We make the initiator send a shutdown request. - updateChan := make(chan interface{}, 1) - errChan := make(chan error, 1) + // We make Alice send a shutdown request. closeCommand := &htlcswitch.ChanClose{ CloseType: htlcswitch.CloseRegular, - ChanPoint: bobChan.ChannelPoint(), - Updates: updateChan, + ChanPoint: bobLnChan.ChannelPoint(), + Updates: make(chan interface{}, 1), TargetFeePerKw: 12500, - Err: errChan, + Err: make(chan error, 1), } - alicePeer.localCloseChanReqs <- closeCommand + alicePeer.HandleLocalCloseChanReqs(closeCommand) // Alice should now send a Shutdown request to Bob. - var msg lnwire.Message - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive shutdown request") - } - - shutdownMsg, ok := msg.(*lnwire.Shutdown) - if !ok { - t.Fatalf("expected Shutdown message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceShutdownMsg, ok := msg.(*lnwire.Shutdown) + require.True(t, ok) - aliceDeliveryScript := shutdownMsg.Address + aliceDeliveryScript := aliceShutdownMsg.Address // Bob will answer the Shutdown message with his own Shutdown. - chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint()) - respShutdown := lnwire.NewShutdown(chanID, dummyDeliveryScript) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: respShutdown, - } + chanID := aliceShutdownMsg.ChannelID + bobShutdownMsg := lnwire.NewShutdown(chanID, dummyDeliveryScript) + err = pushMessage(aliceConn, bobShutdownMsg) + require.NoError(t, err) // Alice should now respond with a ClosingSigned message with her ideal // fee rate. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed") - } - closingSignedMsg, ok := msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok := msg.(*lnwire.ClosingSigned) + require.True(t, ok) - idealFeeRate := closingSignedMsg.FeeSatoshis + idealFeeRate := aliceClosingSigned.FeeSatoshis lastReceivedFee := idealFeeRate increasedFee := btcutil.Amount(float64(idealFeeRate) * 2.1) lastSentFee := increasedFee - bobSig, _, _, err := bobChan.CreateCloseProposal( + bobSig, _, _, err := bobLnChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err := lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("unable to parse signature: %v", err) - } + require.NoError(t, err) - closingSigned := lnwire.NewClosingSigned(chanID, increasedFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } + bobClosingSigned := lnwire.NewClosingSigned( + chanID, increasedFee, parsedSig, + ) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) // It still won't be accepted, and we should get a new proposal, the // average of what we proposed, and what they proposed last time. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed") - } - closingSignedMsg, ok = msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) + require.True(t, ok) - aliceFee := closingSignedMsg.FeeSatoshis - if aliceFee < lastReceivedFee { - t.Fatalf("new fee should be greater than prior: new=%v, old=%v", - aliceFee, lastReceivedFee) - } - if aliceFee > lastSentFee { - t.Fatalf("new fee should be less than our fee: new=%v, old=%v", - aliceFee, lastSentFee) - } + aliceFee := aliceClosingSigned.FeeSatoshis + require.GreaterOrEqual(t, int64(aliceFee), int64(lastReceivedFee)) + require.LessOrEqual(t, int64(aliceFee), int64(lastSentFee)) lastReceivedFee = aliceFee @@ -570,87 +501,63 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { increasedFee = btcutil.Amount(float64(idealFeeRate) * 1.5) lastSentFee = increasedFee - bobSig, _, _, err = bobChan.CreateCloseProposal( + bobSig, _, _, err = bobLnChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err = lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("error parsing signature: %v", err) - } + require.NoError(t, err) - closingSigned = lnwire.NewClosingSigned(chanID, increasedFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } + bobClosingSigned = lnwire.NewClosingSigned( + chanID, increasedFee, parsedSig, + ) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) // Alice won't accept Bob's new proposal, and Bob should receive a new - // proposal which is the average of what Bob proposed and Alice proposed - // last time. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed") - } - closingSignedMsg, ok = msg.(*lnwire.ClosingSigned) - if !ok { - t.Fatalf("expected ClosingSigned message, got %T", msg) - } + // proposal which is the average of what Bob proposed and Alice + // proposed last time. + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) + require.True(t, ok) - aliceFee = closingSignedMsg.FeeSatoshis - if aliceFee < lastReceivedFee { - t.Fatalf("new fee should be greater than prior: new=%v, old=%v", - aliceFee, lastReceivedFee) - } - if aliceFee > lastSentFee { - t.Fatalf("new fee should be less than Bob's fee: new=%v, old=%v", - aliceFee, lastSentFee) - } + aliceFee = aliceClosingSigned.FeeSatoshis + require.GreaterOrEqual(t, int64(aliceFee), int64(lastReceivedFee)) + require.LessOrEqual(t, int64(aliceFee), int64(lastSentFee)) - // Bob will now accept their fee by sending back a ClosingSigned message - // with an identical fee. - bobSig, _, _, err = bobChan.CreateCloseProposal( + // Bob will now accept their fee by sending back a ClosingSigned + // message with an identical fee. + bobSig, _, _, err = bobLnChan.CreateCloseProposal( aliceFee, dummyDeliveryScript, aliceDeliveryScript, ) - if err != nil { - t.Fatalf("error creating close proposal: %v", err) - } + require.NoError(t, err) parsedSig, err = lnwire.NewSigFromSignature(bobSig) - if err != nil { - t.Fatalf("error parsing signature: %v", err) - } - closingSigned = lnwire.NewClosingSigned(chanID, aliceFee, parsedSig) - alicePeer.chanCloseMsgs <- &closeMsg{ - cid: chanID, - msg: closingSigned, - } + require.NoError(t, err) + + bobClosingSigned = lnwire.NewClosingSigned(chanID, aliceFee, parsedSig) + err = pushMessage(aliceConn, bobClosingSigned) + require.NoError(t, err) // Wait for closing tx to be broadcasted. select { - case <-broadcastTxChan: + case <-ctx.publTx: case <-time.After(timeout): t.Fatalf("closing tx not broadcast") } // Alice should respond with the ClosingSigned they both agreed upon. - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive closing signed message") - } - if _, ok := msg.(*lnwire.ClosingSigned); !ok { - t.Fatalf("expected to receive closing signed message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned) + require.True(t, ok) + require.Equal(t, aliceClosingSigned.FeeSatoshis, aliceFee) - // Alice should be waiting on a single confirmation for the coop close tx. - notifier.ConfChan <- &chainntnfs.TxConfirmation{} + // Alice should be waiting on a single confirmation for the coop close + // tx. + ctx.notifier.ConfChan <- &chainntnfs.TxConfirmation{} } // TestChooseDeliveryScript tests that chooseDeliveryScript correctly errors @@ -730,8 +637,8 @@ func TestChooseDeliveryScript(t *testing.T) { func TestCustomShutdownScript(t *testing.T) { script := genScript(t, p2SHAddress) - // setShutdown is a function which sets the upfront shutdown address for - // the local channel. + // setShutdown is a function which sets the upfront shutdown address + // for the local channel. setShutdown := func(a, b *channeldb.OpenChannel) { a.LocalShutdownScript = script b.RemoteShutdownScript = script @@ -740,15 +647,16 @@ func TestCustomShutdownScript(t *testing.T) { tests := []struct { name string - // update is a function used to set values on the channel set up for the - // test. It is used to set values for upfront shutdown addresses. + // update is a function used to set values on the channel set + // up for the test. It is used to set values for upfront + // shutdown addresses. update func(a, b *channeldb.OpenChannel) // userCloseScript is the address specified by the user. userCloseScript lnwire.DeliveryAddress - // expectedScript is the address we expect to be set on the shutdown - // message. + // expectedScript is the address we expect to be set on the + // shutdown message. expectedScript lnwire.DeliveryAddress // expectedError is the error we expect, if any. @@ -787,75 +695,100 @@ func TestCustomShutdownScript(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { - notifier := &mock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), - } - broadcastTxChan := make(chan *wire.MsgTx) - // Open a channel. - alicePeer, bobChan, cleanUp, err := createTestPeer( - notifier, broadcastTxChan, test.update, + ctx := createTestContext(t) + defer ctx.cleanup() + + aliceConn := ctx.aliceConn + + // Send the Init message to Alice. + err := pushMessage(aliceConn, ctx.bobInit) + require.NoError(t, err) + + // Alice's Peer struct is created, representing a + // connection to Bob. + alicePeer := NewBrontide(*ctx.aliceCfg) + require.NoError(t, alicePeer.Start()) + + // Cleanup Alice via Disconnect. + defer func() { + alicePeer.Disconnect(errDummy) + alicePeer.WaitForDisconnect( + make(chan struct{}), + ) + }() + + // Alice should reply with an Init message, we discard + // it. + msg, err := getMessage(aliceConn) + require.NoError(t, err) + require.IsType(t, &lnwire.Init{}, msg) + + // Create a test channel between Alice and Bob. + aliceChan, bobLnChan, err := createTestChannels( + test.update, ctx.alicePriv, ctx.bobPriv, + ctx.alicePub, ctx.bobPub, ctx.aliceDb, + ctx.bobDb, ctx.aliceCfg.FeeEstimator, + ctx.bobSigner, ctx.bobPool, false) + require.NoError(t, err) + + err = alicePeer.AddNewChannel( + aliceChan, make(chan struct{}), ) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } - defer cleanUp() + require.NoError(t, err) - // Request initiator to cooperatively close the channel, with - // a specified delivery address. - updateChan := make(chan interface{}, 1) + // Request initiator to cooperatively close the + // channel, with a specified delivery address. errChan := make(chan error, 1) - chanPoint := bobChan.ChannelPoint() - closeCommand := htlcswitch.ChanClose{ + closeCommand := &htlcswitch.ChanClose{ CloseType: htlcswitch.CloseRegular, - ChanPoint: chanPoint, - Updates: updateChan, + ChanPoint: bobLnChan.ChannelPoint(), + Updates: make(chan interface{}, 1), TargetFeePerKw: 12500, DeliveryScript: test.userCloseScript, Err: errChan, } - // Send the close command for the correct channel and check that a - // shutdown message is sent. - alicePeer.localCloseChanReqs <- &closeCommand - - var msg lnwire.Message - select { - case outMsg := <-alicePeer.outgoingQueue: - msg = outMsg.msg - case <-time.After(timeout): - t.Fatalf("did not receive shutdown message") - case err := <-errChan: - // Fail if we do not expect an error. - if err != test.expectedError { - t.Fatalf("error closing channel: %v", err) + // Send the close command for the correct channel and + // check that a shutdown message is sent. + alicePeer.HandleLocalCloseChanReqs(closeCommand) + + if test.expectedError != nil { + select { + case <-time.After(timeout): + t.Fatalf("did not receive error") + case err := <-errChan: + // Fail if we do not expect this error. + require.Equal(t, err, test.expectedError) + + // Terminate the test early if we have + // received the expected error, no + // further action is expected. + return } - - // Terminate the test early if have received an error, no - // further action is expected. - return } // Check that we have received a shutdown message. - shutdownMsg, ok := msg.(*lnwire.Shutdown) - if !ok { - t.Fatalf("expected shutdown message, got %T", msg) - } + msg, err = getMessage(aliceConn) + require.NoError(t, err) + aliceShutdownMsg, ok := msg.(*lnwire.Shutdown) + require.True(t, ok) - // If the test has not specified an expected address, do not check - // whether the shutdown address matches. This covers the case where - // we epect shutdown to a random address and cannot match it. + // If the test has not specified an expected address, + // do not check whether the shutdown address matches. + // This covers the case where we expect shutdown to a + // random address and cannot match it. if len(test.expectedScript) == 0 { return } - // Check that the Shutdown message includes the expected delivery - // script. - if !bytes.Equal(test.expectedScript, shutdownMsg.Address) { + // Check that the Shutdown message includes the + // expected delivery script. + if !bytes.Equal( + test.expectedScript, aliceShutdownMsg.Address, + ) { t.Fatalf("expected delivery script: %x, got: %x", - test.expectedScript, shutdownMsg.Address) + test.expectedScript, aliceShutdownMsg.Address) } }) } @@ -955,37 +888,48 @@ func TestStaticRemoteDowngrade(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { - writeBufferPool := pool.NewWriteBuffer( - pool.DefaultWriteBufferGCInterval, - pool.DefaultWriteBufferExpiryInterval, - ) + ctx := createTestContext(t) + defer ctx.cleanup() - writePool := pool.NewWrite( - writeBufferPool, 1, timeout, - ) - require.NoError(t, writePool.Start()) + aliceConn := ctx.aliceConn - mockConn := wtmock.NewMockPeer(nil, nil, nil, 1) + ctx.aliceCfg.LegacyFeatures = legacy + ctx.aliceCfg.Features = test.features - p := Brontide{ - cfg: Config{ - LegacyFeatures: legacy, - Features: test.features, - Conn: mockConn, - WritePool: writePool, - }, - } + // Create a test channel between Alice and Bob. + _, _, err := createTestChannels( + noUpdate, ctx.alicePriv, ctx.bobPriv, + ctx.alicePub, ctx.bobPub, ctx.aliceDb, + ctx.bobDb, ctx.aliceCfg.FeeEstimator, + ctx.bobSigner, ctx.bobPool, test.legacy, + ) + require.NoError(t, err) - // Send our init message, assert that we write our expected message - // and shutdown our write pool. - require.NoError(t, p.sendInitMsg(test.legacy)) + // Send the Init message to Alice. + err = pushMessage(aliceConn, ctx.bobInit) + require.NoError(t, err) - // Check that mockConn sent out the expected message. - msg, err := getMessage(mockConn) + // Alice's Peer struct is created, representing a + // connection to Bob. + alicePeer := NewBrontide(*ctx.aliceCfg) + require.NoError(t, alicePeer.Start()) + + // Cleanup Alice via Disconnect. + defer func() { + alicePeer.Disconnect(errDummy) + alicePeer.WaitForDisconnect( + make(chan struct{}), + ) + }() + + // Alice should reply with an Init message, we discard + // it during this test. + msg, err := getMessage(aliceConn) require.NoError(t, err) - require.Equal(t, test.expectedInit, msg) + aliceInitMsg, ok := msg.(*lnwire.Init) + require.True(t, ok) - require.NoError(t, writePool.Stop()) + require.Equal(t, aliceInitMsg, test.expectedInit) }) } } diff --git a/peer/test_utils.go b/peer/test_utils.go index e88c33bcae..b7a9d4c272 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -10,6 +10,7 @@ import ( "math/rand" "net" "os" + "testing" "time" "github.com/btcsuite/btcd/btcec" @@ -18,8 +19,10 @@ import ( "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" @@ -29,49 +32,290 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/netann" + "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/shachain" "github.com/lightningnetwork/lnd/watchtower/wtmock" + "github.com/stretchr/testify/require" ) const ( - broadcastHeight = 100 - // timeout is a timeout value to use for tests which need to wait for // a return value on a channel. timeout = time.Second * 5 ) var ( + aliceAddr = &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18555, + } + + bobAddr = &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 18556, + } + // Just use some arbitrary bytes as delivery script. dummyDeliveryScript = channels.AlicesPrivKey + + errDummy = fmt.Errorf("dummy error") ) -// noUpdate is a function which can be used as a parameter in createTestPeer to -// call the setup code with no custom values on the channels set up. -var noUpdate = func(a, b *channeldb.OpenChannel) {} +type testContext struct { + alicePriv *btcec.PrivateKey + alicePub *btcec.PublicKey + aliceDb *channeldb.DB + + bobPriv *btcec.PrivateKey + bobPub *btcec.PublicKey + bobDb *channeldb.DB -// createTestPeer creates a channel between two nodes, and returns a peer for -// one of the nodes, together with the channel seen from both nodes. It takes -// an updateChan function which can be used to modify the default values on -// the channel states for each peer. -func createTestPeer(notifier chainntnfs.ChainNotifier, - publTx chan *wire.MsgTx, updateChan func(a, b *channeldb.OpenChannel)) ( - *Brontide, *lnwallet.LightningChannel, func(), error) { + aliceSigner *mock.SingleSigner + alicePool *lnwallet.SigPool - aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes( + bobSigner *mock.SingleSigner + bobPool *lnwallet.SigPool + + publTx chan *wire.MsgTx + + aliceConn *wtmock.MockPeer + + notifier *mock.ChainNotifier + + aliceCfg *Config + + bobInit *lnwire.Init + + cleanup func() +} + +// createTestContext makes a test context. +func createTestContext(t *testing.T) *testContext { + alicePriv, alicePub := btcec.PrivKeyFromBytes( btcec.S256(), channels.AlicesPrivKey, ) - bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes( + + alicePath, err := ioutil.TempDir("", "alicedb") + require.NoError(t, err) + + aliceDb, err := channeldb.Open(alicePath) + require.NoError(t, err) + + bobPriv, bobPub := btcec.PrivKeyFromBytes( btcec.S256(), channels.BobsPrivKey, ) + bobPath, err := ioutil.TempDir("", "bobdb") + require.NoError(t, err) + + bobDb, err := channeldb.Open(bobPath) + require.NoError(t, err) + + aliceSigner := &mock.SingleSigner{Privkey: alicePriv} + alicePool := lnwallet.NewSigPool(1, aliceSigner) + require.NoError(t, alicePool.Start()) + + bobSigner := &mock.SingleSigner{Privkey: bobPriv} + bobPool := lnwallet.NewSigPool(1, bobSigner) + require.NoError(t, bobPool.Start()) + + cleanup := func() { + os.RemoveAll(alicePath) + os.RemoveAll(bobPath) + _ = alicePool.Stop() + _ = bobPool.Stop() + } + + publTx := make(chan *wire.MsgTx) + + aliceConn := wtmock.NewMockPeer(nil, nil, nil, 1) + + notifier := &mock.ChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + } + + aliceCfg, err := assembleConfig( + alicePriv, alicePub, bobPub, aliceDb, aliceSigner, alicePool, + publTx, aliceConn, notifier, + ) + require.NoError(t, err) + + bobGlobalVec := lnwire.NewRawFeatureVector() + bobFeatureVec := lnwire.NewRawFeatureVector( + lnwire.DataLossProtectRequired, + ) + bobInit := lnwire.NewInitMessage(bobGlobalVec, bobFeatureVec) + + return &testContext{ + alicePriv: alicePriv, + alicePub: alicePub, + aliceDb: aliceDb, + + bobPriv: bobPriv, + bobPub: bobPub, + bobDb: bobDb, + + aliceSigner: aliceSigner, + alicePool: alicePool, + + bobSigner: bobSigner, + bobPool: bobPool, + + publTx: publTx, + + aliceConn: aliceConn, + + notifier: notifier, + + aliceCfg: aliceCfg, + + bobInit: bobInit, + + cleanup: cleanup, + } +} + +// assembleConfig assembles a peer Config. +func assembleConfig(alicePriv *btcec.PrivateKey, alicePub, + bobPub *btcec.PublicKey, aliceDb *channeldb.DB, + aliceSigner input.Signer, alicePool *lnwallet.SigPool, + publTx chan *wire.MsgTx, conn MessageConn, + notifier chainntnfs.ChainNotifier) (*Config, error) { + + var aliceSerPub [33]byte + copy(aliceSerPub[:], alicePub.SerializeCompressed()) + + var bobSerPub [33]byte + copy(bobSerPub[:], bobPub.SerializeCompressed()) + + bobNetAddr := &lnwire.NetAddress{ + IdentityKey: bobPub, + Address: bobAddr, + ChainNet: wire.SimNet, + } + + features := lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(lnwire.StaticRemoteKeyRequired), + lnwire.Features, + ) + legacy := lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(), lnwire.Features, + ) + + errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize) + if err != nil { + return nil, err + } + + writeBufferPool := pool.NewWriteBuffer( + pool.DefaultWriteBufferGCInterval, + pool.DefaultWriteBufferExpiryInterval, + ) + writePool := pool.NewWrite( + writeBufferPool, 1, timeout, + ) + if err := writePool.Start(); err != nil { + return nil, err + } + + readBufferPool := pool.NewReadBuffer( + pool.DefaultReadBufferGCInterval, + pool.DefaultReadBufferExpiryInterval, + ) + readPool := pool.NewRead( + readBufferPool, 1, timeout, + ) + if err := readPool.Start(); err != nil { + return nil, err + } + + chainIO := &mock.ChainIO{} + + estimator := chainfee.NewStaticEstimator(12500, 0) + + wallet := &lnwallet.LightningWallet{ + WalletController: &mock.WalletController{ + RootKey: alicePriv, + PublishedTransactions: publTx, + }, + } + + chanNotifier := channelnotifier.New(aliceDb) + if err := chanNotifier.Start(); err != nil { + return nil, err + } + + disconnect := func(_ *btcec.PublicKey) error { return nil } + + genAnn := func(_ bool, _ ...netann.NodeAnnModifier) ( + lnwire.NodeAnnouncement, error) { + return lnwire.NodeAnnouncement{}, nil + } + + prunePeer := func([33]byte) {} + + fetchUpdate := func(_ lnwire.ShortChannelID) (*lnwire.ChannelUpdate, + error) { + return &lnwire.ChannelUpdate{}, nil + } + + interceptSwitch := htlcswitch.NewInterceptableSwitch(nil) + + cfg := &Config{ + Conn: conn, + PubKeyBytes: bobSerPub, + Addr: bobNetAddr, + Features: features, + LegacyFeatures: legacy, + ChanActiveTimeout: time.Hour, + ErrorBuffer: errBuffer, + WritePool: writePool, + ReadPool: readPool, + Switch: newMockMessageSwitch(), + InterceptSwitch: interceptSwitch, + ChannelDB: aliceDb, + ChannelGraph: newMockChannelGraph(), + ChainArb: newMockChainArb(), + AuthGossiper: newMockGossiper(), + ChanStatusMgr: newMockStatusMgr(), + ChainIO: chainIO, + FeeEstimator: estimator, + Signer: aliceSigner, + SigPool: alicePool, + Wallet: wallet, + ChainNotifier: notifier, + RoutingPolicy: htlcswitch.ForwardingPolicy{}, + Sphinx: newMockSphinx(), + ChannelNotifier: chanNotifier, + DisconnectPeer: disconnect, + GenNodeAnnouncement: genAnn, + PrunePersistentPeerConnection: prunePeer, + FetchLastChanUpdate: fetchUpdate, + FundingManager: newMockFunding(), + Hodl: &hodl.Config{}, + ServerPubKey: aliceSerPub, + Quit: make(chan struct{}), + } + + return cfg, nil +} + +// createTestChannels makes two test channels for alice, bob. +func createTestChannels(update func(a, b *channeldb.OpenChannel), + alicePriv, bobPriv *btcec.PrivateKey, alicePub, + bobPub *btcec.PublicKey, aliceDb, bobDb *channeldb.DB, + estimator chainfee.Estimator, bobSigner input.Signer, + bobSigPool *lnwallet.SigPool, tweak bool) (*channeldb.OpenChannel, + *lnwallet.LightningChannel, error) { + channelCapacity := btcutil.Amount(10 * 1e8) channelBal := channelCapacity / 2 - aliceDustLimit := btcutil.Amount(200) - bobDustLimit := btcutil.Amount(1300) - csvTimeoutAlice := uint32(5) - csvTimeoutBob := uint32(4) + dustLimit := btcutil.Amount(1300) + csvTimeout := uint32(5) prevOut := &wire.OutPoint{ Hash: channels.TestHdSeed, @@ -81,112 +325,71 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, aliceCfg := channeldb.ChannelConfig{ ChannelConstraints: channeldb.ChannelConstraints{ - DustLimit: aliceDustLimit, + DustLimit: dustLimit, MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), ChanReserve: btcutil.Amount(rand.Int63()), MinHTLC: lnwire.MilliSatoshi(rand.Int63()), MaxAcceptedHtlcs: uint16(rand.Int31()), - CsvDelay: uint16(csvTimeoutAlice), - }, - MultiSigKey: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - RevocationBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - PaymentBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - DelayBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, - }, - HtlcBasePoint: keychain.KeyDescriptor{ - PubKey: aliceKeyPub, + CsvDelay: uint16(csvTimeout), }, + MultiSigKey: keychain.KeyDescriptor{PubKey: alicePub}, + RevocationBasePoint: keychain.KeyDescriptor{PubKey: alicePub}, + PaymentBasePoint: keychain.KeyDescriptor{PubKey: alicePub}, + DelayBasePoint: keychain.KeyDescriptor{PubKey: alicePub}, + HtlcBasePoint: keychain.KeyDescriptor{PubKey: alicePub}, } bobCfg := channeldb.ChannelConfig{ ChannelConstraints: channeldb.ChannelConstraints{ - DustLimit: bobDustLimit, + DustLimit: dustLimit, MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()), ChanReserve: btcutil.Amount(rand.Int63()), MinHTLC: lnwire.MilliSatoshi(rand.Int63()), MaxAcceptedHtlcs: uint16(rand.Int31()), - CsvDelay: uint16(csvTimeoutBob), - }, - MultiSigKey: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - RevocationBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - PaymentBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, + CsvDelay: uint16(csvTimeout), }, - DelayBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - HtlcBasePoint: keychain.KeyDescriptor{ - PubKey: bobKeyPub, - }, - } - - bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize()) - if err != nil { - return nil, nil, nil, err - } - bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot) - bobFirstRevoke, err := bobPreimageProducer.AtIndex(0) - if err != nil { - return nil, nil, nil, err + MultiSigKey: keychain.KeyDescriptor{PubKey: bobPub}, + RevocationBasePoint: keychain.KeyDescriptor{PubKey: bobPub}, + PaymentBasePoint: keychain.KeyDescriptor{PubKey: bobPub}, + DelayBasePoint: keychain.KeyDescriptor{PubKey: bobPub}, + HtlcBasePoint: keychain.KeyDescriptor{PubKey: bobPub}, } - bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:]) - aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize()) + aliceRoot, err := chainhash.NewHash(alicePriv.Serialize()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot) aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0) if err != nil { - return nil, nil, nil, err + return nil, nil, err } aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:]) - aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns( - channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint, - bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit, - ) - if err != nil { - return nil, nil, nil, err - } - - alicePath, err := ioutil.TempDir("", "alicedb") - if err != nil { - return nil, nil, nil, err - } - - dbAlice, err := channeldb.Open(alicePath) + bobRoot, err := chainhash.NewHash(bobPriv.Serialize()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - - bobPath, err := ioutil.TempDir("", "bobdb") + bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot) + bobFirstRevoke, err := bobPreimageProducer.AtIndex(0) if err != nil { - return nil, nil, nil, err + return nil, nil, err } + bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:]) - dbBob, err := channeldb.Open(bobPath) + aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns( + channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint, + bobCommitPoint, *fundingTxIn, + channeldb.SingleFunderTweaklessBit, + ) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - estimator := chainfee.NewStaticEstimator(12500, 0) feePerKw, err := estimator.EstimateFeePerKW(1) if err != nil { - return nil, nil, nil, err + return nil, nil, err } - // TODO(roasbeef): need to factor in commit fee? aliceCommit := channeldb.ChannelCommitment{ CommitHeight: 0, LocalBalance: lnwire.NewMSatFromSatoshis(channelBal), @@ -208,150 +411,81 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, var chanIDBytes [8]byte if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil { - return nil, nil, nil, err + return nil, nil, err } shortChanID := lnwire.NewShortChanIDFromInt( binary.BigEndian.Uint64(chanIDBytes[:]), ) + chanType := channeldb.SingleFunderTweaklessBit + if tweak { + chanType = channeldb.SingleFunderBit + } + + alicePackager := channeldb.NewChannelPackager(shortChanID) + bobPackager := channeldb.NewChannelPackager(shortChanID) + aliceChannelState := &channeldb.OpenChannel{ LocalChanCfg: aliceCfg, RemoteChanCfg: bobCfg, - IdentityPub: aliceKeyPub, + IdentityPub: bobPub, FundingOutpoint: *prevOut, ShortChannelID: shortChanID, - ChanType: channeldb.SingleFunderTweaklessBit, + ChanType: chanType, IsInitiator: true, Capacity: channelCapacity, RemoteCurrentRevocation: bobCommitPoint, RevocationProducer: alicePreimageProducer, RevocationStore: shachain.NewRevocationStore(), LocalCommitment: aliceCommit, - RemoteCommitment: aliceCommit, - Db: dbAlice, - Packager: channeldb.NewChannelPackager(shortChanID), + RemoteCommitment: bobCommit, + Db: aliceDb, + Packager: alicePackager, FundingTxn: channels.TestFundingTx, } bobChannelState := &channeldb.OpenChannel{ LocalChanCfg: bobCfg, RemoteChanCfg: aliceCfg, - IdentityPub: bobKeyPub, + IdentityPub: alicePub, FundingOutpoint: *prevOut, - ChanType: channeldb.SingleFunderTweaklessBit, + ChanType: chanType, IsInitiator: false, Capacity: channelCapacity, RemoteCurrentRevocation: aliceCommitPoint, RevocationProducer: bobPreimageProducer, RevocationStore: shachain.NewRevocationStore(), LocalCommitment: bobCommit, - RemoteCommitment: bobCommit, - Db: dbBob, - Packager: channeldb.NewChannelPackager(shortChanID), + RemoteCommitment: aliceCommit, + Db: bobDb, + Packager: bobPackager, } - // Set custom values on the channel states. - updateChan(aliceChannelState, bobChannelState) - - aliceAddr := &net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 18555, - } + update(aliceChannelState, bobChannelState) if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil { - return nil, nil, nil, err - } - - bobAddr := &net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 18556, + return nil, nil, err } if err := bobChannelState.SyncPending(bobAddr, 0); err != nil { - return nil, nil, nil, err + return nil, nil, err } - cleanUpFunc := func() { - os.RemoveAll(bobPath) - os.RemoveAll(alicePath) - } - - aliceSigner := &mock.SingleSigner{Privkey: aliceKeyPriv} - bobSigner := &mock.SingleSigner{Privkey: bobKeyPriv} - - alicePool := lnwallet.NewSigPool(1, aliceSigner) - channelAlice, err := lnwallet.NewLightningChannel( - aliceSigner, aliceChannelState, alicePool, + bobChannel, err := lnwallet.NewLightningChannel( + bobSigner, bobChannelState, bobSigPool, ) if err != nil { - return nil, nil, nil, err - } - _ = alicePool.Start() - - bobPool := lnwallet.NewSigPool(1, bobSigner) - channelBob, err := lnwallet.NewLightningChannel( - bobSigner, bobChannelState, bobPool, - ) - if err != nil { - return nil, nil, nil, err - } - _ = bobPool.Start() - - chainIO := &mock.ChainIO{ - BestHeight: broadcastHeight, - } - wallet := &lnwallet.LightningWallet{ - WalletController: &mock.WalletController{ - RootKey: aliceKeyPriv, - PublishedTransactions: publTx, - }, - } - - const chanActiveTimeout = time.Minute - - errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize) - if err != nil { - return nil, nil, nil, err + return nil, nil, err } - var pubKey [33]byte - copy(pubKey[:], aliceKeyPub.SerializeCompressed()) - - cfgAddr := &lnwire.NetAddress{ - IdentityKey: aliceKeyPub, - Address: aliceAddr, - ChainNet: wire.SimNet, - } - - cfg := &Config{ - Addr: cfgAddr, - PubKeyBytes: pubKey, - ErrorBuffer: errBuffer, - ChainIO: chainIO, - Switch: newMockMessageSwitch(), - - ChanActiveTimeout: chanActiveTimeout, - InterceptSwitch: htlcswitch.NewInterceptableSwitch(nil), - - ChannelDB: dbAlice, - FeeEstimator: estimator, - Wallet: wallet, - ChainNotifier: notifier, - ChanStatusMgr: newMockStatusMgr(), - DisconnectPeer: func(b *btcec.PublicKey) error { return nil }, - } - - alicePeer := NewBrontide(*cfg) - - chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint()) - alicePeer.activeChannels[chanID] = channelAlice - - alicePeer.wg.Add(1) - go alicePeer.channelManager() - - return alicePeer, channelBob, cleanUpFunc, nil + return aliceChannelState, bobChannel, nil } +// noUpdate is a function which can be used as a parameter in +// createTestChannels to call the setup code with no custom values on the +// channels set up. +var noUpdate = func(a, b *channeldb.OpenChannel) {} + type mockMessageLink struct { cid lnwire.ChannelID cfg htlcswitch.ChannelLinkConfig