fix: bitswap performance issue (#692)
- Fix exhausted-wants problem that could cause a performance issue
- Minor improvements for GC
- Drop RWLock where it is not justified just to read a time value
- Replace unneeded RWMutex with Mutex
- Build strings with strings.Builder

(cherry picked from commit 19bcc75)
gammazero committed Oct 22, 2024
1 parent 3129f1e commit d168417
Showing 9 changed files with 32 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ The following emojis are used to highlight certain changes:

- `routing/http/client`: optional address and protocol filter parameters from [IPIP-484](https://github.com/ipfs/specs/pull/484) use human-readable `,` instead of `%2C`. [#688](https://github.com/ipfs/boxo/pull/688)
- `bitswap/client` Cleanup live wants when wants are canceled. This prevents live wants from continuing to get rebroadcasted even after the wants are canceled. [#690](https://github.com/ipfs/boxo/pull/690)
+ - Fix problem adding invalid CID to exhausted wants list resulting in possible performance issue. [#692](https://github.com/ipfs/boxo/pull/692)

### Security

12 changes: 6 additions & 6 deletions bitswap/client/internal/messagequeue/messagequeue.go
@@ -93,7 +93,7 @@ type MessageQueue struct {

// Dont touch any of these variables outside of run loop
sender bsnet.MessageSender
- rebroadcastIntervalLk sync.RWMutex
+ rebroadcastIntervalLk sync.Mutex
rebroadcastInterval time.Duration
rebroadcastTimer *clock.Timer
// For performance reasons we just clear out the fields of the message
@@ -389,9 +389,9 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {

// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
- mq.rebroadcastIntervalLk.RLock()
+ mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval)
- mq.rebroadcastIntervalLk.RUnlock()
+ mq.rebroadcastIntervalLk.Unlock()
go mq.runQueue()
}

@@ -422,7 +422,7 @@ func (mq *MessageQueue) runQueue() {
}

var workScheduled time.Time
- for mq.ctx.Err() == nil {
+ for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
@@ -471,9 +471,9 @@ func (mq *MessageQueue) runQueue() {

// Periodically resend the list of wants to the peer
func (mq *MessageQueue) rebroadcastWantlist() {
- mq.rebroadcastIntervalLk.RLock()
+ mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
- mq.rebroadcastIntervalLk.RUnlock()
+ mq.rebroadcastIntervalLk.Unlock()

// If some wants were transferred from the rebroadcast list
if mq.transferRebroadcastWants() {
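The messagequeue change above drops the reader/writer lock split: the guarded section only reads or resets a `time.Duration`, so a plain `sync.Mutex` is simpler and, uncontended, at least as fast. A minimal sketch of that pattern, not the boxo code itself (the `intervalGuard` type and its fields are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// intervalGuard is an illustrative stand-in for the MessageQueue fields:
// when the critical section only reads or resets a time.Duration, a plain
// Mutex is enough and the reader/writer split buys nothing.
type intervalGuard struct {
	mu       sync.Mutex
	interval time.Duration
}

func (g *intervalGuard) Set(d time.Duration) {
	g.mu.Lock()
	g.interval = d
	g.mu.Unlock()
}

func (g *intervalGuard) Get() time.Duration {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.interval
}

func main() {
	g := &intervalGuard{interval: 30 * time.Second}
	g.Set(time.Minute)
	fmt.Println(g.Get()) // 1m0s
}
```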
3 changes: 2 additions & 1 deletion bitswap/client/internal/notifications/notifications.go
@@ -69,12 +69,13 @@ func (ps *impl) Shutdown() {
// corresponding to |keys|.
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block, len(keys))
- valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}

+ valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
+
// prevent shutdown
ps.lk.RLock()
defer ps.lk.RUnlock()
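Here `valuesCh` is allocated only after the zero-key early return, so a `Subscribe` call with no keys no longer creates a channel it never uses. A hedged sketch of the same allocate-after-early-return shape, using illustrative names (`subscribe`, `values`) rather than the real pubsub types:

```go
package main

import "fmt"

// subscribe is an illustrative stand-in for notifications.Subscribe: the
// helper channel is allocated only after the empty-keys early return, so
// a caller with no keys pays for nothing beyond one closed channel.
func subscribe(keys []string) <-chan string {
	out := make(chan string, len(keys))
	if len(keys) == 0 {
		close(out)
		return out
	}

	// Allocated only when there is real work to do.
	values := make(chan string, len(keys))
	for _, k := range keys {
		values <- k
	}
	close(values)

	for v := range values {
		out <- v
	}
	close(out)
	return out
}

func main() {
	for v := range subscribe([]string{"a", "b"}) {
		fmt.Println(v)
	}
}
```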
2 changes: 1 addition & 1 deletion bitswap/client/internal/peermanager/peermanager.go
@@ -42,7 +42,7 @@ type PeerManager struct {
createPeerQueue PeerQueueFactory
ctx context.Context

- psLk sync.RWMutex
+ psLk sync.Mutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

10 changes: 6 additions & 4 deletions bitswap/client/internal/peermanager/peerwantmanager.go
@@ -1,8 +1,8 @@
package peermanager

import (
"bytes"
"fmt"
"strings"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
@@ -158,8 +158,6 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
- fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
- fltWantHvs := make([]cid.Cid, 0, len(wantHaves))

// Get the existing want-blocks and want-haves for the peer
pws, ok := pwm.peerWants[p]
@@ -169,6 +167,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
return
}

+ fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
+
// Iterate over the requested want-blocks
for _, c := range wantBlocks {
// If the want-block hasn't been sent to the peer
@@ -198,6 +198,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
pwm.reverseIndexAdd(c, p)
}

+ fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
+
// Iterate over the requested want-haves
for _, c := range wantHaves {
// If we've already broadcasted this want, don't bother with a
@@ -450,7 +452,7 @@ func (pwm *peerWantManager) getWants() []cid.Cid {
}

func (pwm *peerWantManager) String() string {
- var b bytes.Buffer
+ var b strings.Builder
for p, ws := range pwm.peerWants {
b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
for _, c := range ws.wantHaves.Keys() {
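`String()` now accumulates into a `strings.Builder` instead of a `bytes.Buffer`, avoiding the extra copy that `bytes.Buffer.String()` makes when converting to a string. A small stand-alone sketch of the same shape (the `buildReport` helper and its map layout are illustrative, not the boxo types):

```go
package main

import (
	"fmt"
	"strings"
)

// buildReport mirrors the shape of peerWantManager.String(): accumulate
// formatted lines in a strings.Builder and return the result without the
// extra copy a bytes.Buffer.String() call would make.
func buildReport(wants map[string][]string) string {
	var b strings.Builder
	for peer, cids := range wants {
		fmt.Fprintf(&b, "Peer %s: %d wants:\n", peer, len(cids))
		for _, c := range cids {
			b.WriteString("  ")
			b.WriteString(c)
			b.WriteByte('\n')
		}
	}
	return b.String()
}

func main() {
	fmt.Print(buildReport(map[string][]string{"QmPeer": {"cid-1", "cid-2"}}))
}
```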
6 changes: 2 additions & 4 deletions bitswap/client/internal/session/peerresponsetracker.go
@@ -31,8 +31,6 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
return ""
}

- rnd := rand.Float64()
-
// Find the total received blocks for all candidate peers
total := 0
for _, p := range peers {
@@ -41,6 +39,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {

// Choose one of the peers with a chance proportional to the number
// of blocks received from that peer
+ rnd := rand.Float64()
counted := 0.0
for _, p := range peers {
counted += float64(prt.getPeerCount(p)) / float64(total)
@@ -52,8 +51,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
// We shouldn't get here unless there is some weirdness with floating point
// math that doesn't quite cover the whole range of peers in the for loop
// so just choose the last peer.
- index := len(peers) - 1
- return peers[index]
+ return peers[len(peers)-1]
}

// getPeerCount returns the number of times the peer was first to send us a
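`choose` picks a peer with probability proportional to the number of blocks received from it; the edit just draws the random number next to the loop that consumes it and folds the fallback return into one line. A self-contained sketch of that weighted selection, assuming a plain count map instead of the real `peerResponseTracker` state:

```go
package main

import (
	"fmt"
	"math/rand"
)

// chooseWeighted picks one peer with probability proportional to its
// count, mirroring the shape of peerResponseTracker.choose. The final
// return guards against floating-point rounding leaving rnd uncovered.
func chooseWeighted(peers []string, counts map[string]int) string {
	if len(peers) == 0 {
		return ""
	}
	total := 0
	for _, p := range peers {
		total += counts[p] + 1 // +1 so never-seen peers still get a chance
	}

	rnd := rand.Float64()
	counted := 0.0
	for _, p := range peers {
		counted += float64(counts[p]+1) / float64(total)
		if counted > rnd {
			return p
		}
	}
	return peers[len(peers)-1]
}

func main() {
	counts := map[string]int{"a": 8, "b": 1, "c": 1}
	picks := map[string]int{}
	for i := 0; i < 1000; i++ {
		picks[chooseWeighted([]string{"a", "b", "c"}, counts)]++
	}
	fmt.Println(picks) // "a" should dominate
}
```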
9 changes: 7 additions & 2 deletions bitswap/client/internal/session/sessionwants.go
@@ -56,8 +56,12 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := sw.broadcastLimit - currentLiveCount
+ liveSize := min(toAdd, sw.toFetch.Len())
+ if liveSize == 0 {
+     return nil
+ }

- var live []cid.Cid
+ live := make([]cid.Cid, 0, liveSize)
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
@@ -117,6 +121,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
cleaned = append(cleaned, c)
}
}
+ clear(sw.liveWantsOrder[len(cleaned):]) // GC cleared items
sw.liveWantsOrder = cleaned
}

@@ -127,7 +132,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
now := time.Now()
- live := make([]cid.Cid, 0, len(sw.liveWants))
+ live := make([]cid.Cid, 0, min(len(sw.liveWants), sw.broadcastLimit))
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
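Two GC-oriented tweaks here: `GetNextWants` sizes `live` with `min(toAdd, sw.toFetch.Len())` and returns early when there is nothing to move, and `BlocksReceived` clears the truncated tail of `liveWantsOrder` so dropped CIDs stop pinning memory. A reduced sketch of the clear-the-tail idiom (requires Go 1.21+ for the `clear` builtin; the `compact` helper is illustrative):

```go
package main

import "fmt"

// compact keeps only the elements that pass keep, reusing the backing
// array, then zeroes the tail so the dropped pointers become collectable.
func compact(order []*string, keep func(*string) bool) []*string {
	kept := order[:0]
	for _, v := range order {
		if keep(v) {
			kept = append(kept, v)
		}
	}
	clear(order[len(kept):]) // let the GC reclaim what was dropped
	return kept
}

func main() {
	a, b, c := "a", "b", "c"
	order := []*string{&a, &b, &c}
	order = compact(order, func(s *string) bool { return *s != "b" })
	fmt.Println(len(order)) // 2
}
```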
8 changes: 3 additions & 5 deletions bitswap/client/internal/session/sessionwantsender.go
@@ -161,8 +161,7 @@ func (sws *sessionWantSender) Cancel(ks []cid.Cid) {
// Update is called when the session receives a message with incoming blocks
// or HAVE / DONT_HAVE
func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
- hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0
- if !hasUpdate {
+ if len(ks) == 0 && len(haves) == 0 && len(dontHaves) == 0 {
return
}

@@ -349,8 +348,7 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
}

// Create the want info
- wi := newWantInfo(sws.peerRspTrkr)
- sws.wants[c] = wi
+ sws.wants[c] = newWantInfo(sws.peerRspTrkr)

// For each available peer, register any information we know about
// whether the peer has the block
@@ -481,7 +479,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU
// (because it may be the last peer who hadn't sent a DONT_HAVE for a CID)
if len(newlyUnavailable) > 0 {
// Collect all pending wants
- wants = make([]cid.Cid, len(sws.wants))
+ wants = make([]cid.Cid, 0, len(sws.wants))
for c := range sws.wants {
wants = append(wants, c)
}
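This is the fix behind the commit title: `make([]cid.Cid, len(sws.wants))` produces a slice already holding `len(sws.wants)` zero-value (invalid) CIDs, and the following `append` calls add the real CIDs after them, so the exhausted-wants check was fed a batch padded with undefined CIDs. Using length 0 with a capacity hint keeps only the appended values. A minimal demonstration of the difference, with plain strings standing in for CIDs:

```go
package main

import "fmt"

func main() {
	src := []string{"cid-A", "cid-B"}

	// Buggy form: length len(src), so two zero values sit in front of the
	// appended items and the slice ends up twice as long as intended.
	wrong := make([]string, len(src))
	for _, c := range src {
		wrong = append(wrong, c)
	}
	fmt.Println(len(wrong), wrong) // len 4, two empty entries first

	// Fixed form: length 0 with a capacity hint, only real items remain.
	right := make([]string, 0, len(src))
	for _, c := range src {
		right = append(right, c)
	}
	fmt.Println(len(right), right) // len 2
}
```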
8 changes: 4 additions & 4 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
@@ -57,7 +57,7 @@ type SessionManager struct {
notif notifications.PubSub

// Sessions
- sessLk sync.RWMutex
+ sessLk sync.Mutex
sessions map[uint64]Session

// Session Index
@@ -159,13 +159,13 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid

// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
- sm.sessLk.RLock()
+ sm.sessLk.Lock()
if sm.sessions == nil { // check if SessionManager was shutdown
- sm.sessLk.RUnlock()
+ sm.sessLk.Unlock()
return
}
sess, ok := sm.sessions[id]
- sm.sessLk.RUnlock()
+ sm.sessLk.Unlock()

if ok {
sess.ReceiveFrom(p, blks, haves, dontHaves)
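`ReceiveFrom` now takes the plain mutex briefly per interested session, and the `sm.sessions == nil` check doubles as the shutdown signal. A reduced sketch of that pattern under assumed, illustrative names (`manager`, `lookup`, `shutdown`):

```go
package main

import (
	"fmt"
	"sync"
)

// manager is an illustrative reduction of SessionManager: the sessions
// map is guarded by a plain Mutex and setting it to nil marks shutdown.
type manager struct {
	mu       sync.Mutex
	sessions map[uint64]string
}

// lookup holds the lock only long enough to check for shutdown and copy
// out the session reference.
func (m *manager) lookup(id uint64) (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.sessions == nil {
		return "", false // manager was shut down
	}
	s, ok := m.sessions[id]
	return s, ok
}

func (m *manager) shutdown() {
	m.mu.Lock()
	m.sessions = nil
	m.mu.Unlock()
}

func main() {
	m := &manager{sessions: map[uint64]string{1: "session-1"}}
	fmt.Println(m.lookup(1))
	m.shutdown()
	fmt.Println(m.lookup(1))
}
```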
