Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#174 from ipfs/fix/dont-ignore-pend…
Browse files Browse the repository at this point in the history
…ing-blocks

Fix: don't ignore received blocks for pending wants

This commit was moved from ipfs/go-bitswap@7944a99
  • Loading branch information
Stebalien authored Aug 23, 2019
2 parents 957b6ca + 31cffd7 commit a748e62
Show file tree
Hide file tree
Showing 9 changed files with 544 additions and 286 deletions.
20 changes: 10 additions & 10 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom("", []blocks.Block{blk})
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -294,7 +294,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.wm.IsWanted(b.Cid()) {
if bs.sm.IsWanted(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
Expand Down Expand Up @@ -354,6 +354,12 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
}
}

if from != "" {
for _, b := range wanted {
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}
}

return nil
}

Expand Down Expand Up @@ -382,17 +388,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}

// Process blocks
err := bs.receiveBlocksFrom(p, iblocks)
err := bs.receiveBlocksFrom(ctx, p, iblocks)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
}

for _, b := range iblocks {
if bs.wm.IsWanted(b.Cid()) {
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}
}
}

func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
Expand Down
65 changes: 65 additions & 0 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
peer "github.com/libp2p/go-libp2p-core/peer"
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
travis "github.com/libp2p/go-libp2p-testing/ci/travis"
tu "github.com/libp2p/go-libp2p-testing/etc"
Expand Down Expand Up @@ -138,6 +139,8 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
}
}

// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
func TestUnwantedBlockNotAdded(t *testing.T) {

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
Expand Down Expand Up @@ -170,6 +173,68 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
}
}

// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
// (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4

ig := testinstance.NewTestInstanceGenerator(net)
defer ig.Close()

instance := ig.Instances(1)[0]
defer instance.Exchange.Close()

oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Request enough blocks to exceed the session's broadcast want list
// capacity (by one block). The session will put the remaining block
// into the "tofetch" queue
blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
ks := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
ks = append(ks, b.Cid())
}
outch, err := instance.Exchange.GetBlocks(ctx, ks)
if err != nil {
t.Fatal(err)
}

// Wait a little while to make sure the session has time to process the wants
time.Sleep(time.Millisecond * 20)

// Simulate receiving a message which contains the block in the "tofetch" queue
lastBlock := blks[len(blks)-1]
bsMessage := message.New(true)
bsMessage.AddBlock(lastBlock)
unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)

// Make sure Bitswap adds the block to the output channel
blkrecvd, ok := <-outch
if !ok {
t.Fatal("timed out waiting for block")
}
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
t.Fatal("received wrong block")
}

// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !blockInStore {
t.Fatal("Block was not added to block store")
}
}

func TestLargeSwarm(t *testing.T) {
if testing.Short() {
t.SkipNow()
Expand Down
Loading

0 comments on commit a748e62

Please sign in to comment.