Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: add peer block filter option
Browse files Browse the repository at this point in the history
This feature lets a user configure a function that will
allow / deny request for a block coming from a peer.
  • Loading branch information
laurentsenta committed Feb 28, 2022
1 parent dbfc6a1 commit dd1e961
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
11 changes: 11 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,15 @@ func WithTargetMessageSize(tms int) Option {
}
}

func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
return func(bs *Bitswap) {
bs.peerBlockRequestFilter = pbrf
}
}

type TaskInfo = decision.TaskInfo
type TaskComparator = decision.TaskComparator
type PeerBlockRequestFilter = decision.PeerBlockRequestFilter

// WithTaskComparator configures custom task prioritization logic.
func WithTaskComparator(comparator TaskComparator) Option {
Expand Down Expand Up @@ -291,6 +298,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
activeBlocksGauge,
decision.WithTaskComparator(bs.taskComparator),
decision.WithTargetMessageSize(bs.engineTargetMessageSize),
decision.WithPeerBlockRequestFilter(bs.peerBlockRequestFilter),
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

Expand Down Expand Up @@ -399,6 +407,9 @@ type Bitswap struct {
simulateDontHavesOnTimeout bool

taskComparator TaskComparator

// an optional feature to accept / deny HAVE - DONT HAVE requests
peerBlockRequestFilter PeerBlockRequestFilter
}

type counters struct {
Expand Down
22 changes: 20 additions & 2 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type Engine struct {
metricUpdateCounter int

taskComparator TaskComparator

peerBlockRequestFilter PeerBlockRequestFilter
}

// TaskInfo represents the details of a request from a peer.
Expand All @@ -201,6 +203,10 @@ type TaskInfo struct {
// It should return true if task 'ta' has higher priority than task 'tb'
type TaskComparator func(ta, tb *TaskInfo) bool

// PeerBlockRequestFilter is used to accept / deny requests for a CID coming from a PeerID
// It should return true if the request should be fullfilled.
type PeerBlockRequestFilter func(p peer.ID, c cid.Cid) bool

type Option func(*Engine)

func WithTaskComparator(comparator TaskComparator) Option {
Expand All @@ -209,6 +215,12 @@ func WithTaskComparator(comparator TaskComparator) Option {
}
}

func WithPeerBlockRequestFilter(pbrf PeerBlockRequestFilter) Option {
return func(e *Engine) {
e.peerBlockRequestFilter = pbrf
}
}

func WithTargetMessageSize(size int) Option {
return func(e *Engine) {
e.targetMessageSize = size
Expand Down Expand Up @@ -647,8 +659,14 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
// Add each want-have / want-block to the ledger
l.Wants(c, entry.Priority, entry.WantType)

// If the block was not found
if !found {
// Check if the peer is allowed to retrieve this block
passFilter := true
if e.peerBlockRequestFilter != nil {
passFilter = e.peerBlockRequestFilter(p, c)
}

// If the block was not found or the peer doesn't pass the policy
if !found || !passFilter {
log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

// Only add the task to the queue if the requester wants a DONT_HAVE
Expand Down
86 changes: 86 additions & 0 deletions internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,92 @@ func TestTaskComparator(t *testing.T) {
}
}

func TestPeerBlockFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

// Generate a few keys
keys := []string{"a", "b", "c"}
cids := make(map[cid.Cid]int)
blks := make([]blocks.Block, 0, len(keys))
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
blks = append(blks, block)
cids[block.Cid()] = i
}

// Generate a few peers
peerIDs := make([]peer.ID, len(keys))
for _, i := range cids {
peerID := libp2ptest.RandPeerIDFatal(t)
peerIDs[i] = peerID
}

// Setup the peer
fpt := &fakePeerTagger{}
sl := NewTestScoreLedger(shortTerm, nil, clock.New())
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
if err := bs.PutMany(ctx, blks); err != nil {
t.Fatal(err)
}

// use a single task worker so that the order of outgoing messages is deterministic
engineTaskWorkerCount := 1
e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl,
// if this Option is omitted, the test fails
WithPeerBlockRequestFilter(func(p peer.ID, c cid.Cid) bool {
// peer 0 has access to everything
if p == peerIDs[0] {
return true
}
// peer 1 has access to key b and c
if p == peerIDs[1] {
return blks[1].Cid().Equals(c) || blks[2].Cid().Equals(c)
}
// peer 2 and other have access to key c
return blks[2].Cid().Equals(c)
}),
)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))

// Create wants requests
for _, peerID := range peerIDs {
partnerWantBlocks(e, keys, peerID)
}

// check that outgoing messages are sent with the correct content
checkPeer := func(peerIndex int, expectedBlocks []blocks.Block) {
next := <-e.Outbox()
envelope := <-next

peerID := peerIDs[peerIndex]
responseBlocks := envelope.Message.Blocks()

if peerID != envelope.Peer {
t.Errorf("(Peer%v) expected message for peer ID %#v but instead got message for peer ID %#v", peerIndex, peerID, envelope.Peer)
}

if len(responseBlocks) != len(expectedBlocks) {
t.Errorf("(Peer%v) expected %v block in response but instead got %v", peerIndex, len(expectedBlocks), len(responseBlocks))
}

responseBlockSet := make(map[cid.Cid]bool)
for _, b := range responseBlocks {
responseBlockSet[b.Cid()] = true
}

for _, b := range expectedBlocks {
if !responseBlockSet[b.Cid()] {
t.Errorf("(Peer%v) expected block with CID %v", peerIndex, b.Cid())
}
}
}

checkPeer(0, blks[0:3])
checkPeer(1, blks[1:3])
checkPeer(2, blks[2:3])
}

func TestTaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand Down

0 comments on commit dd1e961

Please sign in to comment.