Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
introduce a ledger that stores which peers are waiting for a Cid
Browse files Browse the repository at this point in the history
When receiving a new block (Engine.ReceiveFrom), we shouldn't have to loop over
all peers in order to determine if they need this block. Instead, use a map to
save which peers are waiting for a give Cid.
  • Loading branch information
marten-seemann committed May 12, 2021
1 parent a016d39 commit 99b09fe
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 31 deletions.
81 changes: 52 additions & 29 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,14 @@ type Engine struct {

tagQueued, tagUseful string

lock sync.RWMutex // protects the fields immediatly below
lock sync.RWMutex // protects the fields immediately below

// ledgerMap lists block-related Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger

// peerLedger saves which peers are waiting for a Cid
peerLedger *peerLedger

// an external ledger dealing with peer scores
scoreLedger ScoreLedger

Expand Down Expand Up @@ -191,6 +194,7 @@ func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagge
taskWorkerCount: taskWorkerCount,
sendDontHaves: true,
self: self,
peerLedger: newPeerLedger(),
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
Expand Down Expand Up @@ -456,6 +460,15 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
return
}

e.lock.Lock()
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Cid)
}
for _, entry := range cancels {
e.peerLedger.CancelWant(p, entry.Cid)
}
e.lock.Unlock()

// Get the ledger for the peer
l := e.findOrCreate(p)
l.lk.Lock()
Expand Down Expand Up @@ -588,40 +601,44 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block) {
}

// Check each peer to see if it wants one of the blocks we received
work := false
var work bool
e.lock.RLock()
for _, b := range blks {
k := b.Cid()

for _, l := range e.ledgerMap {
l.lk.RLock()

for _, b := range blks {
k := b.Cid()

if entry, ok := l.WantListContains(k); ok {
work = true

blockSize := blockSizes[k]
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)
for _, p := range e.peerLedger.Peers(k) {
ledger, ok := e.ledgerMap[p]
if !ok {
continue
}
ledger.lk.RLock()
entry, ok := ledger.WantListContains(k)
ledger.lk.RUnlock()
if !ok { // should never happen
continue
}
work = true

entrySize := blockSize
if !isWantBlock {
entrySize = bsmsg.BlockPresenceSize(k)
}
blockSize := blockSizes[k]
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
Topic: entry.Cid,
Priority: int(entry.Priority),
Work: entrySize,
Data: &taskData{
BlockSize: blockSize,
HaveBlock: true,
IsWantBlock: isWantBlock,
SendDontHave: false,
},
})
entrySize := blockSize
if !isWantBlock {
entrySize = bsmsg.BlockPresenceSize(k)
}

e.peerRequestQueue.PushTasks(p, peertask.Task{
Topic: entry.Cid,
Priority: int(entry.Priority),
Work: entrySize,
Data: &taskData{
BlockSize: blockSize,
HaveBlock: true,
IsWantBlock: isWantBlock,
SendDontHave: false,
},
})
}
l.lk.RUnlock()
}
e.lock.RUnlock()

Expand Down Expand Up @@ -677,6 +694,12 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()

ledger, ok := e.ledgerMap[p]
if ok {
for _, entry := range ledger.Entries() {
e.peerLedger.CancelWant(p, entry.Cid)
}
}
delete(e.ledgerMap, p)

e.scoreLedger.PeerDisconnected(p)
Expand Down
8 changes: 6 additions & 2 deletions internal/decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
pb "github.com/ipfs/go-bitswap/message/pb"
wl "github.com/ipfs/go-bitswap/wantlist"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)

func newLedger(p peer.ID) *ledger {
Expand Down Expand Up @@ -40,3 +40,7 @@ func (l *ledger) CancelWant(k cid.Cid) bool {
func (l *ledger) WantListContains(k cid.Cid) (wl.Entry, bool) {
return l.wantList.Contains(k)
}

func (l *ledger) Entries() []wl.Entry {
return l.wantList.Entries()
}
46 changes: 46 additions & 0 deletions internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package decision

import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)

type peerLedger struct {
cids map[cid.Cid]map[peer.ID]struct{}
}

func newPeerLedger() *peerLedger {
return &peerLedger{cids: make(map[cid.Cid]map[peer.ID]struct{})}
}

func (l *peerLedger) Wants(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
m = make(map[peer.ID]struct{})
l.cids[k]=m
}
m[p] = struct{}{}
}

func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
return
}
delete(m, p)
if len(m) == 0 {
delete(l.cids, k)
}
}

func (l *peerLedger) Peers(k cid.Cid) []peer.ID {
m, ok := l.cids[k]
if !ok {
return nil
}
peers := make([]peer.ID, 0, len(m))
for p := range m {
peers = append(peers, p)
}
return peers
}

0 comments on commit 99b09fe

Please sign in to comment.