Skip to content

Commit

Permalink
discovery: send both local and remote anns in the same goroutine
Browse files Browse the repository at this point in the history
This commit changes the sending of anns from using separate goroutines
to always sending both local and remote announcements in the same
goroutine. In addition, the local announcements are always sent first.
This change is to fix the following case:

1. Alice and Bob have a channel
2. Alice receives Bob's NodeAnnouncement
3. Alice goes to broadcast the channel
4. The broadcast is split into a local and remote broadcast due to PR
   lightningnetwork#7239. Bob's NodeAnnouncement is in the remote batch. Everything else
   (ChannelAnnouncement, ChannelUpdate x2, and Alice's NodeAnnouncement)
   is in the local batch.
5. The remote batch (containing Bob's NodeAnnouncement) runs before the
   local batch since they are spawned in separate goroutines. This means
   that Alice sends Carol the NodeAnnouncement before Carol knows of the
   channel.

In step 2), Bob's NodeAnnouncement (isRemote = true) replaces Bob's
NodeAnnouncement that Alice was going to relay (isRemote = false) after
processing the AnnouncementSignatures.
  • Loading branch information
yyforyongyu committed Feb 17, 2023
1 parent b73cfc5 commit e34a088
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 28 deletions.
61 changes: 35 additions & 26 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,34 +1154,48 @@ func (d *AuthenticatedGossiper) splitAnnouncementBatches(
}

// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. If
// isLocal is true, then we'll send them to all peers and skip any gossip
// filter checks.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
isLocal bool) {
// split size, and then sends out all items to the set of target peers. Locally
// generated announcements are always sent before remotely generated
// announcements.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
annBatch msgsToBroadcast) {

// delayNextBatch is a helper closure that blocks for `SubBatchDelay`
// duration to delay the sending of next announcement batch.
delayNextBatch := func() {
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}

splitAnnouncementBatch := d.splitAnnouncementBatches(annBatch)
// Fetch the local and remote announcements.
localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Infof("Broadcasting %v new announcements in %d sub "+
"batches (local=%v)", len(annBatch),
len(splitAnnouncementBatch), isLocal)
log.Debugf("Broadcasting %v new local announcements in %d "+
"sub batches", len(annBatch.localMsgs),
len(localBatches))

for _, announcementBatch := range splitAnnouncementBatch {
if isLocal {
d.sendLocalBatch(announcementBatch)
} else {
d.sendRemoteBatch(announcementBatch)
}
// Send out the local announcements first.
for _, annBatch := range localBatches {
d.sendLocalBatch(annBatch)
delayNextBatch()
}

select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
log.Debugf("Broadcasting %v new remote announcements in %d "+
"sub batches", len(annBatch.remoteMsgs),
len(remoteBatches))

// Now send the remote announcements.
for _, annBatch := range remoteBatches {
d.sendRemoteBatch(annBatch)
delayNextBatch()
}
}()
}
Expand Down Expand Up @@ -1354,12 +1368,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// announcements, we'll blast them out w/o regard for
// our peer's policies so we ensure they propagate
// properly.
d.splitAndSendAnnBatch(
announcementBatch.localMsgs, true,
)
d.splitAndSendAnnBatch(
announcementBatch.remoteMsgs, false,
)
d.splitAndSendAnnBatch(announcementBatch)

// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our
Expand Down
6 changes: 4 additions & 2 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func createTestCtx(t *testing.T, startHeight uint32) (*testCtx, error) {
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
NumActiveSyncers: 3,
AnnSigner: &mock.SingleSigner{Privkey: selfKeyPriv},
SubBatchDelay: time.Second * 5,
SubBatchDelay: 1 * time.Millisecond,
MinimumBatchSize: 10,
MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst,
ChannelUpdateInterval: DefaultChannelUpdateInterval,
Expand Down Expand Up @@ -3371,7 +3371,9 @@ out:
case <-sentMsgs:
case err := <-notifyErr:
t.Fatal(err)
default:

// Give it 5 seconds to drain out.
case <-time.After(5 * time.Second):
break out
}
}
Expand Down

0 comments on commit e34a088

Please sign in to comment.