diff --git a/benchmarks/put-get.js b/benchmarks/put-get.js index 6d02efa5..04aa06f3 100644 --- a/benchmarks/put-get.js +++ b/benchmarks/put-get.js @@ -4,7 +4,8 @@ const Benchmark = require('benchmark') const assert = require('assert') -const all = require('async-iterator-all') +const all = require('it-all') +const drain = require('it-drain') const makeBlock = require('../test/utils/make-block') const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork @@ -24,7 +25,7 @@ const blockSizes = [10, 1024, 10 * 1024] suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => { const blocks = await makeBlock(n, k) - await bitswap.putMany(blocks) + await drain(bitswap.putMany(blocks)) const res = await all(bitswap.getMany(blocks.map(block => block.cid))) diff --git a/package.json b/package.json index 2c154c13..63caa976 100644 --- a/package.json +++ b/package.json @@ -43,16 +43,15 @@ "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { "@nodeutils/defaults-deep": "^1.1.0", - "aegir": "^21.10.1", - "async-iterator-all": "^1.0.0", + "aegir": "^22.0.0", "benchmark": "^2.1.4", "buffer": "^5.6.0", - "chai": "^4.2.0", "delay": "^4.3.0", - "dirty-chai": "^2.0.1", - "ipfs-repo": "^2.0.0", + "ipfs-repo": "^3.0.1", "ipfs-utils": "^2.2.0", "iso-random-stream": "^1.1.1", + "it-all": "^1.0.2", + "it-drain": "^1.0.1", "libp2p": "^0.27.0", "libp2p-kad-dht": "^0.18.3", "libp2p-mplex": "^0.9.2", @@ -71,10 +70,13 @@ "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", "rimraf": "^3.0.0", + "sinon": "^9.0.0", "stats-lite": "^2.2.0", - "uuid": "^3.3.2" + "uuid": "^8.0.0" }, "dependencies": { + "abort-controller": "^3.0.0", + "any-signal": "^1.1.0", "bignumber.js": "^9.0.0", "cids": "~0.8.0", "debug": "^4.1.0", diff --git a/src/index.js b/src/index.js index 380a7b72..0ef91d17 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') const logger = require('./utils').logger const Stats = require('./stats') +const AbortController = require('abort-controller') +const anySignal = require('any-signal') const defaultOptions = { statsEnabled: false, @@ -101,9 +103,10 @@ class Bitswap { this._log('received block') const has = await this.blockstore.has(block.cid) + this._updateReceiveCounters(peerId.toB58String(), block, has) - if (has || !wasWanted) { + if (!wasWanted) { return } @@ -176,65 +179,88 @@ class Bitswap { * blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us. * * @param {CID} cid + * @param {Object} options + * @param {AbortSignal} options.abortSignal * @returns {Promise} */ - async get (cid) { - for await (const block of this.getMany([cid])) { - return block + async get (cid, options = {}) { + const fetchFromNetwork = (cid, options) => { + // add it to the want list - n.b. later we will abort the AbortSignal + // so no need to remove the blocks from the wantlist after we have it + this.wm.wantBlocks([cid], options) + + return this.notifications.wantBlock(cid, options) } - } - /** - * Fetch a a list of blocks by cid. If the blocks are in the local - * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. - * - * @param {Iterable} cids - * @returns {Promise>} - */ - async * getMany (cids) { - let pendingStart = cids.length - const wantList = [] let promptedNetwork = false - const fetchFromNetwork = async (cid) => { - wantList.push(cid) + const loadOrFetchFromNetwork = async (cid, options) => { + try { + // have to await here as we want to handle ERR_NOT_FOUND + const block = await this.blockstore.get(cid, options) - const blockP = this.notifications.wantBlock(cid) + return block + } catch (err) { + if (err.code !== 'ERR_NOT_FOUND') { + throw err + } - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } + if (!promptedNetwork) { + promptedNetwork = true - const block = await blockP - this.wm.cancelWants([cid]) + this.network.findAndConnect(cid) + .catch((err) => this._log.error(err)) + } - return block + // we don't have the block locally so fetch it from the network + return fetchFromNetwork(cid, options) + } } - for (const cid of cids) { - const has = await this.blockstore.has(cid) - pendingStart-- - if (has) { - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - yield this.blockstore.get(cid) + // depending on implementation it's possible for blocks to come in while + // we do the async operations to get them from the blockstore leading to + // a race condition, so register for incoming block notifications as well + // as trying to get it from the datastore + const controller = new AbortController() + const signal = anySignal([options.signal, controller.signal]) + + const block = await Promise.race([ + this.notifications.wantBlock(cid, { + signal + }), + loadOrFetchFromNetwork(cid, { + signal + }) + ]) - continue - } + // since we have the block we can now remove our listener + controller.abort() - if (!promptedNetwork) { - promptedNetwork = true - this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err)) - } + return block + } - // we don't have the block locally so fetch it from the network - yield fetchFromNetwork(cid) + /** + * Fetch a a list of blocks by cid. If the blocks are in the local + * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. + * + * @param {AsyncIterator} cids + * @param {Object} options + * @param {AbortSignal} options.abortSignal + * @returns {Promise>} + */ + async * getMany (cids, options = {}) { + for await (const cid of cids) { + yield this.get(cid, options) } } /** - * Removes the given CIDs from the wantlist independent of any ref counts + * Removes the given CIDs from the wantlist independent of any ref counts. + * + * This will cause all outstanding promises for a given block to reject. + * + * If you want to cancel the want for a block without doing that, pass an + * AbortSignal in to `.get` or `.getMany` and abort it. * * @param {Iterable} cids * @returns {void} @@ -249,7 +275,9 @@ class Bitswap { } /** - * Removes the given keys from the want list + * Removes the given keys from the want list. This may cause pending promises + * for blocks to never resolve. If you wish these promises to abort instead + * call `unwant(cids)` instead. * * @param {Iterable} cids * @returns {void} @@ -268,46 +296,40 @@ class Bitswap { * @param {Block} block * @returns {Promise} */ - async put (block) { // eslint-disable-line require-await - return this.putMany([block]) + async put (block) { + await this.blockstore.put(block) + this._sendHaveBlockNotifications(block) } /** * Put the given blocks to the underlying blockstore and * send it to nodes that have it them their wantlist. * - * @param {AsyncIterable|Iterable} blocks - * @returns {Promise} + * @param {AsyncIterable} blocks + * @returns {AsyncIterable} */ - async putMany (blocks) { // eslint-disable-line require-await - const self = this - - // Add any new blocks to the blockstore - const newBlocks = [] - await this.blockstore.putMany(async function * () { - for await (const block of blocks) { - if (await self.blockstore.has(block.cid)) { - continue - } - - yield block - newBlocks.push(block) - } - }()) - - // Notify engine that we have new blocks - this.engine.receivedBlocks(newBlocks) + async * putMany (blocks) { + for await (const block of this.blockstore.putMany(blocks)) { + this._sendHaveBlockNotifications(block) - // Notify listeners that we have received the new blocks - for (const block of newBlocks) { - this.notifications.hasBlock(block) - // Note: Don't wait for provide to finish before returning - this.network.provide(block.cid).catch((err) => { - self._log.error('Failed to provide: %s', err.message) - }) + yield block } } + /** + * Sends notifications about the arrival of a block + * + * @param {Block} block + */ + _sendHaveBlockNotifications (block) { + this.notifications.hasBlock(block) + this.engine.receivedBlocks([block]) + // Note: Don't wait for provide to finish before returning + this.network.provide(block.cid).catch((err) => { + this._log.error('Failed to provide: %s', err.message) + }) + } + /** * Get the current list of wants. * diff --git a/src/network.js b/src/network.js index dc1678ce..8d34b157 100644 --- a/src/network.js +++ b/src/network.js @@ -105,14 +105,17 @@ class Network { * * @param {CID} cid * @param {number} maxProviders - * @returns {Promise>} + * @param {Object} options + * @param {AbortSignal} options.abortSignal + * @returns {AsyncIterable} */ - findProviders (cid, maxProviders) { + findProviders (cid, maxProviders, options = {}) { return this.libp2p.contentRouting.findProviders( cid, { maxTimeout: CONSTANTS.providerRequestTimeout, - maxNumProviders: maxProviders + maxNumProviders: maxProviders, + signal: options.signal } ) } @@ -121,19 +124,29 @@ class Network { * Find the providers of a given `cid` and connect to them. * * @param {CID} cid + * @param {Object} options + * @param {AbortSignal} options.abortSignal * @returns {void} */ - async findAndConnect (cid) { + async findAndConnect (cid, options) { const connectAttempts = [] - for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)) { + for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, options)) { this._log('connecting to providers', provider.id.toB58String()) - connectAttempts.push(this.connectTo(provider)) + connectAttempts.push(this.connectTo(provider, options)) } await Promise.all(connectAttempts) } - async provide (cid) { - await this.libp2p.contentRouting.provide(cid) + /** + * Tell the network we can provide content for the passed CID + * + * @param {CID} cid + * @param {Object} options + * @param {AbortSignal} options.abortSignal + * @returns {Promise} + */ + async provide (cid, options) { + await this.libp2p.contentRouting.provide(cid, options) } // Connect to the given peer @@ -169,19 +182,21 @@ class Network { * Connects to another peer * * @param {PeerInfo|PeerId|Multiaddr} peer - * @returns {Promise.} + * @param {Object} options + * @param {AbortSignal} options.abortSignal + * @returns {Promise} */ - async connectTo (peer) { // eslint-disable-line require-await + async connectTo (peer, options) { // eslint-disable-line require-await if (!this._running) { throw new Error('network isn\'t running') } - return this.libp2p.dial(peer) + return this.libp2p.dial(peer, options) } // Dial to the peer and try to use the most recent Bitswap _dialPeer (peer) { - return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100]) + return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100]) } _updateSentStats (peer, blocks) { diff --git a/src/notifications.js b/src/notifications.js index f6024cfd..57fb063a 100644 --- a/src/notifications.js +++ b/src/notifications.js @@ -1,12 +1,14 @@ 'use strict' const EventEmitter = require('events').EventEmitter +const Block = require('ipld-block') const CONSTANTS = require('./constants') const logger = require('./utils').logger -const unwantEvent = (c) => `unwant:${c}` -const blockEvent = (c) => `block:${c}` +const cidToMultihashString = (cid) => cid.multihash.toString('base64') +const unwantEvent = (cid) => `unwant:${cidToMultihashString(cid)}` +const blockEvent = (cid) => `block:${cidToMultihashString(cid)}` /** * Internal module used to track events about incoming blocks, @@ -22,9 +24,6 @@ class Notifications extends EventEmitter { this.setMaxListeners(CONSTANTS.maxListeners) this._log = logger(peerId, 'notif') - - this._unwantListeners = {} - this._blockListeners = {} } /** @@ -34,10 +33,9 @@ class Notifications extends EventEmitter { * @return {void} */ hasBlock (block) { - const cidStr = block.cid.toString('base58btc') - const str = `block:${cidStr}` - this._log(str) - this.emit(str, block) + const event = blockEvent(block.cid) + this._log(event) + this.emit(event, block) } /** @@ -47,32 +45,50 @@ class Notifications extends EventEmitter { * or undefined when the block is unwanted. * * @param {CID} cid + * @param {Object} options + * @param {AbortSignal} options.abortSignal * @returns {Promise} */ - wantBlock (cid) { - const cidStr = cid.toString('base58btc') - this._log(`wantBlock:${cidStr}`) + wantBlock (cid, options = {}) { + if (!cid) { + throw new Error('Not a valid cid') + } + + const blockEvt = blockEvent(cid) + const unwantEvt = unwantEvent(cid) + + this._log(`wantBlock:${cid}`) return new Promise((resolve, reject) => { - this._unwantListeners[cidStr] = () => { - this._log(`manual unwant: ${cidStr}`) - this._cleanup(cidStr) - resolve() + const onUnwant = () => { + this.removeListener(blockEvt, onBlock) + reject(new Error(`Block for ${cid} unwanted`)) } + const onBlock = (block) => { + this.removeListener(unwantEvt, onUnwant) + + if (!cid.multihash.equals(block.cid.multihash)) { + // wrong block + return reject(new Error(`Incorrect block received for ${cid}`)) + } else if (cid.version !== block.cid.version || cid.codec !== block.cid.codec) { + // right block but wrong version or codec + block = new Block(block.data, cid) + } - this._blockListeners[cidStr] = (block) => { - this._cleanup(cidStr) resolve(block) } - this.once( - unwantEvent(cidStr), - this._unwantListeners[cidStr] - ) - this.once( - blockEvent(cidStr), - this._blockListeners[cidStr] - ) + this.once(unwantEvt, onUnwant) + this.once(blockEvt, onBlock) + + if (options && options.signal) { + options.signal.addEventListener('abort', () => { + this.removeListener(blockEvt, onBlock) + this.removeListener(unwantEvt, onUnwant) + + reject(new Error(`Want for ${cid} aborted`)) + }) + } }) } @@ -83,34 +99,9 @@ class Notifications extends EventEmitter { * @returns {void} */ unwantBlock (cid) { - const str = `unwant:${cid.toString('base58btc')}` - this._log(str) - this.emit(str) - } - - /** - * Internal method to clean up once a block was received or unwanted. - * - * @private - * @param {string} cidStr - * @returns {void} - */ - _cleanup (cidStr) { - if (this._unwantListeners[cidStr]) { - this.removeListener( - unwantEvent(cidStr), - this._unwantListeners[cidStr] - ) - delete this._unwantListeners[cidStr] - } - - if (this._blockListeners[cidStr]) { - this.removeListener( - blockEvent(cidStr), - this._blockListeners[cidStr] - ) - delete this._blockListeners[cidStr] - } + const event = unwantEvent(cid) + this._log(event) + this.emit(event) } } diff --git a/src/want-manager/index.js b/src/want-manager/index.js index b68a4928..a0f1af37 100644 --- a/src/want-manager/index.js +++ b/src/want-manager/index.js @@ -82,8 +82,14 @@ module.exports = class WantManager { } // add all the cids to the wantlist - wantBlocks (cids) { + wantBlocks (cids, options = {}) { this._addEntries(cids, false) + + if (options && options.signal) { + options.signal.addEventListener('abort', () => { + this.cancelWants(cids) + }) + } } // remove blocks of all the given keys without respecting refcounts diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 2e6c968c..048b2c7e 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -3,13 +3,16 @@ 'use strict' const range = require('lodash.range') -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const PeerId = require('peer-id') -const all = require('async-iterator-all') +const all = require('it-all') +const drain = require('it-drain') const Message = require('../src/types/message') const Bitswap = require('../src') +const CID = require('cids') +const Block = require('ipld-block') +const AbortController = require('abort-controller') +const delay = require('delay') const createTempRepo = require('./utils/create-temp-repo-nodejs') const mockNetwork = require('./utils/mocks').mockNetwork @@ -20,6 +23,16 @@ const makeBlock = require('./utils/make-block') const makePeerId = require('./utils/make-peer-id') const orderedFinish = require('./utils/helpers').orderedFinish +function wantsBlock (cid, bitswap) { + for (const [key, value] of bitswap.getWantlist()) { // eslint-disable-line no-unused-vars + if (value.cid.toString() === cid.toString()) { + return true + } + } + + return false +} + describe('bitswap with mocks', function () { this.timeout(10 * 1000) @@ -143,7 +156,7 @@ describe('bitswap with mocks', function () { await bs._receiveMessage(other, msg) - const res = await Promise.all([b1.cid, b2.cid, b3.cid].map((cid) => repo.blocks.has(cid))) + const res = await Promise.all([b1.cid, b2.cid, b3.cid].map((cid) => repo.blocks.get(cid).then(() => true, () => false))) expect(res).to.eql([false, true, false]) const ledger = bs.ledgerForPeer(other) @@ -190,7 +203,7 @@ describe('bitswap with mocks', function () { const b2 = blocks[14] const b3 = blocks[13] - await repo.blocks.putMany([b1, b2, b3]) + await drain(repo.blocks.putMany([b1, b2, b3])) const bs = new Bitswap(mockLibp2pNode(), repo.blocks) const retrievedBlocks = await all(bs.getMany([b1.cid, b2.cid, b3.cid])) @@ -203,7 +216,7 @@ describe('bitswap with mocks', function () { const b2 = blocks[6] const b3 = blocks[7] - await repo.blocks.putMany([b1, b2, b3]) + await drain(repo.blocks.putMany([b1, b2, b3])) const bs = new Bitswap(mockLibp2pNode(), repo.blocks) const block1 = await bs.get(b1.cid) @@ -337,14 +350,94 @@ describe('bitswap with mocks', function () { bs.get(block.cid) ]) - bs.put(block, (err) => { - expect(err).to.not.exist() - }) + bs.put(block) const res = await resP expect(res[0]).to.eql(block) expect(res[1]).to.eql(block) }) + + it('gets the same block data with different CIDs', async () => { + const block = blocks[11] + + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + expect(block).to.have.nested.property('cid.codec', 'dag-pb') + expect(block).to.have.nested.property('cid.version', 0) + + const cid1 = new CID(0, 'dag-pb', block.cid.multihash) + const cid2 = new CID(1, 'dag-pb', block.cid.multihash) + const cid3 = new CID(1, 'raw', block.cid.multihash) + + const resP = Promise.all([ + bs.get(cid1), + bs.get(cid2), + bs.get(cid3) + ]) + + bs.put(block) + + const res = await resP + + // blocks should have the requested CID but with the same data + expect(res[0]).to.deep.equal(new Block(block.data, cid1)) + expect(res[1]).to.deep.equal(new Block(block.data, cid2)) + expect(res[2]).to.deep.equal(new Block(block.data, cid3)) + }) + + it('removes a block from the wantlist when the request is aborted', async () => { + const block = await makeBlock() + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const controller = new AbortController() + + const p = bs.get(block.cid, { + signal: controller.signal + }) + + await delay(1000) + + expect(wantsBlock(block.cid, bs)).to.be.true() + + controller.abort() + + await expect(p).to.eventually.rejectedWith(/aborted/) + + expect(wantsBlock(block.cid, bs)).to.be.false() + }) + + it('block should still be in the wantlist if only one request is aborted', async () => { + const block = await makeBlock() + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const controller = new AbortController() + + // request twice + const p1 = bs.get(block.cid, { + signal: controller.signal + }) + const p2 = bs.get(block.cid) + + await delay(100) + + // should want the block + expect(wantsBlock(block.cid, bs)).to.be.true() + + // abort one request + controller.abort() + + await expect(p1).to.eventually.rejectedWith(/aborted/) + + // here comes the block + bs.put(block) + + // should still want it + expect(wantsBlock(block.cid, bs)).to.be.true() + + // second request should resolve with the block + await expect(p2).to.eventually.deep.equal(block) + + // should not be in the want list any more + expect(wantsBlock(block.cid, bs)).to.be.false() + }) }) describe('unwant', () => { @@ -360,8 +453,7 @@ describe('bitswap with mocks', function () { setTimeout(() => bs.unwant(b.cid), 1e3) - const res = await p - expect(res[1]).to.not.exist() + await expect(p).to.eventually.be.rejected() bs.stop() }) diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index a30bdcad..94167f93 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const pEvent = require('p-event') const Message = require('../src/types/message') const Bitswap = require('../src') diff --git a/test/bitswap.js b/test/bitswap.js index de4150db..2679e437 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -1,10 +1,10 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const delay = require('delay') +const PeerId = require('peer-id') +const sinon = require('sinon') const Bitswap = require('../src') @@ -12,6 +12,7 @@ const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const Message = require('../src/types/message') // Creates a repo + libp2pNode + Bitswap with or without DHT async function createThing (dht) { @@ -64,12 +65,58 @@ describe('bitswap without DHT', function () { nodes[0].bitswap.unwant(block.cid) }, 200) - const b = await node0Get - expect(b).to.not.exist() + await expect(node0Get).to.eventually.be.rejectedWith(/unwanted/) finish(2) finish.assert() }) + + it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => { + // the block we want + const block = await makeBlock() + + // id of a peer with the block we want + const peerId = await PeerId.create({ bits: 512 }) + + // incoming message with requested block from the other peer + const message = new Message(false) + message.addEntry(block.cid, 1, false) + message.addBlock(block) + + // slow blockstore + nodes[0].bitswap.blockstore = { + get: sinon.stub().withArgs(block.cid).throws({ code: 'ERR_NOT_FOUND' }), + has: sinon.stub().withArgs(block.cid).returns(false), + put: sinon.stub() + } + + // add the block to our want list + const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) + + // oh look, a peer has sent it to us - this will trigger a `blockstore.put` which + // is an async operation so `self.blockstore.get(cid)` will still throw + // until the write has completed + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store did not have it + expect(nodes[0].bitswap.blockstore.get.calledWith(block.cid)).to.be.true() + + // another context wants the same block + const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) + + // meanwhile the blockstore has written the block + nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true) + + // here it comes again + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store had it this time + expect(nodes[0].bitswap.blockstore.get.calledWith(block.cid)).to.be.true() + + // both requests should get the block + expect(await wantBlockPromise1).to.deep.equal(block) + expect(await wantBlockPromise2).to.deep.equal(block) + }) }) describe('bitswap with DHT', function () { @@ -101,8 +148,6 @@ describe('bitswap with DHT', function () { it('put a block in 2, get it in 0', async () => { const block = await makeBlock() - nodes[2].bitswap.put(block) - await nodes[2].bitswap.put(block) // Give put time to process diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js index eec8dcf4..4c9b45ff 100644 --- a/test/decision-engine/decision-engine.js +++ b/test/decision-engine/decision-engine.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const PeerId = require('peer-id') const range = require('lodash.range') const difference = require('lodash.difference') @@ -12,6 +10,7 @@ const Block = require('ipld-block') const CID = require('cids') const multihashing = require('multihashing-async') const { Buffer } = require('buffer') +const drain = require('it-drain') const Message = require('../../src/types/message') const DecisionEngine = require('../../src/decision-engine') @@ -131,7 +130,7 @@ describe('Engine', () => { async function peerSendsBlocks (dEngine, repo, blocks, peer) { // Bitswap puts blocks into the blockstore then passes the blocks to the // Decision Engine - await repo.blocks.putMany(blocks) + await drain(repo.blocks.putMany(blocks)) await dEngine.receivedBlocks(blocks) } @@ -192,7 +191,7 @@ describe('Engine', () => { } const repo = await createTempRepo() - await repo.blocks.putMany(blocks) + await drain(repo.blocks.putMany(blocks)) let network let rcvdBlockCount = 0 @@ -276,7 +275,7 @@ describe('Engine', () => { // Simulate receiving message - put blocks into the blockstore then pass // them to the Decision Engine const rcvdBlocks = [blocks[0], blocks[2]] - await repo.blocks.putMany(rcvdBlocks) + await drain(repo.blocks.putMany(rcvdBlocks)) await dEngine.receivedBlocks(rcvdBlocks) // Wait till the engine sends a message @@ -329,7 +328,7 @@ describe('Engine', () => { // Simulate receiving message with blocks - put blocks into the blockstore // then pass them to the Decision Engine - await repo.blocks.putMany(blocks) + await drain(repo.blocks.putMany(blocks)) await dEngine.receivedBlocks(blocks) const [toPeer2, msg2] = await receiveMessage() @@ -625,7 +624,7 @@ describe('Engine', () => { }) const repo = await createTempRepo() - await repo.blocks.putMany(blocks) + await drain(repo.blocks.putMany(blocks)) const dEngine = new DecisionEngine(id, repo.blocks, network, null, { maxSizeReplaceHasWithBlock: 0 }) dEngine._scheduleProcessTasks = () => {} dEngine.start() diff --git a/test/decision-engine/ledger.spec.js b/test/decision-engine/ledger.spec.js index e06b9550..ec517907 100644 --- a/test/decision-engine/ledger.spec.js +++ b/test/decision-engine/ledger.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const PeerId = require('peer-id') const Ledger = require('../../src/decision-engine/ledger') diff --git a/test/decision-engine/req-queue.spec.js b/test/decision-engine/req-queue.spec.js index b185b46d..164dde3a 100644 --- a/test/decision-engine/req-queue.spec.js +++ b/test/decision-engine/req-queue.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const PeerId = require('peer-id') const RequestQueue = require('../../src/decision-engine/req-queue') diff --git a/test/decision-engine/task-merger.spec.js b/test/decision-engine/task-merger.spec.js index 3a9d1cd6..19268ec4 100644 --- a/test/decision-engine/task-merger.spec.js +++ b/test/decision-engine/task-merger.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const PeerId = require('peer-id') const RequestQueue = require('../../src/decision-engine/req-queue') diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 39e596d9..4fb1087d 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -2,9 +2,7 @@ /* eslint-disable no-console */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const Block = require('ipld-block') const { Buffer } = require('buffer') const crypto = require('crypto') diff --git a/test/network/network.node.js b/test/network/network.node.js index 38517668..acad3053 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect, assert } = require('aegir/utils/chai') const lp = require('it-length-prefixed') const pipe = require('it-pipe') const pDefer = require('p-defer') @@ -79,7 +77,7 @@ describe('network', () => { it('connectTo fail', async () => { try { await networkA.connectTo(p2pB.peerInfo.id) - chai.assert.fail() + assert.fail() } catch (err) { expect(err).to.exist() } @@ -199,4 +197,28 @@ describe('network', () => { await networkA.sendMessage(p2pC.peerInfo.id, msg) await deferred.promise }) + + it('dials to peer using Bitswap 1.2.0', async () => { + networkA = new Network(p2pA, bitswapMockA) + + // only supports 1.2.0 + networkB = new Network(p2pB, bitswapMockB) + networkB.protocols = ['/ipfs/bitswap/1.2.0'] + + networkA.start() + networkB.start() + + // FIXME: have to already be connected as sendMessage only accepts a peer id, not a PeerInfo + await p2pA.dial(p2pB.peerInfo) + + const deferred = pDefer() + + bitswapMockB._receiveMessage = () => { + deferred.resolve() + } + + await networkA.sendMessage(p2pB.peerInfo.id, new Message(true)) + + return deferred + }) }) diff --git a/test/notifications.spec.js b/test/notifications.spec.js index 23a70059..eef19818 100644 --- a/test/notifications.spec.js +++ b/test/notifications.spec.js @@ -1,12 +1,10 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) - -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const CID = require('cids') const Block = require('ipld-block') +const AbortController = require('abort-controller') const Notifications = require('../src/notifications') @@ -25,7 +23,7 @@ describe('Notifications', () => { it('hasBlock', (done) => { const n = new Notifications(peerId) const b = blocks[0] - n.once(`block:${b.cid}`, (block) => { + n.once(`block:${b.cid.multihash.toString('base64')}`, (block) => { expect(b).to.eql(block) done() }) @@ -39,6 +37,10 @@ describe('Notifications', () => { const p = n.wantBlock(b.cid) + // check that listeners have been set up + expect(n.listenerCount(`block:${b.cid.multihash.toString('base64')}`)).to.equal(1) + expect(n.listenerCount(`unwant:${b.cid.multihash.toString('base64')}`)).to.equal(1) + n.hasBlock(b) const block = await p @@ -46,8 +48,8 @@ describe('Notifications', () => { expect(b).to.eql(block) // check that internal cleanup works as expected - expect(Object.keys(n._blockListeners)).to.have.length(0) - expect(Object.keys(n._unwantListeners)).to.have.length(0) + expect(n.listenerCount(`block:${b.cid.multihash.toString('base64')}`)).to.equal(0) + expect(n.listenerCount(`unwant:${b.cid.multihash.toString('base64')}`)).to.equal(0) }) it('unwant block', async () => { @@ -58,9 +60,22 @@ describe('Notifications', () => { n.unwantBlock(b.cid) - const block = await p + await expect(p).to.eventually.be.rejectedWith(/unwanted/) + }) + + it('abort block want', async () => { + const n = new Notifications() + const b = blocks[0] + + const controller = new AbortController() - expect(block).to.be.undefined() + const p = n.wantBlock(b.cid, { + signal: controller.signal + }) + + controller.abort() + + await expect(p).to.eventually.be.rejectedWith(/aborted/) }) }) @@ -73,15 +88,17 @@ describe('Notifications', () => { const cid2 = new CID(b.cid.toString('base32')) const p = n.wantBlock(cid2) - n.hasBlock(b) + // check that listeners have been set up + expect(n.listenerCount(`block:${cid2.multihash.toString('base64')}`)).to.equal(1) + expect(n.listenerCount(`unwant:${cid2.multihash.toString('base64')}`)).to.equal(1) - const block = await p + n.hasBlock(b) - expect(b).to.eql(block) + await expect(p).to.eventually.be.eql(b) // check that internal cleanup works as expected - expect(Object.keys(n._blockListeners)).to.have.length(0) - expect(Object.keys(n._unwantListeners)).to.have.length(0) + expect(n.listenerCount(`block:${cid2.multihash.toString('base64')}`)).to.equal(0) + expect(n.listenerCount(`unwant:${cid2.multihash.toString('base64')}`)).to.equal(0) }) it('unwant block', async () => { @@ -94,9 +111,7 @@ describe('Notifications', () => { n.unwantBlock(b.cid) - const block = await p - - expect(block).to.be.undefined() + await expect(p).to.eventually.be.rejectedWith(/unwanted/) }) }) }) diff --git a/test/types/message.spec.js b/test/types/message.spec.js index 139c0150..bb31f615 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const CID = require('cids') const { Buffer } = require('buffer') const loadFixture = require('aegir/fixtures') diff --git a/test/types/wantlist.spec.js b/test/types/wantlist.spec.js index 2249e04b..daae7f3b 100644 --- a/test/types/wantlist.spec.js +++ b/test/types/wantlist.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const CID = require('cids') const multihashing = require('multihashing-async') diff --git a/test/utils.spec.js b/test/utils.spec.js index e8d5aac8..d9394b03 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -1,14 +1,13 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') + +const { expect } = require('aegir/utils/chai') const CID = require('cids') const Block = require('ipld-block') const { Buffer } = require('buffer') const multihashing = require('multihashing-async') const BitswapMessageEntry = require('../src/types/message/entry') -chai.use(require('dirty-chai')) -const expect = chai.expect const { groupBy, uniqWith, pullAllWith, includesWith, sortBy, isMapEqual } = require('../src/utils') const SortedMap = require('../src/utils/sorted-map') diff --git a/test/utils/distribution-test.js b/test/utils/distribution-test.js index 52aacb14..d1e68d71 100644 --- a/test/utils/distribution-test.js +++ b/test/utils/distribution-test.js @@ -1,9 +1,7 @@ 'use strict' const range = require('lodash.range') -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const createBitswap = require('./create-bitswap') const makeBlock = require('./make-block') diff --git a/test/utils/helpers.js b/test/utils/helpers.js index 042a7e9a..a9aad9a1 100644 --- a/test/utils/helpers.js +++ b/test/utils/helpers.js @@ -1,7 +1,7 @@ 'use strict' const range = require('lodash.range') -const expect = require('chai').expect +const { expect } = require('aegir/utils/chai') exports.orderedFinish = (n) => { const r = range(1, n + 1) diff --git a/test/utils/make-block.js b/test/utils/make-block.js index bb3bb94b..8848fc8b 100644 --- a/test/utils/make-block.js +++ b/test/utils/make-block.js @@ -6,7 +6,7 @@ const Block = require('ipld-block') const randomBytes = require('iso-random-stream/src/random') const range = require('lodash.range') const { Buffer } = require('buffer') -const uuid = require('uuid/v4') +const { v4: uuid } = require('uuid') module.exports = async (count, size) => { const blocks = await Promise.all( diff --git a/test/utils/store-has-blocks.js b/test/utils/store-has-blocks.js index 5aa12fdf..6a1d0e0e 100644 --- a/test/utils/store-has-blocks.js +++ b/test/utils/store-has-blocks.js @@ -1,6 +1,6 @@ 'use strict' -const expect = require('chai').expect +const { expect } = require('aegir/utils/chai') async function storeHasBlocks (message, store) { for (const b of message.blocks.values()) { diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index 55cab2f9..d4d1f782 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const cs = require('../../src/constants') const Message = require('../../src/types/message') diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index e68ece2c..7f1d22c8 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -1,9 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -chai.use(require('dirty-chai')) -const expect = chai.expect +const { expect } = require('aegir/utils/chai') const PeerId = require('peer-id') const CID = require('cids') const multihashing = require('multihashing-async')