Skip to content

Commit

Permalink
fix: race condition when requesting the same block twice (#214)
Browse files Browse the repository at this point in the history
When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while. If another request for a given block comes in before `blockstore.has` returns `true`, the block will be added to the want list. If the batch containing that block then finishes writing and a remote peer finally supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang indefinitely.

The change made here is to separate the sending of notifications out from putting things into the blockstore.  If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block.

Also:

- Upgrade to use the streaming API from interface-datastore
  - Does not assume that only arrays of CIDs are being passed any more, only uses the AsyncIterable interface contract to access data
- Actually dial remote nodes with bitswap 1.2.0

Co-authored-by: dirkmc <dirkmdev@gmail.com>
  • Loading branch information
achingbrain and dirkmc authored May 27, 2020
1 parent 46490f5 commit 78ce032
Show file tree
Hide file tree
Showing 25 changed files with 419 additions and 230 deletions.
5 changes: 3 additions & 2 deletions benchmarks/put-get.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

const Benchmark = require('benchmark')
const assert = require('assert')
const all = require('async-iterator-all')
const all = require('it-all')
const drain = require('it-drain')
const makeBlock = require('../test/utils/make-block')
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork

Expand All @@ -24,7 +25,7 @@ const blockSizes = [10, 1024, 10 * 1024]
suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => {
const blocks = await makeBlock(n, k)

await bitswap.putMany(blocks)
await drain(bitswap.putMany(blocks))

const res = await all(bitswap.getMany(blocks.map(block => block.cid)))

Expand Down
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^21.10.1",
"async-iterator-all": "^1.0.0",
"aegir": "^22.0.0",
"benchmark": "^2.1.4",
"buffer": "^5.6.0",
"chai": "^4.2.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"ipfs-repo": "^2.0.0",
"ipfs-repo": "^3.0.1",
"ipfs-utils": "^2.2.0",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
"libp2p-mplex": "^0.9.2",
Expand All @@ -71,10 +70,13 @@
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^3.3.2"
"uuid": "^8.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^1.1.0",
"bignumber.js": "^9.0.0",
"cids": "~0.8.0",
"debug": "^4.1.0",
Expand Down
164 changes: 93 additions & 71 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')
const AbortController = require('abort-controller')
const anySignal = require('any-signal')

const defaultOptions = {
statsEnabled: false,
Expand Down Expand Up @@ -101,9 +103,10 @@ class Bitswap {
this._log('received block')

const has = await this.blockstore.has(block.cid)

this._updateReceiveCounters(peerId.toB58String(), block, has)

if (has || !wasWanted) {
if (!wasWanted) {
return
}

Expand Down Expand Up @@ -176,65 +179,88 @@ class Bitswap {
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.signal
* @returns {Promise<Block>}
*/
async get (cid) {
for await (const block of this.getMany([cid])) {
return block
async get (cid, options = {}) {
const fetchFromNetwork = (cid, options) => {
// add it to the want list - n.b. later we will abort the AbortSignal
// so no need to remove the blocks from the wantlist after we have it
this.wm.wantBlocks([cid], options)

return this.notifications.wantBlock(cid, options)
}
}

/**
* Fetch a list of blocks by CID. If the blocks are in the local
* blockstore they are returned; otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Iterable<CID>} cids
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids) {
let pendingStart = cids.length
const wantList = []
let promptedNetwork = false

const fetchFromNetwork = async (cid) => {
wantList.push(cid)
const loadOrFetchFromNetwork = async (cid, options) => {
try {
// have to await here as we want to handle ERR_NOT_FOUND
const block = await this.blockstore.get(cid, options)

const blockP = this.notifications.wantBlock(cid)
return block
} catch (err) {
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}

if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
if (!promptedNetwork) {
promptedNetwork = true

const block = await blockP
this.wm.cancelWants([cid])
this.network.findAndConnect(cid)
.catch((err) => this._log.error(err))
}

return block
// we don't have the block locally so fetch it from the network
return fetchFromNetwork(cid, options)
}
}

for (const cid of cids) {
const has = await this.blockstore.has(cid)
pendingStart--
if (has) {
if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
yield this.blockstore.get(cid)
// depending on implementation it's possible for blocks to come in while
// we do the async operations to get them from the blockstore leading to
// a race condition, so register for incoming block notifications as well
// as trying to get it from the datastore
const controller = new AbortController()
const signal = anySignal([options.signal, controller.signal])

const block = await Promise.race([
this.notifications.wantBlock(cid, {
signal
}),
loadOrFetchFromNetwork(cid, {
signal
})
])

continue
}
// since we have the block we can now remove our listener
controller.abort()

if (!promptedNetwork) {
promptedNetwork = true
this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err))
}
return block
}

// we don't have the block locally so fetch it from the network
yield fetchFromNetwork(cid)
/**
* Fetch a list of blocks by CID. If the blocks are in the local
* blockstore they are returned; otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {AsyncIterator<CID>} cids
* @param {Object} options
* @param {AbortSignal} options.signal
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids, options = {}) {
for await (const cid of cids) {
yield this.get(cid, options)
}
}

/**
* Removes the given CIDs from the wantlist independent of any ref counts
* Removes the given CIDs from the wantlist independent of any ref counts.
*
* This will cause all outstanding promises for a given block to reject.
*
* If you want to cancel the want for a block without doing that, pass an
* AbortSignal in to `.get` or `.getMany` and abort it.
*
* @param {Iterable<CID>} cids
* @returns {void}
Expand All @@ -249,7 +275,9 @@ class Bitswap {
}

/**
* Removes the given keys from the want list
* Removes the given keys from the want list. This may cause pending promises
* for blocks to never resolve. If you want these promises to abort instead,
* call `unwant(cids)`.
*
* @param {Iterable<CID>} cids
* @returns {void}
Expand All @@ -268,46 +296,40 @@ class Bitswap {
* @param {Block} block
* @returns {Promise<void>}
*/
async put (block) { // eslint-disable-line require-await
return this.putMany([block])
async put (block) {
await this.blockstore.put(block)
this._sendHaveBlockNotifications(block)
}

/**
* Put the given blocks to the underlying blockstore and
* send them to nodes that have them in their wantlist.
*
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
* @returns {Promise<void>}
* @param {AsyncIterable<Block>} blocks
* @returns {AsyncIterable<Block>}
*/
async putMany (blocks) { // eslint-disable-line require-await
const self = this

// Add any new blocks to the blockstore
const newBlocks = []
await this.blockstore.putMany(async function * () {
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}

yield block
newBlocks.push(block)
}
}())

// Notify engine that we have new blocks
this.engine.receivedBlocks(newBlocks)
async * putMany (blocks) {
for await (const block of this.blockstore.putMany(blocks)) {
this._sendHaveBlockNotifications(block)

// Notify listeners that we have received the new blocks
for (const block of newBlocks) {
this.notifications.hasBlock(block)
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
self._log.error('Failed to provide: %s', err.message)
})
yield block
}
}

/**
* Sends notifications about the arrival of a block
*
* @param {Block} block
*/
_sendHaveBlockNotifications (block) {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block])
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}

/**
* Get the current list of wants.
*
Expand Down
39 changes: 27 additions & 12 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,17 @@ class Network {
*
* @param {CID} cid
* @param {number} maxProviders
* @returns {Promise<Result<Array>>}
* @param {Object} options
* @param {AbortSignal} options.signal
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders (cid, maxProviders) {
findProviders (cid, maxProviders, options = {}) {
return this.libp2p.contentRouting.findProviders(
cid,
{
maxTimeout: CONSTANTS.providerRequestTimeout,
maxNumProviders: maxProviders
maxNumProviders: maxProviders,
signal: options.signal
}
)
}
Expand All @@ -121,19 +124,29 @@ class Network {
* Find the providers of a given `cid` and connect to them.
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.signal
* @returns {void}
*/
async findAndConnect (cid) {
async findAndConnect (cid, options) {
const connectAttempts = []
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)) {
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, options)) {
this._log('connecting to providers', provider.id.toB58String())
connectAttempts.push(this.connectTo(provider))
connectAttempts.push(this.connectTo(provider, options))
}
await Promise.all(connectAttempts)
}

async provide (cid) {
await this.libp2p.contentRouting.provide(cid)
/**
* Tell the network we can provide content for the passed CID
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<void>}
*/
async provide (cid, options) {
await this.libp2p.contentRouting.provide(cid, options)
}

// Connect to the given peer
Expand Down Expand Up @@ -169,19 +182,21 @@ class Network {
* Connects to another peer
*
* @param {PeerInfo|PeerId|Multiaddr} peer
* @returns {Promise.<Connection>}
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Connection>}
*/
async connectTo (peer) { // eslint-disable-line require-await
async connectTo (peer, options) { // eslint-disable-line require-await
if (!this._running) {
throw new Error('network isn\'t running')
}

return this.libp2p.dial(peer)
return this.libp2p.dial(peer, options)
}

// Dial to the peer and try to use the most recent Bitswap
_dialPeer (peer) {
return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100])
return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100])
}

_updateSentStats (peer, blocks) {
Expand Down
Loading

0 comments on commit 78ce032

Please sign in to comment.