Skip to content

Commit

Permalink
refactor(bitswap): performance improvements (#634)
Browse files Browse the repository at this point in the history
* Minor performance improvements around wantlist handling

- Create fewer copies of wantlist.
- Fewer iterations of wantlist when processing received message.

These changes make some minor improvements
and separates these from functional changes in other PR(s).

* Do not try to process empty emssages
* Do not let splitWants modify entries list argument, but pass in message instead
* increase timeouts to fix flaky tests
  • Loading branch information
gammazero authored Jul 2, 2024
1 parent dfd4a53 commit 733fa55
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 114 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The following emojis are used to highlight certain changes:

### Changed

- `bitswap/server` minor memory use and performance improvements

### Removed

### Fixed
Expand Down
37 changes: 17 additions & 20 deletions bitswap/client/internal/messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/internal/test"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const collectTimeout = 100 * time.Millisecond

type fakeMessageNetwork struct {
connectError error
messageSenderError error
Expand Down Expand Up @@ -172,7 +173,7 @@ func TestStartupAndShutdown(t *testing.T) {

messageQueue.Startup()
messageQueue.AddBroadcastWantHaves(bcstwh)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for broadcast want-haves")
}
Expand Down Expand Up @@ -212,16 +213,14 @@ func TestSendingMessagesDeduped(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks, wantHaves)
messageQueue.AddWants(wantBlocks, wantHaves)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("Messages were not deduped")
}
}

func TestSendingMessagesPartialDupe(t *testing.T) {
test.Flaky(t)

ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
Expand All @@ -235,16 +234,14 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks[:8], wantHaves[:8])
messageQueue.AddWants(wantBlocks[3:], wantHaves[3:])
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("messages were not correctly deduped")
}
}

func TestSendingMessagesPriority(t *testing.T) {
test.Flaky(t)

ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
Expand All @@ -262,7 +259,7 @@ func TestSendingMessagesPriority(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks1, wantHaves1)
messageQueue.AddWants(wantBlocks2, wantHaves2)
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("wrong number of wants")
Expand Down Expand Up @@ -327,7 +324,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
messageQueue.Startup()
messageQueue.AddWants(wantBlocks, wantHaves)
messageQueue.AddCancels(cancels)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)

if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)-len(cancels) {
t.Fatal("Wrong message count")
Expand All @@ -351,7 +348,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
// Cancel the remaining want-blocks and want-haves
cancels = append(wantHaves, wantBlocks...)
messageQueue.AddCancels(cancels)
messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, collectTimeout)

// The remaining 2 cancels should be sent to the network as they are for
// wants that were sent to the network
Expand Down Expand Up @@ -379,7 +376,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
// Add 1 want-block and 2 want-haves
messageQueue.AddWants(wantBlocks, wantHaves)

messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)
if totalEntriesLength(messages) != len(wantBlocks)+len(wantHaves) {
t.Fatal("Wrong message count", totalEntriesLength(messages))
}
Expand All @@ -389,7 +386,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
// Override one cancel with a want-block (before cancel is sent to network)
messageQueue.AddWants(cids[:1], []cid.Cid{})

messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, collectTimeout)
if totalEntriesLength(messages) != 3 {
t.Fatal("Wrong message count", totalEntriesLength(messages))
}
Expand Down Expand Up @@ -531,7 +528,7 @@ func TestSendingLargeMessages(t *testing.T) {

messageQueue.Startup()
messageQueue.AddWants(wantBlocks, []cid.Cid{})
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

// want-block has size 44, so with maxMsgSize 44 * 3 (3 want-blocks), then if
// we send 10 want-blocks we should expect 4 messages:
Expand Down Expand Up @@ -563,7 +560,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
// Check broadcast want-haves
bcwh := testutil.GenerateCids(10)
messageQueue.AddBroadcastWantHaves(bcwh)
messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)

if len(messages) != 1 {
t.Fatal("wrong number of messages were sent", len(messages))
Expand All @@ -582,7 +579,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
wbs := testutil.GenerateCids(10)
whs := testutil.GenerateCids(10)
messageQueue.AddWants(wbs, whs)
messages = collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, collectTimeout)

if len(messages) != 1 {
t.Fatal("wrong number of messages were sent", len(messages))
Expand Down Expand Up @@ -612,7 +609,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {

wbs := testutil.GenerateCids(10)
messageQueue.AddWants(wbs, nil)
collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
collectMessages(ctx, t, messagesSent, collectTimeout)

// Check want-blocks are added to DontHaveTimeoutMgr
if dhtm.pendingCount() != len(wbs) {
Expand All @@ -621,7 +618,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {

cancelCount := 2
messageQueue.AddCancels(wbs[:cancelCount])
collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
collectMessages(ctx, t, messagesSent, collectTimeout)

// Check want-blocks are removed from DontHaveTimeoutMgr
if dhtm.pendingCount() != len(wbs)-cancelCount {
Expand Down Expand Up @@ -692,9 +689,9 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {

cids := testutil.GenerateCids(2)

// Add some wants and wait 10ms
// Add some wants and wait
messageQueue.AddWants(cids, nil)
collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
collectMessages(ctx, t, messagesSent, collectTimeout)

// Receive a response for the wants
messageQueue.ResponseReceived(cids)
Expand Down
30 changes: 15 additions & 15 deletions bitswap/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,9 @@ func (m *impl) Clone() BitSwapMessage {
// Reset the values in the message back to defaults, so it can be reused
func (m *impl) Reset(full bool) {
m.full = full
for k := range m.wantlist {
delete(m.wantlist, k)
}
for k := range m.blocks {
delete(m.blocks, k)
}
for k := range m.blockPresences {
delete(m.blockPresences, k)
}
clear(m.wantlist)
clear(m.blocks)
clear(m.blockPresences)
m.pendingBytes = 0
}

Expand Down Expand Up @@ -253,25 +247,31 @@ func (m *impl) Empty() bool {
}

func (m *impl) Wantlist() []Entry {
out := make([]Entry, 0, len(m.wantlist))
out := make([]Entry, len(m.wantlist))
var i int
for _, e := range m.wantlist {
out = append(out, *e)
out[i] = *e
i++
}
return out
}

func (m *impl) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0, len(m.blocks))
bs := make([]blocks.Block, len(m.blocks))
var i int
for _, block := range m.blocks {
bs = append(bs, block)
bs[i] = block
i++
}
return bs
}

func (m *impl) BlockPresences() []BlockPresence {
bps := make([]BlockPresence, 0, len(m.blockPresences))
bps := make([]BlockPresence, len(m.blockPresences))
var i int
for c, t := range m.blockPresences {
bps = append(bps, BlockPresence{c, t})
bps[i] = BlockPresence{c, t}
i++
}
return bps
}
Expand Down
Loading

0 comments on commit 733fa55

Please sign in to comment.