diff --git a/internal/decision/engine.go b/internal/decision/engine.go index cb7b2be3..67f912a4 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -144,11 +144,14 @@ type Engine struct { tagQueued, tagUseful string - lock sync.RWMutex // protects the fields immediatly below + lock sync.RWMutex // protects the fields immediately below // ledgerMap lists block-related Ledgers by their Partner key. ledgerMap map[peer.ID]*ledger + // peerLedger saves which peers are waiting for a Cid + peerLedger *peerLedger + // an external ledger dealing with peer scores scoreLedger ScoreLedger @@ -191,6 +194,7 @@ func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagge taskWorkerCount: taskWorkerCount, sendDontHaves: true, self: self, + peerLedger: newPeerLedger(), } e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String()) e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String()) @@ -456,6 +460,15 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return } + e.lock.Lock() + for _, entry := range wants { + e.peerLedger.Wants(p, entry.Cid) + } + for _, entry := range cancels { + e.peerLedger.CancelWant(p, entry.Cid) + } + e.lock.Unlock() + // Get the ledger for the peer l := e.findOrCreate(p) l.lk.Lock() @@ -588,40 +601,42 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block) { } // Check each peer to see if it wants one of the blocks we received - work := false + var work bool e.lock.RLock() + for _, b := range blks { + k := b.Cid() - for _, l := range e.ledgerMap { - l.lk.RLock() - - for _, b := range blks { - k := b.Cid() - - if entry, ok := l.WantListContains(k); ok { - work = true - - blockSize := blockSizes[k] - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) + for _, p := range e.peerLedger.Peers(k) { + ledger, ok := e.ledgerMap[p] + if !ok { + continue + } + entry, ok := ledger.WantListContains(k) + if !ok { // should never happen + continue + } + work = true - entrySize := blockSize - if !isWantBlock { - entrySize = bsmsg.BlockPresenceSize(k) - } + blockSize := blockSizes[k] + isWantBlock := e.sendAsBlock(entry.WantType, blockSize) - e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{ - Topic: entry.Cid, - Priority: int(entry.Priority), - Work: entrySize, - Data: &taskData{ - BlockSize: blockSize, - HaveBlock: true, - IsWantBlock: isWantBlock, - SendDontHave: false, - }, - }) + entrySize := blockSize + if !isWantBlock { + entrySize = bsmsg.BlockPresenceSize(k) } + + e.peerRequestQueue.PushTasks(p, peertask.Task{ + Topic: entry.Cid, + Priority: int(entry.Priority), + Work: entrySize, + Data: &taskData{ + BlockSize: blockSize, + HaveBlock: true, + IsWantBlock: isWantBlock, + SendDontHave: false, + }, + }) } - l.lk.RUnlock() } e.lock.RUnlock() @@ -677,6 +692,12 @@ func (e *Engine) PeerDisconnected(p peer.ID) { e.lock.Lock() defer e.lock.Unlock() + ledger, ok := e.ledgerMap[p] + if ok { + for _, entry := range ledger.Entries() { + e.peerLedger.CancelWant(p, entry.Cid) + } + } delete(e.ledgerMap, p) e.scoreLedger.PeerDisconnected(p) diff --git a/internal/decision/ledger.go b/internal/decision/ledger.go index a607ff4f..58723d0f 100644 --- a/internal/decision/ledger.go +++ b/internal/decision/ledger.go @@ -6,8 +6,8 @@ import ( pb "github.com/ipfs/go-bitswap/message/pb" wl "github.com/ipfs/go-bitswap/wantlist" - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" ) func newLedger(p peer.ID) *ledger { @@ -40,3 +40,7 @@ func (l *ledger) CancelWant(k cid.Cid) bool { func (l *ledger) WantListContains(k cid.Cid) (wl.Entry, bool) { return l.wantList.Contains(k) } + +func (l *ledger) Entries() []wl.Entry { + return l.wantList.Entries() +} diff --git a/internal/decision/peer_ledger.go b/internal/decision/peer_ledger.go new file mode 100644 index 00000000..d5616cec --- /dev/null +++ b/internal/decision/peer_ledger.go @@ -0,0 +1,46 @@ +package decision + +import ( + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" +) + +type peerLedger struct { + cids map[cid.Cid]map[peer.ID]struct{} +} + +func newPeerLedger() *peerLedger { + return &peerLedger{cids: make(map[cid.Cid]map[peer.ID]struct{})} +} + +func (l *peerLedger) Wants(p peer.ID, k cid.Cid) { + m, ok := l.cids[k] + if !ok { + m = make(map[peer.ID]struct{}) + l.cids[k]=m + } + m[p] = struct{}{} +} + +func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) { + m, ok := l.cids[k] + if !ok { + return + } + delete(m, p) + if len(m) == 0 { + delete(l.cids, k) + } +} + +func (l *peerLedger) Peers(k cid.Cid) []peer.ID { + m, ok := l.cids[k] + if !ok { + return nil + } + peers := make([]peer.ID, 0, len(m)) + for p := range m { + peers = append(peers, p) + } + return peers +}