From adb239300fa1afa4a7d5450b35454a7e0fcd210b Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Tue, 6 Dec 2022 18:32:06 -0800
Subject: [PATCH 1/6] fn: add new fn (func utils) package for generic helper
 funcs

We start with a simple Map function that can be useful for transforming
objects on the fly.

We may want to eventually make this Taro package into a module so we can
just have everything in one place:
https://github.com/lightninglabs/taro/tree/main/chanutils.
---
 fn/stream.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)
 create mode 100644 fn/stream.go

diff --git a/fn/stream.go b/fn/stream.go
new file mode 100644
index 0000000000..9974eb4676
--- /dev/null
+++ b/fn/stream.go
@@ -0,0 +1,13 @@
+package fn
+
+// Map takes an input slice, and applies the function f to each element,
+// yielding a new slice.
+func Map[T1, T2 any](s []T1, f func(T1) T2) []T2 {
+	r := make([]T2, len(s))
+
+	for i, v := range s {
+		r[i] = f(v)
+	}
+
+	return r
+}

From e8177ea4275c1975ccb6f17bfb7597574e394bba Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Tue, 6 Dec 2022 18:33:04 -0800
Subject: [PATCH 2/6] discovery: add isLocal bool to msgWithSenders

This lets us keep track of which messages we created ourselves vs. the
messages that originated remotely from a peer.
---
 discovery/gossiper.go | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 8bb5a52145..8fc2fcc3bb 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -826,6 +826,11 @@ type msgWithSenders struct {
 	// msg is the wire message itself.
 	msg lnwire.Message
 
+	// isLocal is true if this was a message that originated locally. We'll
+	// use this to bypass our normal checks to ensure we prioritize sending
+	// out our own updates.
+	isLocal bool
+
 	// sender is the set of peers that sent us this message.
 	senders map[route.Vertex]struct{}
 }
@@ -903,6 +908,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
 		if !ok {
 			mws = msgWithSenders{
 				msg:     msg,
+				isLocal: !message.isRemote,
 				senders: make(map[route.Vertex]struct{}),
 			}
 			mws.senders[sender] = struct{}{}
@@ -949,6 +955,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
 		if oldTimestamp < msg.Timestamp {
 			mws = msgWithSenders{
 				msg:     msg,
+				isLocal: !message.isRemote,
 				senders: make(map[route.Vertex]struct{}),
 			}
 
@@ -992,6 +999,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
 		if oldTimestamp < msg.Timestamp {
 			mws = msgWithSenders{
 				msg:     msg,
+				isLocal: !message.isRemote,
 				senders: make(map[route.Vertex]struct{}),
 			}
 
@@ -2949,6 +2957,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
 	} else {
 		remotePubKey = chanInfo.NodeKey1Bytes
 	}
+
 	// Since the remote peer might not be online we'll call a
 	// method that will attempt to deliver the proof when it comes
 	// online.

From 9a4701d7095062b2e7502ca04b90bbe8bfc96906 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Tue, 6 Dec 2022 18:33:56 -0800
Subject: [PATCH 3/6] discovery: add new msgsToBroadcast struct

This struct will be used later on to do filtering when we go to
broadcast, based on whether a message was locally sourced or received
from a remote peer.
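
For illustration only (not part of the diff): a rough, self-contained
sketch with simplified stand-in types (a plain string instead of
lnwire.Message, and no sender sets), showing how the new struct
partitions a batch and how the fn.Map helper from the first patch can
then pull out just the raw messages:

    package main

    import "fmt"

    // Simplified stand-ins for the real gossiper types.
    type msgWithSenders struct {
        msg     string // stands in for lnwire.Message
        isLocal bool
    }

    type msgsToBroadcast struct {
        localMsgs  []msgWithSenders
        remoteMsgs []msgWithSenders
    }

    // addMsg routes a message into the local or remote sub-slice.
    func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
        if msg.isLocal {
            m.localMsgs = append(m.localMsgs, msg)
        } else {
            m.remoteMsgs = append(m.remoteMsgs, msg)
        }
    }

    // Map mirrors fn.Map: apply f to each element of s.
    func Map[T1, T2 any](s []T1, f func(T1) T2) []T2 {
        r := make([]T2, len(s))
        for i, v := range s {
            r[i] = f(v)
        }
        return r
    }

    func main() {
        var batch msgsToBroadcast
        batch.addMsg(msgWithSenders{msg: "our_chan_update", isLocal: true})
        batch.addMsg(msgWithSenders{msg: "peer_node_ann", isLocal: false})

        // Broadcast-time consumers only need the raw messages.
        local := Map(batch.localMsgs, func(m msgWithSenders) string {
            return m.msg
        })
        fmt.Println(local, len(batch.remoteMsgs)) // [our_chan_update] 1
    }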
---
 discovery/gossiper.go | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 8fc2fcc3bb..e1fa53843d 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -1028,6 +1028,31 @@ func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
 	}
 }
 
+// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
+// to broadcast next into messages that are locally sourced and those that are
+// sourced remotely.
+type msgsToBroadcast struct {
+	// localMsgs is the set of messages we created locally.
+	localMsgs []msgWithSenders
+
+	// remoteMsgs is the set of messages that we received from a remote party.
+	remoteMsgs []msgWithSenders
+}
+
+// addMsg adds a new message to the appropriate sub-slice.
+func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
+	if msg.isLocal {
+		m.localMsgs = append(m.localMsgs, msg)
+	} else {
+		m.remoteMsgs = append(m.remoteMsgs, msg)
+	}
+}
+
+// isEmpty returns true if the batch is empty.
+func (m *msgsToBroadcast) isEmpty() bool {
+	return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
+}
+
 // Emit returns the set of de-duplicated announcements to be sent out during
 // the next announcement epoch, in the order of channel announcements, channel
 // updates, and node announcements. Each message emitted, contains the set of

From 52451b37af4692f3ca741307093ad489f68dbad9 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Tue, 6 Dec 2022 18:38:25 -0800
Subject: [PATCH 4/6] discovery: ensure we prioritize sending out our own
 local announcements

In this commit, we modify our gossip broadcast logic to ensure that
we'll always send out our own gossip messages regardless of the
filtering/feature policies of the peer.

Before this commit, it was possible that when we went to broadcast an
announcement, none of our peers actually had us as a syncer peer (lnd
terminology). In this case, the FilterGossipMsgs function wouldn't do
anything, as they don't have an active timestamp filter set. When we
then went to merge the syncer map, we'd add all these peers we didn't
send to, meaning we would skip them when it came time to broadcast.

In this commit, we now split things into two phases: we'll broadcast
_our_ own announcements to all our peers, but then do the normal
filtering and chunking for the announcements we got from a remote peer.

Fixes https://github.com/lightningnetwork/lnd/issues/6531
Fixes https://github.com/lightningnetwork/lnd/issues/7223
Fixes https://github.com/lightningnetwork/lnd/issues/7073
---
 discovery/gossiper.go      | 141 +++++++++++++++++++++++++------------
 discovery/gossiper_test.go |  24 ++++---
 2 files changed, 111 insertions(+), 54 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index e1fa53843d..f8510e93a2 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -18,6 +18,7 @@ import (
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/fn"
 	"github.com/lightningnetwork/lnd/keychain"
 	"github.com/lightningnetwork/lnd/kvdb"
 	"github.com/lightningnetwork/lnd/lnpeer"
@@ -1035,7 +1036,8 @@ type msgsToBroadcast struct {
 	// localMsgs is the set of messages we created locally.
 	localMsgs []msgWithSenders
 
-	// remoteMsgs is the set of messages that we received from a remote party.
+	// remoteMsgs is the set of messages that we received from a remote
+	// party.
 	remoteMsgs []msgWithSenders
 }
 
@@ -1053,13 +1055,18 @@ func (m *msgsToBroadcast) isEmpty() bool {
 	return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
 }
 
+// length returns the length of the combined message set.
+func (m *msgsToBroadcast) length() int {
+	return len(m.localMsgs) + len(m.remoteMsgs)
+}
+
 // Emit returns the set of de-duplicated announcements to be sent out during
 // the next announcement epoch, in the order of channel announcements, channel
 // updates, and node announcements. Each message emitted, contains the set of
 // peers that sent us the message. This way, we can ensure that we don't waste
 // bandwidth by re-sending a message to the peer that sent it to us in the
 // first place. Additionally, the set of stored messages are reset.
-func (d *deDupedAnnouncements) Emit() []msgWithSenders {
+func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
 	d.Lock()
 	defer d.Unlock()
 
@@ -1069,21 +1076,24 @@ func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
 	// Create an empty array of lnwire.Messages with a length equal to
 	// the total number of announcements.
-	msgs := make([]msgWithSenders, 0, numAnnouncements)
+	msgs := msgsToBroadcast{
+		localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
+		remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
+	}
 
 	// Add the channel announcements to the array first.
 	for _, message := range d.channelAnnouncements {
-		msgs = append(msgs, message)
+		msgs.addMsg(message)
 	}
 
 	// Then add the channel updates.
 	for _, message := range d.channelUpdates {
-		msgs = append(msgs, message)
+		msgs.addMsg(message)
 	}
 
 	// Finally add the node announcements.
 	for _, message := range d.nodeAnnouncements {
-		msgs = append(msgs, message)
+		msgs.addMsg(message)
 	}
 
 	d.reset()
 
@@ -1129,26 +1139,83 @@ func splitAnnouncementBatches(subBatchSize int,
 	return splitAnnouncementBatch
 }
 
+// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
+// split size, and then sends out all items to the set of target peers. If
+// isLocal is true, then we'll send them to all peers and skip any gossip
+// filter checks.
+func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
+	isLocal bool) {
+
+	// If we have new things to announce, then broadcast them to all our
+	// immediately connected peers.
+	subBatchSize := calculateSubBatchSize(
+		d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
+		d.cfg.MinimumBatchSize, len(annBatch),
+	)
+
+	splitAnnouncementBatch := splitAnnouncementBatches(
+		subBatchSize, annBatch,
+	)
+
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+
+		log.Infof("Broadcasting %v new announcements in %d sub "+
+			"batches (local=%v)", len(annBatch),
+			len(splitAnnouncementBatch), isLocal)
+
+		for _, announcementBatch := range splitAnnouncementBatch {
+			d.sendBatch(announcementBatch, isLocal)
+
+			select {
+			case <-time.After(d.cfg.SubBatchDelay):
+			case <-d.quit:
+				return
+			}
+		}
+	}()
+}
+
 // sendBatch broadcasts a list of announcements to our peers.
-func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
+func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
+	isLocal bool) {
+
+	// If this is a batch of announcements created locally, then we can
+	// skip the filter and deup logic below, and just send the
+	// announcements out to all our connected peers.
+	if isLocal {
+		msgsToSend := fn.Map(
+			annBatch, func(m msgWithSenders) lnwire.Message {
+				return m.msg
+			},
+		)
+		err := d.cfg.Broadcast(nil, msgsToSend...)
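+		// Passing a nil skip set to Broadcast above means no peers
+		// are excluded: locally sourced announcements reach every
+		// connected peer.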
+		if err != nil {
+			log.Errorf("Unable to send local batch "+
+				"announcements: %v", err)
+		}
+
+		return
+	}
+
 	syncerPeers := d.syncMgr.GossipSyncers()
 
-	// We'll first attempt to filter out this new message
-	// for all peers that have active gossip syncers
-	// active.
+	// We'll first attempt to filter out this new message for all peers
+	// that have active gossip syncers.
 	for _, syncer := range syncerPeers {
-		syncer.FilterGossipMsgs(announcementBatch...)
+		syncer.FilterGossipMsgs(annBatch...)
 	}
 
-	for _, msgChunk := range announcementBatch {
-		// With the syncers taken care of, we'll merge
-		// the sender map with the set of syncers, so
-		// we don't send out duplicate messages.
+	for _, msgChunk := range annBatch {
+		msgChunk := msgChunk
+
+		// With the syncers taken care of, we'll merge the sender map
+		// with the set of syncers, so we don't send out duplicate
+		// messages.
 		msgChunk.mergeSyncerMap(syncerPeers)
 
-		err := d.cfg.Broadcast(
-			msgChunk.senders, msgChunk.msg,
-		)
+		err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
 		if err != nil {
 			log.Errorf("Unable to send batch "+
 				"announcements: %v", err)
@@ -1273,39 +1340,23 @@ func (d *AuthenticatedGossiper) networkHandler() {
 
 			// If the current announcements batch is nil, then we
 			// have no further work here.
-			if len(announcementBatch) == 0 {
+			if announcementBatch.isEmpty() {
 				continue
 			}
 
-			// Next, If we have new things to announce then
-			// broadcast them to all our immediately connected
-			// peers.
-			subBatchSize := calculateSubBatchSize(
-				d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
-				len(announcementBatch),
+			// At this point, we have the set of local and remote
+			// announcements we want to send out. We'll do the
+			// batching as normal for both, but for local
+			// announcements, we'll blast them out w/o regard for
+			// our peer's policies so we ensure they propagate
+			// properly.
+			d.splitAndSendAnnBatch(
+				announcementBatch.localMsgs, true,
 			)
-
-			splitAnnouncementBatch := splitAnnouncementBatches(
-				subBatchSize, announcementBatch,
+			d.splitAndSendAnnBatch(
+				announcementBatch.remoteMsgs, false,
 			)
 
-			d.wg.Add(1)
-			go func() {
-				defer d.wg.Done()
-
-				log.Infof("Broadcasting %v new announcements in %d sub batches",
-					len(announcementBatch), len(splitAnnouncementBatch))
-
-				for _, announcementBatch := range splitAnnouncementBatch {
-					d.sendBatch(announcementBatch)
-					select {
-					case <-time.After(d.cfg.SubBatchDelay):
-					case <-d.quit:
-						return
-					}
-				}
-			}()
-
 			// The retransmission timer has ticked which indicates that we
 			// should check if we need to prune or re-broadcast any of our
 			// personal channels or node announcement. This addresses the

diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index a426620fe0..4e220896d1 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -1910,31 +1910,37 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
 	// Ensure that announcement batch delivers channel announcements,
 	// channel updates, and node announcements in proper order.
 	batch := announcements.Emit()
-	if len(batch) != 4 {
+	if batch.length() != 4 {
 		t.Fatal("announcement batch incorrect length")
 	}
 
-	if !reflect.DeepEqual(batch[0].msg, ca2) {
+	if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) {
 		t.Fatalf("channel announcement not first in batch: got %v, "+
-			"expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2))
+			"expected %v", spew.Sdump(batch.localMsgs[0].msg),
+			spew.Sdump(ca2))
 	}
 
-	if !reflect.DeepEqual(batch[1].msg, ua3) {
+	if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) {
 		t.Fatalf("channel update not next in batch: got %v, "+
-			"expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2))
+			"expected %v", spew.Sdump(batch.localMsgs[1].msg),
+			spew.Sdump(ua2))
 	}
 
 	// We'll ensure that both node announcements are present. We check both
 	// indexes as due to the randomized order of map iteration they may be
 	// in either place.
-	if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) {
+	if !reflect.DeepEqual(batch.localMsgs[2].msg, na) &&
+		!reflect.DeepEqual(batch.localMsgs[3].msg, na) {
+
 		t.Fatalf("first node announcement not in last part of batch: "+
-			"got %v, expected %v", batch[2].msg,
+			"got %v, expected %v", batch.localMsgs[2].msg,
 			na)
 	}
 
-	if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) {
+	if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) &&
+		!reflect.DeepEqual(batch.localMsgs[3].msg, na5) {
+
 		t.Fatalf("second node announcement not in last part of batch: "+
-			"got %v, expected %v", batch[3].msg,
+			"got %v, expected %v", batch.localMsgs[3].msg,
 			na5)
 	}

From f94a67a26d02e9c8b281a71a8f2aaf3f3ee0874a Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Thu, 8 Dec 2022 17:47:33 -0800
Subject: [PATCH 5/6] discovery: properly set the isRemote field for validated
 networkMsg

This wasn't set properly, leading to some test failures after the prior
change.
---
 discovery/gossiper.go      | 28 ++++++++++++++++------------
 discovery/gossiper_test.go |  2 ++
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index f8510e93a2..9bb6959e5a 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -1182,7 +1182,7 @@ func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
 	isLocal bool) {
 
 	// If this is a batch of announcements created locally, then we can
-	// skip the filter and deup logic below, and just send the
+	// skip the filter and dedup logic below, and just send the
 	// announcements out to all our connected peers.
 	if isLocal {
 		msgsToSend := fn.Map(
@@ -1695,8 +1695,9 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
 		// We set ourselves as the source of this message to indicate
 		// that we shouldn't skip any peers when sending this message.
 		chanUpdates = append(chanUpdates, networkMsg{
-			source: d.selfKey,
-			msg:    chanUpdate,
+			source:   d.selfKey,
+			isRemote: false,
+			msg:      chanUpdate,
 		})
 	}
 
@@ -2266,9 +2267,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
 	// be broadcast to the rest of our peers.
 	if isPublic {
 		announcements = append(announcements, networkMsg{
-			peer:   nMsg.peer,
-			source: nMsg.source,
-			msg:    nodeAnn,
+			peer:     nMsg.peer,
+			isRemote: nMsg.isRemote,
+			source:   nMsg.source,
+			msg:      nodeAnn,
 		})
 	} else {
 		log.Tracef("Skipping broadcasting node announcement for %x "+
@@ -2544,9 +2546,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 
 	if proof != nil {
 		announcements = append(announcements, networkMsg{
-			peer:   nMsg.peer,
-			source: nMsg.source,
-			msg:    ann,
+			peer:     nMsg.peer,
+			isRemote: nMsg.isRemote,
+			source:   nMsg.source,
+			msg:      ann,
 		})
 	}
 
@@ -2932,9 +2935,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 	var announcements []networkMsg
 	if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
 		announcements = append(announcements, networkMsg{
-			peer:   nMsg.peer,
-			source: nMsg.source,
-			msg:    upd,
+			peer:     nMsg.peer,
+			source:   nMsg.source,
+			isRemote: nMsg.isRemote,
+			msg:      upd,
 		})
 	}
 
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index 4e220896d1..07a2b4bc07 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -824,6 +824,8 @@ func TestProcessAnnouncement(t *testing.T) {
 	require.NoError(t, err, "can't create context")
 
 	assertSenderExistence := func(sender *btcec.PublicKey, msg msgWithSenders) {
+		t.Helper()
+
 		if _, ok := msg.senders[route.NewVertex(sender)]; !ok {
 			t.Fatalf("sender=%x not present in %v",
 				sender.SerializeCompressed(), spew.Sdump(msg))

From 050db16e5df24bf2d08e7c3bc07654ea5de77ae1 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Tue, 13 Dec 2022 19:45:39 -0800
Subject: [PATCH 6/6] docs/release-notes: add release notes for gossip msg
 prioritization

---
 docs/release-notes/release-notes-0.16.0.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/docs/release-notes/release-notes-0.16.0.md b/docs/release-notes/release-notes-0.16.0.md
index 3952878e7e..d183c98080 100644
--- a/docs/release-notes/release-notes-0.16.0.md
+++ b/docs/release-notes/release-notes-0.16.0.md
@@ -1,5 +1,13 @@
 # Release Notes
 
+## Peer to Peer Behavior
+
+`lnd` will now [properly prioritize sending out gossip updates generated
+locally to all connected
+peers](https://github.com/lightningnetwork/lnd/pull/7239), regardless of their
+current gossip sync query status.
+
+
 ## BOLT Specs
 
 * Warning messages from peers are now recognized and