Skip to content

Commit

Permalink
Fix wantlist overflow handling to select newer entries.
Browse files Browse the repository at this point in the history
wantlist overflow handling now cancels existing entries to make room for newer requests. This fix prevents the wantlist from filling up with CIDs that the server does not have.

Fixes #527
  • Loading branch information
gammazero committed Jul 3, 2024
1 parent 733fa55 commit a06156c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 30 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The following emojis are used to highlight certain changes:
### Changed

- `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
- `bitswap/client` supports additional tracing
- `bitswap/client` supports additional tracing

### Removed

Expand All @@ -41,6 +41,7 @@ The following emojis are used to highlight certain changes:

- `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
- `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.

## [v0.20.0]

Expand Down
89 changes: 61 additions & 28 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package decision
import (
"context"
"fmt"
"math/bits"
"sync"
"time"

Expand Down Expand Up @@ -134,7 +133,7 @@ type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
Wants(p peer.ID, e wl.Entry)

// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
Expand Down Expand Up @@ -702,38 +701,72 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
e.peerLedger.ClearPeerWantlist(p)
}

s := uint(e.peerLedger.WantlistSizeForPeer(p))
if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
// truncate wantlist to avoid overflow
available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
if o != 0 {
available = 0
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}
filteredWants = append(filteredWants, entry)
if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) {
// filteredWants at limit, ignore remaining wants from request.
log.Debugw("requested wants exceeds max wantlist size", "local", e.self, "remote", p, "ignoring", len(wants)-len(filteredWants))
break
}
}
wants = wants[:available]
}

filteredWants := wants[:0] // shift inplace

for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
wants = wants[len(filteredWants):]
for i := range wants {
wants[i] = bsmsg.Entry{} // early GC
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
wants = filteredWants

// Ensure sufficient space for new wants.
s := e.peerLedger.WantlistSizeForPeer(p)
available := int(e.maxQueuedWantlistEntriesPerPeer) - s
if len(wants) > available {
needSpace := len(wants) - available
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace)
// Cancel any wants that are being requested again. This makes room
// for new wants and minimizes that existing wants to cancel that
// are not in the new request.
for _, entry := range wants {
if e.peerLedger.CancelWant(p, entry.Cid) {
e.peerRequestQueue.Remove(entry.Cid, p)
needSpace--
if needSpace == 0 {
break
}
}
}
// Cancel additional wants, that are not being replaced, to make
// room for new wants.
if needSpace != 0 {
wl := e.peerLedger.WantlistForPeer(p)
for i := range wl {
entCid := wl[i].Cid
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
needSpace--
if needSpace == 0 {
break
}
}
}
}
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Entry)
}
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])

wants = filteredWants
for _, entry := range cancels {
c := entry.Cid
if c.Prefix().MhType == mh.IDENTITY {
Expand Down
58 changes: 58 additions & 0 deletions bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,3 +1733,61 @@ func TestKillConnectionForInlineCid(t *testing.T) {
t.Fatal("connection was not killed when receiving inline in cancel")
}
}

func TestWantlistOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const limit = 32
warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit))
riga := newTestEngine(ctx, "riga")

m := message.New(false)
for i := 0; i < limit+(limit/2); i++ {
m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(), 0, pb.Message_Wantlist_Block, true)
}
warsaw.Engine.MessageReceived(ctx, riga.Peer, m)

if warsaw.Peer == riga.Peer {
t.Fatal("Sanity Check: Peers have same Key!")
}

// Check that the wantlist is at the size limit, and limit/2 wants ignored.
wl := warsaw.Engine.WantlistForPeer(riga.Peer)
if len(wl) != limit {
t.Fatal("wantlist does not match limit", len(wl))
}

m = message.New(false)
blockCids := make([]cid.Cid, limit/2+4)
for i := 0; i < limit/2+4; i++ {
c := blocks.NewBlock([]byte(fmt.Sprint(i + limit))).Cid()
m.AddEntry(c, 0, pb.Message_Wantlist_Block, true)
blockCids[i] = c
}
warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
wl = warsaw.Engine.WantlistForPeer(riga.Peer)

// Check that wantlist is still at size limit.
if len(wl) != limit {
t.Fatalf("wantlist size %d does not match limit %d", len(wl), limit)
}

// Check that all new blocks are in wantlist.
var missing int
for _, c := range blockCids {
var found bool
for i := range wl {
if wl[i].Cid == c {
found = true
break
}
}
if !found {
missing++
}
}
if missing != 0 {
t.Fatalf("Missing %d new wants expected in wantlist", missing)
}
}
3 changes: 2 additions & 1 deletion bitswap/server/internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
if !ok {
return false
}
_, had := wants[k]
delete(wants, k)
if len(wants) == 0 {
delete(l.peers, p)
}

l.removePeerFromCid(p, k)
return true
return had
}

func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
Expand Down

0 comments on commit a06156c

Please sign in to comment.