discovery: ensure we prioritize sending out our own local announcements
In this commit, we modify our gossip broadcast logic to ensure that we
always send out our own gossip messages, regardless of the peer's
filtering/feature policies.

Before this commit, it was possible that when we went to broadcast an
announcement, none of our peers actually had us as a syncer peer (lnd
terminology). In this case, the FilterGossipMsgs function wouldn't do
anything, as those peers don't have an active timestamp filter set. When
we then merged the syncer map, we'd add all the peers we didn't send to,
meaning we would skip them when it came time to broadcast.
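
To make that failure mode concrete, here is a small self-contained Go sketch
(the types and names below are illustrative stand-ins, not lnd's actual API):
a syncer whose peer never installed a timestamp filter delivers nothing, yet
the peer is still merged into the "already handled" set and is therefore
skipped when the fallback broadcast runs.

package main

import "fmt"

// gossipSyncer is a toy stand-in for a per-peer gossip syncer: a real syncer
// only forwards gossip to its peer once that peer has installed a gossip
// timestamp filter.
type gossipSyncer struct {
	peer      string
	hasFilter bool
	delivered []string
}

// filterGossipMsgs queues msgs for delivery only if a timestamp filter is set.
func (s *gossipSyncer) filterGossipMsgs(msgs ...string) {
	if !s.hasFilter {
		return // no filter installed: nothing is delivered
	}
	s.delivered = append(s.delivered, msgs...)
}

func main() {
	// Neither peer ever sent us a gossip_timestamp_filter.
	syncers := []*gossipSyncer{{peer: "alice"}, {peer: "bob"}}
	msg := "our_channel_announcement"

	// Old behaviour: every syncer peer is merged into the skip set,
	// whether or not the syncer actually delivered anything to it.
	skip := make(map[string]struct{})
	for _, s := range syncers {
		s.filterGossipMsgs(msg)
		skip[s.peer] = struct{}{}
	}

	// The fallback broadcast then goes to everyone not in the skip set,
	// which is now nobody, so our own announcement never leaves the node.
	for _, s := range syncers {
		if _, ok := skip[s.peer]; ok {
			continue
		}
		fmt.Println("broadcasting to", s.peer)
	}
	fmt.Println("messages delivered via syncers:",
		len(syncers[0].delivered)+len(syncers[1].delivered))
}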

In this commit, we now split things into two phases: we'll broadcast
_our own_ announcements to all our peers, but then do the normal
filtering and chunking for the announcements we received from remote
peers.
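
As a rough sketch of the two-phase idea (again with simplified placeholder
types rather than the gossiper's real ones), local announcements bypass the
per-peer gossip filters entirely, while remote announcements keep the usual
filter/dedup/chunking path:

package main

import "fmt"

// msgsToBroadcast mirrors the local/remote split introduced by this change
// (simplified; not lnd's actual definition).
type msgsToBroadcast struct {
	localMsgs  []string
	remoteMsgs []string
}

// broadcastToAllPeers sends our own announcements to every connected peer,
// ignoring gossip timestamp filters.
func broadcastToAllPeers(msgs []string) {
	for _, m := range msgs {
		fmt.Println("send unconditionally:", m)
	}
}

// broadcastFiltered stands in for the normal path: per-peer filtering,
// sender de-duplication, and sub-batch chunking.
func broadcastFiltered(msgs []string) {
	for _, m := range msgs {
		fmt.Println("send via filter/dedup/chunking:", m)
	}
}

func main() {
	batch := msgsToBroadcast{
		localMsgs:  []string{"our_node_ann", "our_chan_update"},
		remoteMsgs: []string{"relayed_chan_ann"},
	}

	// Phase 1: our own announcements always go out.
	broadcastToAllPeers(batch.localMsgs)

	// Phase 2: announcements relayed from remote peers still respect the
	// peers' gossip filters.
	broadcastFiltered(batch.remoteMsgs)
}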

Fixes lightningnetwork#6531
Fixes lightningnetwork#7223
Fixes lightningnetwork#7073
Roasbeef committed Dec 7, 2022
1 parent 650ea97 commit 909d5d1
Showing 2 changed files with 104 additions and 53 deletions.
135 changes: 91 additions & 44 deletions discovery/gossiper.go
@@ -18,6 +18,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/funcutils"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
@@ -1052,13 +1053,18 @@ func (m *msgsToBroadcast) isEmpty() bool {
return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
}

// length returns the length of the combined message set.
func (m *msgsToBroadcast) length() int {
return len(m.localMsgs) + len(m.remoteMsgs)
}

// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages is reset.
func (d *deDupedAnnouncements) Emit() []msgWithSenders {
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
d.Lock()
defer d.Unlock()

@@ -1068,21 +1074,24 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {

// Create an empty msgsToBroadcast, sizing each message slice to the
// total number of announcements.
msgs := make([]msgWithSenders, 0, numAnnouncements)
msgs := msgsToBroadcast{
localMsgs: make([]msgWithSenders, 0, numAnnouncements),
remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
}

// Add the channel announcements to the array first.
for _, message := range d.channelAnnouncements {
msgs = append(msgs, message)
msgs.addMsg(message)
}

// Then add the channel updates.
for _, message := range d.channelUpdates {
msgs = append(msgs, message)
msgs.addMsg(message)
}

// Finally add the node announcements.
for _, message := range d.nodeAnnouncements {
msgs = append(msgs, message)
msgs.addMsg(message)
}

d.reset()
@@ -1128,26 +1137,80 @@ func splitAnnouncementBatches(subBatchSize int,
return splitAnnouncementBatch
}

// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. If
// isLocal is true, then we'll send them to all peers and skip any gossip
// filter checks.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders, isLocal bool) {
// Next, if we have new things to announce, then broadcast them to all
// our immediately connected peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
d.cfg.MinimumBatchSize, len(annBatch),
)

splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, annBatch,
)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Infof("Broadcasting %v new announcements in %d sub batches (local=%v)",
len(annBatch), len(splitAnnouncementBatch), isLocal)

for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch, isLocal)

select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()
}

// sendBatch broadcasts a list of announcements to our peers.
func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders, isLocal bool) {

// If this is a batch of announcements created locally, then we can
// skip the filter and dedup logic below, and just send the
// announcements out to all our connected peers.
if isLocal {
msgsToSend := funcutils.Map(
annBatch, func(m msgWithSenders) lnwire.Message {
return m.msg
},
)
err := d.cfg.Broadcast(nil, msgsToSend...)
if err != nil {
log.Errorf("Unable to send local batch "+
"announcements: %v", err)
}

return
}

syncerPeers := d.syncMgr.GossipSyncers()

// We'll first attempt to filter out this new message
// for all peers that have active gossip syncers
// active.
// We'll first attempt to filter out this new message for all peers
// that have active gossip syncers.
for _, syncer := range syncerPeers {
syncer.FilterGossipMsgs(announcementBatch...)
syncer.FilterGossipMsgs(annBatch...)
}

for _, msgChunk := range announcementBatch {
// With the syncers taken care of, we'll merge
// the sender map with the set of syncers, so
// we don't send out duplicate messages.
for _, msgChunk := range annBatch {
msgChunk := msgChunk

// With the syncers taken care of, we'll merge the sender map
// with the set of syncers, so we don't send out duplicate
// messages.
msgChunk.mergeSyncerMap(syncerPeers)

err := d.cfg.Broadcast(
msgChunk.senders, msgChunk.msg,
)
err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
if err != nil {
log.Errorf("Unable to send batch "+
"announcements: %v", err)
@@ -1339,39 +1402,23 @@ func (d *AuthenticatedGossiper) networkHandler() {

// If the current announcement batch is empty, then we have no further
// work here.
if len(announcementBatch) == 0 {
if announcementBatch.isEmpty() {
continue
}

// Next, If we have new things to announce then
// broadcast them to all our immediately connected
// peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
len(announcementBatch),
// At this point, we have the set of local and remote
// announcements we want to send out. We'll do the
// batching as normal for both, but for local
// announcements, we'll blast them out without regard for
// our peers' gossip policies to ensure they propagate
// properly.
d.splitAndSendAnnBatch(
announcementBatch.localMsgs, true,
)

splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, announcementBatch,
d.splitAndSendAnnBatch(
announcementBatch.remoteMsgs, false,
)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Infof("Broadcasting %v new announcements in %d sub batches",
len(announcementBatch), len(splitAnnouncementBatch))

for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch)
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()

// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels or node announcement. This addresses the
22 changes: 13 additions & 9 deletions discovery/gossiper_test.go
@@ -1910,31 +1910,35 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
// Ensure that announcement batch delivers channel announcements,
// channel updates, and node announcements in proper order.
batch := announcements.Emit()
if len(batch) != 4 {
if batch.length() != 4 {
t.Fatal("announcement batch incorrect length")
}

if !reflect.DeepEqual(batch[0].msg, ca2) {
if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) {
t.Fatalf("channel announcement not first in batch: got %v, "+
"expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2))
"expected %v", spew.Sdump(batch.localMsgs[0].msg), spew.Sdump(ca2))
}

if !reflect.DeepEqual(batch[1].msg, ua3) {
if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) {
t.Fatalf("channel update not next in batch: got %v, "+
"expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2))
"expected %v", spew.Sdump(batch.localMsgs[1].msg), spew.Sdump(ua2))
}

// We'll ensure that both node announcements are present. We check both
// indexes as due to the randomized order of map iteration they may be
// in either place.
if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) {
if !reflect.DeepEqual(batch.localMsgs[2].msg, na) &&
!reflect.DeepEqual(batch.localMsgs[3].msg, na) {

t.Fatalf("first node announcement not in last part of batch: "+
"got %v, expected %v", batch[2].msg,
"got %v, expected %v", batch.localMsgs[2].msg,
na)
}
if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) {
if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) &&
!reflect.DeepEqual(batch.localMsgs[3].msg, na5) {

t.Fatalf("second node announcement not in last part of batch: "+
"got %v, expected %v", batch[3].msg,
"got %v, expected %v", batch.localMsgs[3].msg,
na5)
}

