diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 116dd30871..6b59a98aa6 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -18,6 +18,7 @@ import ( "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/funcutils" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" @@ -1052,13 +1053,18 @@ func (m *msgsToBroadcast) isEmpty() bool { return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0 } +// length returns the length of the combined message set. +func (m *msgsToBroadcast) length() int { + return len(m.localMsgs) + len(m.remoteMsgs) +} + // Emit returns the set of de-duplicated announcements to be sent out during // the next announcement epoch, in the order of channel announcements, channel // updates, and node announcements. Each message emitted, contains the set of // peers that sent us the message. This way, we can ensure that we don't waste // bandwidth by re-sending a message to the peer that sent it to us in the // first place. Additionally, the set of stored messages are reset. -func (d *deDupedAnnouncements) Emit() []msgWithSenders { +func (d *deDupedAnnouncements) Emit() msgsToBroadcast { d.Lock() defer d.Unlock() @@ -1068,21 +1074,24 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { // Create an empty array of lnwire.Messages with a length equal to // the total number of announcements. - msgs := make([]msgWithSenders, 0, numAnnouncements) + msgs := msgsToBroadcast{ + localMsgs: make([]msgWithSenders, 0, numAnnouncements), + remoteMsgs: make([]msgWithSenders, 0, numAnnouncements), + } // Add the channel announcements to the array first. for _, message := range d.channelAnnouncements { - msgs = append(msgs, message) + msgs.addMsg(message) } // Then add the channel updates. for _, message := range d.channelUpdates { - msgs = append(msgs, message) + msgs.addMsg(message) } // Finally add the node announcements. for _, message := range d.nodeAnnouncements { - msgs = append(msgs, message) + msgs.addMsg(message) } d.reset() @@ -1128,26 +1137,80 @@ func splitAnnouncementBatches(subBatchSize int, return splitAnnouncementBatch } +// splitAndSendAnnBatch takes a batch of messages, computes the proper batch +// split size, and then sends out all items to the set of target peers. If +// isLocal is true, then we'll send them to all peers and skip any gossip +// filter checks. +func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders, isLocal bool) { + // Next, If we have new things to announce then + // broadcast them to all our immediately connected + // peers. + subBatchSize := calculateSubBatchSize( + d.cfg.TrickleDelay, d.cfg.SubBatchDelay, + d.cfg.MinimumBatchSize, len(annBatch), + ) + + splitAnnouncementBatch := splitAnnouncementBatches( + subBatchSize, annBatch, + ) + + d.wg.Add(1) + go func() { + defer d.wg.Done() + + log.Infof("Broadcasting %v new announcements in %d sub batches (local=%v)", + len(annBatch), len(splitAnnouncementBatch), isLocal) + + for _, announcementBatch := range splitAnnouncementBatch { + d.sendBatch(announcementBatch, isLocal) + + select { + case <-time.After(d.cfg.SubBatchDelay): + case <-d.quit: + return + } + } + }() +} + // sendBatch broadcasts a list of announcements to our peers. -func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) { +func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders, isLocal bool) { + + // If this is a batch of announcements created locally, then we can + // skip the filter and deup logic below, and just send the + // announcements out to all our coonnected peers. + if isLocal { + msgsToSend := funcutils.Map( + annBatch, func(m msgWithSenders) lnwire.Message { + return m.msg + }, + ) + err := d.cfg.Broadcast(nil, msgsToSend...) + if err != nil { + log.Errorf("Unable to send local batch "+ + "announcements: %v", err) + } + + return + } + syncerPeers := d.syncMgr.GossipSyncers() - // We'll first attempt to filter out this new message - // for all peers that have active gossip syncers - // active. + // We'll first attempt to filter out this new message for all peers + // that have active gossip syncers active. for _, syncer := range syncerPeers { - syncer.FilterGossipMsgs(announcementBatch...) + syncer.FilterGossipMsgs(annBatch...) } - for _, msgChunk := range announcementBatch { - // With the syncers taken care of, we'll merge - // the sender map with the set of syncers, so - // we don't send out duplicate messages. + for _, msgChunk := range annBatch { + msgChunk := msgChunk + + // With the syncers taken care of, we'll merge the sender map + // with the set of syncers, so we don't send out duplicate + // messages. msgChunk.mergeSyncerMap(syncerPeers) - err := d.cfg.Broadcast( - msgChunk.senders, msgChunk.msg, - ) + err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg) if err != nil { log.Errorf("Unable to send batch "+ "announcements: %v", err) @@ -1339,39 +1402,23 @@ func (d *AuthenticatedGossiper) networkHandler() { // If the current announcements batch is nil, then we // have no further work here. - if len(announcementBatch) == 0 { + if announcementBatch.isEmpty() { continue } - // Next, If we have new things to announce then - // broadcast them to all our immediately connected - // peers. - subBatchSize := calculateSubBatchSize( - d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize, - len(announcementBatch), + // At this point, we have the set of local and remote + // announcements we want to send out. We'll do the + // batching as normal for both, but for local + // announcements, we'll blast them out w/o regard for + // our peer's policies so we ensure they propagate + // properly. + d.splitAndSendAnnBatch( + announcementBatch.localMsgs, true, ) - - splitAnnouncementBatch := splitAnnouncementBatches( - subBatchSize, announcementBatch, + d.splitAndSendAnnBatch( + announcementBatch.remoteMsgs, false, ) - d.wg.Add(1) - go func() { - defer d.wg.Done() - - log.Infof("Broadcasting %v new announcements in %d sub batches", - len(announcementBatch), len(splitAnnouncementBatch)) - - for _, announcementBatch := range splitAnnouncementBatch { - d.sendBatch(announcementBatch) - select { - case <-time.After(d.cfg.SubBatchDelay): - case <-d.quit: - return - } - } - }() - // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our // personal channels or node announcement. This addresses the diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index a426620fe0..8c624c379d 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -1910,31 +1910,35 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { // Ensure that announcement batch delivers channel announcements, // channel updates, and node announcements in proper order. batch := announcements.Emit() - if len(batch) != 4 { + if batch.length() != 4 { t.Fatal("announcement batch incorrect length") } - if !reflect.DeepEqual(batch[0].msg, ca2) { + if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) { t.Fatalf("channel announcement not first in batch: got %v, "+ - "expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2)) + "expected %v", spew.Sdump(batch.localMsgs[0].msg), spew.Sdump(ca2)) } - if !reflect.DeepEqual(batch[1].msg, ua3) { + if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) { t.Fatalf("channel update not next in batch: got %v, "+ - "expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2)) + "expected %v", spew.Sdump(batch.localMsgs[1].msg), spew.Sdump(ua2)) } // We'll ensure that both node announcements are present. We check both // indexes as due to the randomized order of map iteration they may be // in either place. - if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) { + if !reflect.DeepEqual(batch.localMsgs[2].msg, na) && + !reflect.DeepEqual(batch.localMsgs[3].msg, na) { + t.Fatalf("first node announcement not in last part of batch: "+ - "got %v, expected %v", batch[2].msg, + "got %v, expected %v", batch.localMsgs[2].msg, na) } - if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) { + if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) && + !reflect.DeepEqual(batch.localMsgs[3].msg, na5) { + t.Fatalf("second node announcement not in last part of batch: "+ - "got %v, expected %v", batch[3].msg, + "got %v, expected %v", batch.localMsgs[3].msg, na5) }