Skip to content

Commit

Permalink
discovery: split sendBatch into local and remote
Browse files Browse the repository at this point in the history
This commit refactors the method `sendBatch` into `sendLocalBatch` and
`sendRemoteBatch` for clarity. The batch size calculation is also moved
into `splitAnnouncementBatches`.
  • Loading branch information
yyforyongyu committed Feb 17, 2023
1 parent c3d1d3c commit b73cfc5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 38 deletions.
72 changes: 38 additions & 34 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,8 +1112,8 @@ func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
return batchSize
}

subBatchSize := (int(batchSize)*int(subBatchDelay) + int(totalDelay) - 1) /
int(totalDelay)
subBatchSize := (batchSize*int(subBatchDelay) +
int(totalDelay) - 1) / int(totalDelay)

if subBatchSize < minimumBatchSize {
return minimumBatchSize
Expand All @@ -1122,10 +1122,20 @@ func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
return subBatchSize
}

// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
// this variable so the function can be mocked in our test.
var batchSizeCalculator = calculateSubBatchSize

// splitAnnouncementBatches takes an exiting list of announcements and
// decomposes it into sub batches controlled by the `subBatchSize`.
func splitAnnouncementBatches(subBatchSize int,
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
announcementBatch []msgWithSenders) [][]msgWithSenders {

subBatchSize := batchSizeCalculator(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
d.cfg.MinimumBatchSize, len(announcementBatch),
)

var splitAnnouncementBatch [][]msgWithSenders

for subBatchSize < len(announcementBatch) {
Expand All @@ -1136,7 +1146,9 @@ func splitAnnouncementBatches(subBatchSize int,
append(splitAnnouncementBatch,
announcementBatch[0:subBatchSize:subBatchSize])
}
splitAnnouncementBatch = append(splitAnnouncementBatch, announcementBatch)
splitAnnouncementBatch = append(
splitAnnouncementBatch, announcementBatch,
)

return splitAnnouncementBatch
}
Expand All @@ -1148,16 +1160,7 @@ func splitAnnouncementBatches(subBatchSize int,
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
isLocal bool) {

// Next, If we have new things to announce then broadcast them to all
// our immediately connected peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
d.cfg.MinimumBatchSize, len(annBatch),
)

splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, annBatch,
)
splitAnnouncementBatch := d.splitAnnouncementBatches(annBatch)

d.wg.Add(1)
go func() {
Expand All @@ -1168,7 +1171,11 @@ func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
len(splitAnnouncementBatch), isLocal)

for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch, isLocal)
if isLocal {
d.sendLocalBatch(announcementBatch)
} else {
d.sendRemoteBatch(announcementBatch)
}

select {
case <-time.After(d.cfg.SubBatchDelay):
Expand All @@ -1179,28 +1186,25 @@ func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
}()
}

// sendBatch broadcasts a list of announcements to our peers.
func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
isLocal bool) {

// If this is a batch of announcements created locally, then we can
// skip the filter and dedup logic below, and just send the
// announcements out to all our coonnected peers.
if isLocal {
msgsToSend := lnutils.Map(
annBatch, func(m msgWithSenders) lnwire.Message {
return m.msg
},
)
err := d.cfg.Broadcast(nil, msgsToSend...)
if err != nil {
log.Errorf("Unable to send local batch "+
"announcements: %v", err)
}
// sendLocalBatch broadcasts a list of locally generated announcements to our
// peers. For local announcements, we skip the filter and dedup logic and just
// send the announcements out to all our coonnected peers.
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
msgsToSend := lnutils.Map(
annBatch, func(m msgWithSenders) lnwire.Message {
return m.msg
},
)

return
err := d.cfg.Broadcast(nil, msgsToSend...)
if err != nil {
log.Errorf("Unable to send local batch announcements: %v", err)
}
}

// sendRemoteBatch broadcasts a list of remotely generated announcements to our
// peers.
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
syncerPeers := d.syncMgr.GossipSyncers()

// We'll first attempt to filter out this new message for all peers
Expand Down
18 changes: 14 additions & 4 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3531,7 +3531,10 @@ func assertMessage(t *testing.T, expected, got lnwire.Message) {
// TestSplitAnnouncementsCorrectSubBatches checks that we split a given
// sizes of announcement list into the correct number of batches.
func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) {
t.Parallel()
// Create our test harness.
const blockHeight = 100
ctx, err := createTestCtx(t, blockHeight)
require.NoError(t, err, "can't create context")

const subBatchSize = 10

Expand All @@ -3541,6 +3544,12 @@ func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) {
lengthAnnouncementBatchSizes := len(announcementBatchSizes)
lengthExpectedNumberMiniBatches := len(expectedNumberMiniBatches)

batchSizeCalculator = func(totalDelay, subBatchDelay time.Duration,
minimumBatchSize, batchSize int) int {

return subBatchSize
}

if lengthAnnouncementBatchSizes != lengthExpectedNumberMiniBatches {
t.Fatal("Length of announcementBatchSizes and " +
"expectedNumberMiniBatches should be equal")
Expand All @@ -3550,15 +3559,16 @@ func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) {
var batchSize = announcementBatchSizes[testIndex]
announcementBatch := make([]msgWithSenders, batchSize)

splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, announcementBatch,
splitAnnouncementBatch := ctx.gossiper.splitAnnouncementBatches(
announcementBatch,
)

lengthMiniBatches := len(splitAnnouncementBatch)

if lengthMiniBatches != expectedNumberMiniBatches[testIndex] {
t.Fatalf("Expecting %d mini batches, actual %d",
expectedNumberMiniBatches[testIndex], lengthMiniBatches)
expectedNumberMiniBatches[testIndex],
lengthMiniBatches)
}
}
}
Expand Down

0 comments on commit b73cfc5

Please sign in to comment.