Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: limit concurrent HTTP requests #16

Merged
merged 3 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ typings/
.env

yarn.lock
package-lock.json
package-lock.json
dist/
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
"peer-id": "~0.13.1"
},
"dependencies": {
"debug": "^4.1.1",
"ipfs-http-client": "^33.1.0",
"multiaddr": "^6.1.0"
"multiaddr": "^6.1.0",
"p-queue": "^6.1.0"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
72 changes: 31 additions & 41 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
'use strict'

const dht = require('ipfs-http-client/src/dht')
const swarm = require('ipfs-http-client/src/swarm')
const refs = require('ipfs-http-client/src/files-regular/refs')
const defaultConfig = require('ipfs-http-client/src/utils/default-config')
const multiaddr = require('multiaddr')
const { default: PQueue } = require('p-queue')
const debug = require('debug')

const log = debug('libp2p-delegated-content-routing')
log.error = debug('libp2p-delegated-content-routing:error')

const DEFAULT_MAX_TIMEOUT = 30e3 // 30 second default
const DEFAULT_IPFS_API = {
protocol: 'https',
port: 443,
host: 'ipfs.io'
host: 'node0.delegate.ipfs.io'
}

const DEFAULT_BOOSTRAP_NODES = [
'/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd',
'/ipfs/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3',
'/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM',
'/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu',
'/ipfs/QmSoLueR4xBeUbY9WZ9xGUUxunbKWcrNFTDAadQJmocnWm',
'/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64',
'/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic',
'/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
]
const CONCURRENT_HTTP_REQUESTS = 4

/**
* An implementation of content routing, using a delegated peer.
Expand All @@ -33,20 +27,27 @@ class DelegatedContentRouting {
*
* @param {PeerID} peerId - the id of the node that is using this routing.
* @param {object} [api] - (Optional) the api endpoint of the delegated node to use.
* @param {Array<Multiaddr>} [bootstrappers] - (Optional) list of bootstrapper nodes we are connected to.
*/
constructor (peerId, api, bootstrappers) {
constructor (peerId, api) {
if (peerId == null) {
throw new Error('missing self peerId')
}

this.api = Object.assign({}, defaultConfig(), DEFAULT_IPFS_API, api)
this.dht = dht(this.api)
this.swarm = swarm(this.api)
this.refs = refs(this.api)

this.peerId = peerId
this.bootstrappers = bootstrappers || DEFAULT_BOOSTRAP_NODES.map((addr) => multiaddr(addr))

// limit concurrency to avoid request flood in web browser
// https://github.com/libp2p/js-libp2p-delegated-content-routing/issues/12
const concurrency = { concurrency: CONCURRENT_HTTP_REQUESTS }
this._httpQueue = new PQueue(concurrency)
// sometimes refs requests take long time, they need separate queue
// to not suffocate regular bussiness
this._httpQueueRefs = new PQueue(Object.assign({}, concurrency, {
concurrency: 2
}))
log(`enabled DelegatedContentRouting via ${this.api.protocol}://${this.api.host}:${this.api.port}`)
}

/**
Expand All @@ -60,49 +61,38 @@ class DelegatedContentRouting {
* @returns {AsyncIterable<PeerInfo>}
*/
async * findProviders (key, options = {}) {
const keyString = key.toBaseEncodedString()
log('findProviders starts: ' + keyString)
options.maxTimeout = options.maxTimeout || DEFAULT_MAX_TIMEOUT

const results = await this.dht.findProvs(key, {
const results = await this._httpQueue.add(() => this.dht.findProvs(key, {
timeout: `${options.maxTimeout}ms` // The api requires specification of the time unit (s/ms)
})
}))

for (let i = 0; i < results.length; i++) {
yield results[i]
}
log('findProviders finished: ' + keyString)
}

/**
* Announce to the network that the delegated node can provide the given key.
*
* Currently this uses the following hack
* - call swarm.connect on the delegated node to us, to ensure we are connected
* - call refs --recursive on the delegated node, so it fetches the content
* - delegate is one of bootstrap nodes, so we are always connected to it
* - call refs on the delegated node, so it fetches the content
*
* @param {CID} key
* @param {function(Error)} callback
* @returns {Promise<void>}
*/
async provide (key) {
const addrs = this.bootstrappers.map((addr) => {
return addr.encapsulate(`/p2p-circuit/ipfs/${this.peerId.toB58String()}`)
})

const results = await Promise.all(
addrs.map((addr) => {
return this.swarm.connect(addr.toString()).catch(() => {})
})
const keyString = key.toBaseEncodedString()
log('provide starts: ' + keyString)
await this._httpQueueRefs.add(() =>
this.refs(keyString, { recursive: false })
)

// only some need to succeed
const success = results.filter((res) => res && res.error == null)

if (success.length === 0) {
throw new Error('unable to swarm.connect using p2p-circuit')
}

this.refs(key.toBaseEncodedString(), {
recursive: true
})
log('provide finished: ' + keyString)
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ describe('DelegatedContentRouting', function () {
expect(() => new DelegatedContentRouting()).to.throw()
})

it('should default to https://ipfs.io as the delegate', () => {
it('should default to https://node0.delegate.ipfs.io as the delegate', () => {
const router = new DelegatedContentRouting(selfId)

expect(router.api).to.include({
'api-path': '/api/v0/',
protocol: 'https',
port: 443,
host: 'ipfs.io'
host: 'node0.delegate.ipfs.io'
})
})

Expand Down