Skip to content

Commit

Permalink
Merge pull request #7239 from Roasbeef/gossip-scoping-issues
Browse files Browse the repository at this point in the history
discovery: ensure we prioritize sending out our own local announcements
  • Loading branch information
Roasbeef authored Dec 16, 2022
2 parents a347e40 + 050db16 commit de3e0d7
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 64 deletions.
199 changes: 144 additions & 55 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
Expand Down Expand Up @@ -826,6 +827,11 @@ type msgWithSenders struct {
// msg is the wire message itself.
msg lnwire.Message

// isLocal is true if this was a message that originated locally. We'll
// use this to bypass our normal checks to ensure we prioritize sending
// out our own updates.
isLocal bool

// sender is the set of peers that sent us this message.
senders map[route.Vertex]struct{}
}
Expand Down Expand Up @@ -903,6 +909,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
if !ok {
mws = msgWithSenders{
msg: msg,
isLocal: !message.isRemote,
senders: make(map[route.Vertex]struct{}),
}
mws.senders[sender] = struct{}{}
Expand Down Expand Up @@ -949,6 +956,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
if oldTimestamp < msg.Timestamp {
mws = msgWithSenders{
msg: msg,
isLocal: !message.isRemote,
senders: make(map[route.Vertex]struct{}),
}

Expand Down Expand Up @@ -992,6 +1000,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
if oldTimestamp < msg.Timestamp {
mws = msgWithSenders{
msg: msg,
isLocal: !message.isRemote,
senders: make(map[route.Vertex]struct{}),
}

Expand Down Expand Up @@ -1020,13 +1029,44 @@ func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
}
}

// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
// to broadcast next into messages that are locally sourced and those that are
// sourced remotely.
type msgsToBroadcast struct {
// localMsgs is the set of messages we created locally.
localMsgs []msgWithSenders

// remoteMsgs is the set of messages that we received from a remote
// party.
remoteMsgs []msgWithSenders
}

// addMsg adds a new message to the appropriate sub-slice.
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
if msg.isLocal {
m.localMsgs = append(m.localMsgs, msg)
} else {
m.remoteMsgs = append(m.remoteMsgs, msg)
}
}

// isEmpty returns true if the batch is empty.
func (m *msgsToBroadcast) isEmpty() bool {
return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
}

// length returns the length of the combined message set.
func (m *msgsToBroadcast) length() int {
return len(m.localMsgs) + len(m.remoteMsgs)
}

// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted, contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages are reset.
func (d *deDupedAnnouncements) Emit() []msgWithSenders {
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
d.Lock()
defer d.Unlock()

Expand All @@ -1036,21 +1076,24 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {

// Create an empty array of lnwire.Messages with a length equal to
// the total number of announcements.
msgs := make([]msgWithSenders, 0, numAnnouncements)
msgs := msgsToBroadcast{
localMsgs: make([]msgWithSenders, 0, numAnnouncements),
remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
}

// Add the channel announcements to the array first.
for _, message := range d.channelAnnouncements {
msgs = append(msgs, message)
msgs.addMsg(message)
}

// Then add the channel updates.
for _, message := range d.channelUpdates {
msgs = append(msgs, message)
msgs.addMsg(message)
}

// Finally add the node announcements.
for _, message := range d.nodeAnnouncements {
msgs = append(msgs, message)
msgs.addMsg(message)
}

d.reset()
Expand Down Expand Up @@ -1096,26 +1139,83 @@ func splitAnnouncementBatches(subBatchSize int,
return splitAnnouncementBatch
}

// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. If
// isLocal is true, then we'll send them to all peers and skip any gossip
// filter checks.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
isLocal bool) {

// Next, If we have new things to announce then broadcast them to all
// our immediately connected peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
d.cfg.MinimumBatchSize, len(annBatch),
)

splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, annBatch,
)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Infof("Broadcasting %v new announcements in %d sub "+
"batches (local=%v)", len(annBatch),
len(splitAnnouncementBatch), isLocal)

for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch, isLocal)

select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()
}

// sendBatch broadcasts a list of announcements to our peers.
func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
isLocal bool) {

// If this is a batch of announcements created locally, then we can
// skip the filter and dedup logic below, and just send the
// announcements out to all our coonnected peers.
if isLocal {
msgsToSend := fn.Map(
annBatch, func(m msgWithSenders) lnwire.Message {
return m.msg
},
)
err := d.cfg.Broadcast(nil, msgsToSend...)
if err != nil {
log.Errorf("Unable to send local batch "+
"announcements: %v", err)
}

return
}

syncerPeers := d.syncMgr.GossipSyncers()

// We'll first attempt to filter out this new message
// for all peers that have active gossip syncers
// active.
// We'll first attempt to filter out this new message for all peers
// that have active gossip syncers active.
for _, syncer := range syncerPeers {
syncer.FilterGossipMsgs(announcementBatch...)
syncer.FilterGossipMsgs(annBatch...)
}

for _, msgChunk := range announcementBatch {
// With the syncers taken care of, we'll merge
// the sender map with the set of syncers, so
// we don't send out duplicate messages.
for _, msgChunk := range annBatch {
msgChunk := msgChunk

// With the syncers taken care of, we'll merge the sender map
// with the set of syncers, so we don't send out duplicate
// messages.
msgChunk.mergeSyncerMap(syncerPeers)

err := d.cfg.Broadcast(
msgChunk.senders, msgChunk.msg,
)
err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
if err != nil {
log.Errorf("Unable to send batch "+
"announcements: %v", err)
Expand Down Expand Up @@ -1240,39 +1340,23 @@ func (d *AuthenticatedGossiper) networkHandler() {

// If the current announcements batch is nil, then we
// have no further work here.
if len(announcementBatch) == 0 {
if announcementBatch.isEmpty() {
continue
}

// Next, If we have new things to announce then
// broadcast them to all our immediately connected
// peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
len(announcementBatch),
// At this point, we have the set of local and remote
// announcements we want to send out. We'll do the
// batching as normal for both, but for local
// announcements, we'll blast them out w/o regard for
// our peer's policies so we ensure they propagate
// properly.
d.splitAndSendAnnBatch(
announcementBatch.localMsgs, true,
)

splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, announcementBatch,
d.splitAndSendAnnBatch(
announcementBatch.remoteMsgs, false,
)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Infof("Broadcasting %v new announcements in %d sub batches",
len(announcementBatch), len(splitAnnouncementBatch))

for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch)
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()

// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels or node announcement. This addresses the
Expand Down Expand Up @@ -1611,8 +1695,9 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
// We set ourselves as the source of this message to indicate
// that we shouldn't skip any peers when sending this message.
chanUpdates = append(chanUpdates, networkMsg{
source: d.selfKey,
msg: chanUpdate,
source: d.selfKey,
isRemote: false,
msg: chanUpdate,
})
}

Expand Down Expand Up @@ -2182,9 +2267,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
// be broadcast to the rest of our peers.
if isPublic {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: nodeAnn,
peer: nMsg.peer,
isRemote: nMsg.isRemote,
source: nMsg.source,
msg: nodeAnn,
})
} else {
log.Tracef("Skipping broadcasting node announcement for %x "+
Expand Down Expand Up @@ -2460,9 +2546,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,

if proof != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: ann,
peer: nMsg.peer,
isRemote: nMsg.isRemote,
source: nMsg.source,
msg: ann,
})
}

Expand Down Expand Up @@ -2848,9 +2935,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
var announcements []networkMsg
if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: upd,
peer: nMsg.peer,
source: nMsg.source,
isRemote: nMsg.isRemote,
msg: upd,
})
}

Expand Down Expand Up @@ -2949,6 +3037,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
} else {
remotePubKey = chanInfo.NodeKey1Bytes
}

// Since the remote peer might not be online we'll call a
// method that will attempt to deliver the proof when it comes
// online.
Expand Down
26 changes: 17 additions & 9 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ func TestProcessAnnouncement(t *testing.T) {
require.NoError(t, err, "can't create context")

assertSenderExistence := func(sender *btcec.PublicKey, msg msgWithSenders) {
t.Helper()

if _, ok := msg.senders[route.NewVertex(sender)]; !ok {
t.Fatalf("sender=%x not present in %v",
sender.SerializeCompressed(), spew.Sdump(msg))
Expand Down Expand Up @@ -1910,31 +1912,37 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
// Ensure that announcement batch delivers channel announcements,
// channel updates, and node announcements in proper order.
batch := announcements.Emit()
if len(batch) != 4 {
if batch.length() != 4 {
t.Fatal("announcement batch incorrect length")
}

if !reflect.DeepEqual(batch[0].msg, ca2) {
if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) {
t.Fatalf("channel announcement not first in batch: got %v, "+
"expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2))
"expected %v", spew.Sdump(batch.localMsgs[0].msg),
spew.Sdump(ca2))
}

if !reflect.DeepEqual(batch[1].msg, ua3) {
if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) {
t.Fatalf("channel update not next in batch: got %v, "+
"expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2))
"expected %v", spew.Sdump(batch.localMsgs[1].msg),
spew.Sdump(ua2))
}

// We'll ensure that both node announcements are present. We check both
// indexes as due to the randomized order of map iteration they may be
// in either place.
if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) {
if !reflect.DeepEqual(batch.localMsgs[2].msg, na) &&
!reflect.DeepEqual(batch.localMsgs[3].msg, na) {

t.Fatalf("first node announcement not in last part of batch: "+
"got %v, expected %v", batch[2].msg,
"got %v, expected %v", batch.localMsgs[2].msg,
na)
}
if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) {
if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) &&
!reflect.DeepEqual(batch.localMsgs[3].msg, na5) {

t.Fatalf("second node announcement not in last part of batch: "+
"got %v, expected %v", batch[3].msg,
"got %v, expected %v", batch.localMsgs[3].msg,
na5)
}

Expand Down
8 changes: 8 additions & 0 deletions docs/release-notes/release-notes-0.16.0.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Release Notes

## Peer to Peer Behavior

`lnd` will now [properly prioritize sending out gossip updates generated
locally to all connected
peers](https://github.com/lightningnetwork/lnd/pull/7239), regardless of their
current gossip sync query status.


## BOLT Specs

* Warning messages from peers are now recognized and
Expand Down
Loading

0 comments on commit de3e0d7

Please sign in to comment.