diff --git a/package.json b/package.json index 11e44c2..5ab79c4 100644 --- a/package.json +++ b/package.json @@ -22,13 +22,16 @@ "go-ipfs": "0.6.0", "ipfs-http-client": "^45.0.0", "ipfs-utils": "^2.2.0", - "ipfsd-ctl": "^5.0.0" + "ipfsd-ctl": "^5.0.0", + "it-all": "^1.0.2" }, "peerDependencies": { "ipfs-http-client": "^44.0.0" }, "dependencies": { + "cids": "^1.0.0", "debug": "^4.1.1", + "p-defer": "^3.0.0", "p-queue": "^6.3.0", "peer-id": "^0.14.0" }, diff --git a/src/index.js b/src/index.js index 5c05507..bcab850 100644 --- a/src/index.js +++ b/src/index.js @@ -2,7 +2,10 @@ const PeerId = require('peer-id') const createFindPeer = require('ipfs-http-client/src/dht/find-peer') +const createQuery = require('ipfs-http-client/src/dht/query') +const CID = require('cids') const { default: PQueue } = require('p-queue') +const defer = require('p-defer') const debug = require('debug') const log = debug('libp2p-delegated-peer-routing') @@ -19,7 +22,10 @@ const CONCURRENT_HTTP_REQUESTS = 4 class DelegatedPeerRouting { constructor (api) { this.api = Object.assign({}, DEFAULT_IPFS_API, api) - this.dht = { findPeer: createFindPeer(this.api) } + this.dht = { + findPeer: createFindPeer(this.api), + getClosestPeers: createQuery(this.api) + } // limit concurrency to avoid request flood in web browser // https://github.com/libp2p/js-libp2p-delegated-content-routing/issues/12 @@ -68,6 +74,65 @@ class DelegatedPeerRouting { log('findPeer finished: ' + id) } } + + /** + * Attempt to find the closest peers on the network to the given key + * @param {Uint8Array} key A CID like key + * @param {object} [options] + * @param {number} [options.timeout=30e3] How long the query can take. + * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} + */ + async * getClosestPeers (key, options = {}) { + key = new CID(key) + const keyStr = key.toString() + + log('getClosestPeers starts:', keyStr) + options.timeout = options.timeout || DEFAULT_TIMEOUT + + const onStart = defer() + const onFinish = defer() + + this._httpQueue.add(() => { + onStart.resolve() + return onFinish.promise + }) + + try { + await onStart.promise + + const peers = new Map() + + for await (const result of this.dht.getClosestPeers(keyStr, { + timeout: options.timeout + })) { + switch (result.type) { + case 1: // Found Closer + // Track the addresses, so we can yield them when done + result.responses.forEach(response => { + peers.set(response.id, { + id: PeerId.createFromCID(response.id), + multiaddrs: response.addrs + }) + }) + break + case 2: // Final Peer + yield peers.get(result.id.string) || { + id: PeerId.createFromCID(result.id), + multiaddrs: [] + } + break + default: + log('getClosestPeers unhandled response', result) + } + } + } catch (err) { + log.error('getClosestPeers errored:', err) + throw err + } finally { + onFinish.resolve() + log('getClosestPeers finished:', keyStr) + } + } } module.exports = DelegatedPeerRouting diff --git a/test/index.spec.js b/test/index.spec.js index 8ce439b..63ac362 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -5,6 +5,7 @@ const { expect } = require('aegir/utils/chai') const { createFactory } = require('ipfsd-ctl') const PeerID = require('peer-id') const { isNode } = require('ipfs-utils/src/env') +const concat = require('it-all') const DelegatedPeerRouting = require('../src') const factory = createFactory({ @@ -166,4 +167,54 @@ describe('DelegatedPeerRouting', function () { expect(peer).to.not.exist() }) }) + + describe('query', () => { + it('should be able to query for the closest peers', async () => { + const opts = delegatedNode.apiAddr.toOptions() + + const router = new DelegatedPeerRouting({ + protocol: 'http', + port: opts.port, + host: opts.host + }) + + const nodeId = await delegatedNode.api.id() + const delegatePeerId = PeerID.createFromCID(nodeId.id) + + const key = PeerID.createFromB58String(peerIdToFind.id).id + const results = await concat(router.getClosestPeers(key)) + + // we should be closest to the 2 other peers + expect(results.length).to.equal(2) + results.forEach(result => { + // shouldnt be the delegate + expect(delegatePeerId.equals(result.id)).to.equal(false) + expect(result.multiaddrs).to.be.an('array') + }) + }) + + it('should find closest peers even if the peer doesnt exist', async () => { + const opts = delegatedNode.apiAddr.toOptions() + + const router = new DelegatedPeerRouting({ + protocol: 'http', + port: opts.port, + host: opts.host + }) + + const nodeId = await delegatedNode.api.id() + const delegatePeerId = PeerID.createFromCID(nodeId.id) + + const peerId = await PeerID.create({ keyType: 'ed25519' }) + const results = await concat(router.getClosestPeers(peerId.id)) + + // we should be closest to the 2 other peers + expect(results.length).to.equal(2) + results.forEach(result => { + // shouldnt be the delegate + expect(delegatePeerId.equals(result.id)).to.equal(false) + expect(result.multiaddrs).to.be.an('array') + }) + }) + }) })