Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition when requesting the same block twice #214

Merged
merged 12 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions benchmarks/put-get.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

const Benchmark = require('benchmark')
const assert = require('assert')
const all = require('async-iterator-all')
const all = require('it-all')
const drain = require('it-drain')
const makeBlock = require('../test/utils/make-block')
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork

Expand All @@ -24,7 +25,7 @@ const blockSizes = [10, 1024, 10 * 1024]
suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => {
const blocks = await makeBlock(n, k)

await bitswap.putMany(blocks)
await drain(bitswap.putMany(blocks))

const res = await all(bitswap.getMany(blocks.map(block => block.cid)))

Expand Down
11 changes: 7 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^21.10.1",
"async-iterator-all": "^1.0.0",
"aegir": "^22.0.0",
"benchmark": "^2.1.4",
"buffer": "^5.6.0",
"chai": "^4.2.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"ipfs-repo": "^2.0.0",
"ipfs-repo": "^3.0.1",
"ipfs-utils": "^2.2.0",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"iso-random-stream": "^1.1.1",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
Expand All @@ -71,14 +72,16 @@
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^3.3.2"
"uuid": "^8.0.0"
},
"dependencies": {
"bignumber.js": "^9.0.0",
"cids": "~0.8.0",
"debug": "^4.1.0",
"ipld-block": "^0.9.1",
"it-first": "^1.0.2",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
Expand Down
135 changes: 77 additions & 58 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')
const first = require('it-first')

const defaultOptions = {
statsEnabled: false,
Expand Down Expand Up @@ -100,10 +101,24 @@ class Bitswap {
async _handleReceivedBlock (peerId, block, wasWanted) {
this._log('received block')

const has = await this.blockstore.has(block.cid)
let has = false

try {
await this.blockstore.get(block.cid)
has = true
} catch (err) {
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}
}

this._updateReceiveCounters(peerId.toB58String(), block, has)

if (has || !wasWanted) {
if (wasWanted) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
this._sendHaveBlockNotifications(block)
}

return
}
achingbrain marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -178,58 +193,69 @@ class Bitswap {
* @param {CID} cid
* @returns {Promise<Block>}
*/
async get (cid) {
for await (const block of this.getMany([cid])) {
return block
}
async get (cid) { // eslint-disable-line require-await
return first(this.getMany([cid]))
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Iterable<CID>} cids
* @param {AsyncIterator<CID>} cids
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids) {
let pendingStart = cids.length
const wantList = []
let promptedNetwork = false

const fetchFromNetwork = async (cid) => {
wantList.push(cid)
// add it to the want list
this.wm.wantBlocks([cid])

const blockP = this.notifications.wantBlock(cid)
const block = await this.notifications.wantBlock(cid)

if (!pendingStart) {
this.wm.wantBlocks(wantList)
}

const block = await blockP
// we've got it, remove it from the want list
this.wm.cancelWants([cid])

return block
}

for (const cid of cids) {
const has = await this.blockstore.has(cid)
pendingStart--
if (has) {
if (!pendingStart) {
this.wm.wantBlocks(wantList)
let promptedNetwork = false

const loadOrFetchFromNetwork = async (cid) => {
try {
// have to await here as we want to handle ERR_NOT_FOUND
const block = await this.blockstore.get(cid)

return block
} catch (err) {
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}
yield this.blockstore.get(cid)

continue
}
if (!promptedNetwork) {
promptedNetwork = true

this.network.findAndConnect(cid)
.catch((err) => this._log.error(err))
}

if (!promptedNetwork) {
promptedNetwork = true
this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err))
// we don't have the block locally so fetch it from the network
return fetchFromNetwork(cid)
}
}

// we don't have the block locally so fetch it from the network
yield fetchFromNetwork(cid)
for (const cid of cids) {
// depending on implementation it's possible for blocks to come in while
// we do the async operations to get them from the blockstore leading to
// a race condition, so register for incoming block notifications as well
// as trying to get it from the datastore
const block = await Promise.race([
this.notifications.wantBlock(cid),
loadOrFetchFromNetwork(cid)
])

// since we have the block we can now remove our listener
this.notifications.unwantBlock(cid)

yield block
}
}

Expand Down Expand Up @@ -269,45 +295,38 @@ class Bitswap {
* @returns {Promise<void>}
*/
async put (block) { // eslint-disable-line require-await
return this.putMany([block])
return first(this.putMany([block]))
}

/**
* Put the given blocks to the underlying blockstore and
* send it to nodes that have it them their wantlist.
*
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
* @returns {Promise<void>}
* @returns {AsyncIterable<Block>}
*/
async putMany (blocks) { // eslint-disable-line require-await
const self = this

// Add any new blocks to the blockstore
const newBlocks = []
await this.blockstore.putMany(async function * () {
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}
async * putMany (blocks) { // eslint-disable-line require-await
for await (const block of this.blockstore.putMany(blocks)) {
this._sendHaveBlockNotifications(block)

yield block
newBlocks.push(block)
}
}())

// Notify engine that we have new blocks
this.engine.receivedBlocks(newBlocks)

// Notify listeners that we have received the new blocks
for (const block of newBlocks) {
this.notifications.hasBlock(block)
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
self._log.error('Failed to provide: %s', err.message)
})
yield block
}
}

/**
* Sends notifications about the arrival of a block
*
* @param {Block} block
*/
_sendHaveBlockNotifications (block) {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block])
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}

/**
* Get the current list of wants.
*
Expand Down
2 changes: 1 addition & 1 deletion src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class Network {

// Dial to the peer and try to use the most recent Bitswap
_dialPeer (peer) {
return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100])
return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100])
}

_updateSentStats (peer, blocks) {
Expand Down
4 changes: 4 additions & 0 deletions src/notifications.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class Notifications extends EventEmitter {
* @returns {Promise<Block>}
*/
wantBlock (cid) {
if (!cid) {
throw new Error('Not a valid cid')
}

const cidStr = cid.toString('base58btc')
this._log(`wantBlock:${cidStr}`)

Expand Down
13 changes: 6 additions & 7 deletions test/bitswap-mock-internals.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const PeerId = require('peer-id')
const all = require('async-iterator-all')
const all = require('it-all')
const drain = require('it-drain')
const Message = require('../src/types/message')
const Bitswap = require('../src')

Expand Down Expand Up @@ -143,7 +144,7 @@ describe('bitswap with mocks', function () {

await bs._receiveMessage(other, msg)

const res = await Promise.all([b1.cid, b2.cid, b3.cid].map((cid) => repo.blocks.has(cid)))
const res = await Promise.all([b1.cid, b2.cid, b3.cid].map((cid) => repo.blocks.get(cid).then(() => true, () => false)))
expect(res).to.eql([false, true, false])

const ledger = bs.ledgerForPeer(other)
Expand Down Expand Up @@ -190,7 +191,7 @@ describe('bitswap with mocks', function () {
const b2 = blocks[14]
const b3 = blocks[13]

await repo.blocks.putMany([b1, b2, b3])
await drain(repo.blocks.putMany([b1, b2, b3]))
const bs = new Bitswap(mockLibp2pNode(), repo.blocks)

const retrievedBlocks = await all(bs.getMany([b1.cid, b2.cid, b3.cid]))
Expand All @@ -203,7 +204,7 @@ describe('bitswap with mocks', function () {
const b2 = blocks[6]
const b3 = blocks[7]

await repo.blocks.putMany([b1, b2, b3])
await drain(repo.blocks.putMany([b1, b2, b3]))
const bs = new Bitswap(mockLibp2pNode(), repo.blocks)

const block1 = await bs.get(b1.cid)
Expand Down Expand Up @@ -337,9 +338,7 @@ describe('bitswap with mocks', function () {
bs.get(block.cid)
])

bs.put(block, (err) => {
expect(err).to.not.exist()
})
bs.put(block)

const res = await resP
expect(res[0]).to.eql(block)
Expand Down
Loading