Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: plumb through contexts (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert authored Nov 10, 2021
1 parent 92d1e7a commit d74d658
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 62 deletions.
12 changes: 6 additions & 6 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []b

func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
if err := p.Blockstore().PutMany(context.Background(), blocks); err != nil {
b.Fatal(err)
}
}
Expand All @@ -452,10 +452,10 @@ func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
bill := provs[0]
jeff := provs[1]

if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
if err := bill.Blockstore().PutMany(context.Background(), blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
if err := jeff.Blockstore().PutMany(context.Background(), blks[25:]); err != nil {
b.Fatal(err)
}
}
Expand All @@ -473,12 +473,12 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
even := i%2 == 0
third := i%3 == 0
if third || even {
if err := bill.Blockstore().Put(blk); err != nil {
if err := bill.Blockstore().Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
if third || !even {
if err := jeff.Blockstore().Put(blk); err != nil {
if err := jeff.Blockstore().Put(context.Background(), blk); err != nil {
b.Fatal(err)
}
}
Expand All @@ -490,7 +490,7 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
err := provs[rand.Intn(len(provs))].Blockstore().Put(blk)
err := provs[rand.Intn(len(provs))].Blockstore().Put(context.Background(), blk)
if err != nil {
b.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks

// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
Expand All @@ -464,7 +464,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
err := bs.blockstore.PutMany(ctx, wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
Expand Down Expand Up @@ -604,7 +604,7 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
go func(i int, b blocks.Block) {
defer wg.Done()

has, err := bs.blockstore.Has(b.Cid())
has, err := bs.blockstore.Has(context.TODO(), b.Cid())
if err != nil {
log.Infof("blockstore.Has error: %s", err)
has = false
Expand Down
30 changes: 15 additions & 15 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(block); err != nil {
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(block); err != nil {
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(block); err != nil {
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}

Expand All @@ -170,7 +170,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {

doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)

blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
blockInStore, err := doesNotWantBlock.Blockstore().Has(ctx, block.Cid())
if err != nil || blockInStore {
t.Fatal("Unwanted block added to block store")
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestPendingBlockAdded(t *testing.T) {
}

// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
blockInStore, err := instance.Blockstore().Has(context.Background(), lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -302,7 +302,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Cid())
err := first.Exchange.HasBlock(b)
err := first.Exchange.HasBlock(ctx, b)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {

for _, inst := range instances {
for _, b := range blocks {
if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
if _, err := inst.Blockstore().Get(ctx, b.Cid()); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

// peerB announces to the network that he has block alpha
err = peerB.Exchange.HasBlock(alpha)
err = peerB.Exchange.HasBlock(ctx, alpha)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestBasicBitswap(t *testing.T) {
blocks := bg.Blocks(1)

// First peer has block
err := instances[0].Exchange.HasBlock(blocks[0])
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -540,7 +540,7 @@ func TestDoubleGet(t *testing.T) {
t.Fatal("expected channel to be closed")
}

err = instances[0].Exchange.HasBlock(blocks[0])
err = instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -703,7 +703,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(blocks[0])
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -755,12 +755,12 @@ func TestBitswapLedgerTwoWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(2)
err := instances[0].Exchange.HasBlock(blocks[0])
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}

err = instances[1].Exchange.HasBlock(blocks[1])
err = instances[1].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestTracer(t *testing.T) {
bitswap.WithTracer(wiretap)(instances[0].Exchange)

// First peer has block
err := instances[0].Exchange.HasBlock(blocks[0])
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -990,7 +990,7 @@ func TestTracer(t *testing.T) {
// After disabling WireTap, no new messages are logged
bitswap.WithTracer(nil)(instances[0].Exchange)

err = instances[0].Exchange.HasBlock(blocks[1])
err = instances[0].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 9 additions & 9 deletions bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestBasicSessions(t *testing.T) {
b := inst[1]

// Add a block to Peer B
if err := b.Blockstore().Put(block); err != nil {
if err := b.Blockstore().Put(ctx, block); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestSessionBetweenPeers(t *testing.T) {

// Add 101 blocks to Peer A
blks := bgen.Blocks(101)
if err := inst[0].Blockstore().PutMany(blks); err != nil {
if err := inst[0].Blockstore().PutMany(ctx, blks); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -143,7 +143,7 @@ func TestSessionSplitFetch(t *testing.T) {
// Add 10 distinct blocks to each of 10 peers
blks := bgen.Blocks(100)
for i := 0; i < 10; i++ {
if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil {
if err := inst[i].Blockstore().PutMany(ctx, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestFetchNotConnected(t *testing.T) {
// Provide 10 blocks on Peer A
blks := bgen.Blocks(10)
for _, block := range blks {
if err := other.Exchange.HasBlock(block); err != nil {
if err := other.Exchange.HasBlock(ctx, block); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestFetchAfterDisconnect(t *testing.T) {

firstBlks := blks[:5]
for _, block := range firstBlks {
if err := peerA.Exchange.HasBlock(block); err != nil {
if err := peerA.Exchange.HasBlock(ctx, block); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
// Provide remaining blocks
lastBlks := blks[5:]
for _, block := range lastBlks {
if err := peerA.Exchange.HasBlock(block); err != nil {
if err := peerA.Exchange.HasBlock(ctx, block); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestInterestCacheOverflow(t *testing.T) {
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)

if err := b.Exchange.HasBlock(blks[0]); err != nil {
if err := b.Exchange.HasBlock(ctx, blks[0]); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -381,7 +381,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) {
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)

if err := a.Exchange.HasBlock(blks[17]); err != nil {
if err := a.Exchange.HasBlock(ctx, blks[17]); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -423,7 +423,7 @@ func TestMultipleSessions(t *testing.T) {
}

time.Sleep(time.Millisecond * 10)
if err := b.Exchange.HasBlock(blk); err != nil {
if err := b.Exchange.HasBlock(ctx, blk); err != nil {
t.Fatal(err)
}

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ require (
github.com/google/uuid v1.2.0
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-blockstore v0.1.6
github.com/ipfs/go-ipfs-blockstore v0.2.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.1.0
github.com/ipfs/go-ipfs-routing v0.2.0
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-metrics-interface v0.0.1
Expand Down
21 changes: 10 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,12 @@ github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqg
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-datastore v0.4.5 h1:cwOUcGMLdLPWgu3SlrCckCMznaGADbPqE0r8h768/Dg=
github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w3fyfrmmJs=
github.com/ipfs/go-datastore v0.5.0 h1:rQicVCEacWyk4JZ6G5bD9TKR7lZEG1MWcG7UdWYrFAU=
github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
Expand All @@ -269,22 +269,21 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ipfs-blockstore v0.1.6 h1:+RNM/gkTF6lzLPtt/xqjEUXJuG0lFwAiv+MV8MoAhvA=
github.com/ipfs/go-ipfs-blockstore v0.1.6/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
github.com/ipfs/go-ipfs-blockstore v0.2.0 h1:o/jv4tsJbPVB8niV29DVnJzlR4lp/ksU3CemmdMqprk=
github.com/ipfs/go-ipfs-blockstore v0.2.0/go.mod h1:SNeEpz/ICnMYZQYr7KNZTjdn7tEPB/99xpe8xI1RW7o=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-ds-help v0.1.1 h1:IW/bXGeaAZV2VH0Kuok+Ohva/zHkHmeLFBxC1k7mNPc=
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 h1:LJXIo9W7CAmugqI+uofioIpRb6rY30GUu7G6LUfpMvM=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1/go.mod h1:c8MwfHjtQjPoDyiy9cFquVtVHkO9b9Ob3FG91qJnWCM=
github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q=
github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0 h1:TiMekCrOGQuWYtZO3mf4YJXDIdNgnKWZ9IE3fGlnWfo=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ=
github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY=
github.com/ipfs/go-ipfs-routing v0.2.0 h1:EwcBZMi72MTTEuH/7a2lbXtmeO6X0COoQGWqUOGq0KU=
github.com/ipfs/go-ipfs-routing v0.2.0/go.mod h1:384byD/LHKhAgKE3NmwOjXCpDzhczROMBzidoYV7tfM=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ=
Expand Down
4 changes: 2 additions & 2 deletions internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (

var lk sync.Mutex
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
size, err := bsm.bs.GetSize(ctx, c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
Expand All @@ -107,7 +107,7 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[

var lk sync.Mutex
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
blk, err := bsm.bs.Get(ctx, c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
Expand Down
Loading

0 comments on commit d74d658

Please sign in to comment.