Skip to content

Commit

Permalink
feat: add support for api dht/query endpoint (#37)
Browse files Browse the repository at this point in the history
* feat: add support for api dht/query endpoint

* fix: correct the logs

* fix: flush out tests and correct response issue

* chore: apply suggestions from code review

Co-authored-by: Vasco Santos <vasco.santos@moxy.studio>

* chore: remove unused dep

Co-authored-by: Vasco Santos <vasco.santos@moxy.studio>
  • Loading branch information
jacobheun and vasco-santos authored Aug 14, 2020
1 parent b90d1ac commit 6fa569c
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 2 deletions.
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
"go-ipfs": "0.6.0",
"ipfs-http-client": "^45.0.0",
"ipfs-utils": "^2.2.0",
"ipfsd-ctl": "^5.0.0"
"ipfsd-ctl": "^5.0.0",
"it-all": "^1.0.2"
},
"peerDependencies": {
"ipfs-http-client": "^44.0.0"
},
"dependencies": {
"cids": "^1.0.0",
"debug": "^4.1.1",
"p-defer": "^3.0.0",
"p-queue": "^6.3.0",
"peer-id": "^0.14.0"
},
Expand Down
67 changes: 66 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

const PeerId = require('peer-id')
const createFindPeer = require('ipfs-http-client/src/dht/find-peer')
const createQuery = require('ipfs-http-client/src/dht/query')
const CID = require('cids')
const { default: PQueue } = require('p-queue')
const defer = require('p-defer')
const debug = require('debug')

const log = debug('libp2p-delegated-peer-routing')
Expand All @@ -19,7 +22,10 @@ const CONCURRENT_HTTP_REQUESTS = 4
class DelegatedPeerRouting {
constructor (api) {
this.api = Object.assign({}, DEFAULT_IPFS_API, api)
this.dht = { findPeer: createFindPeer(this.api) }
this.dht = {
findPeer: createFindPeer(this.api),
getClosestPeers: createQuery(this.api)
}

// limit concurrency to avoid request flood in web browser
// https://github.com/libp2p/js-libp2p-delegated-content-routing/issues/12
Expand Down Expand Up @@ -68,6 +74,65 @@ class DelegatedPeerRouting {
log('findPeer finished: ' + id)
}
}

/**
* Attempt to find the closest peers on the network to the given key
* @param {Uint8Array} key A CID like key
* @param {object} [options]
* @param {number} [options.timeout=30e3] How long the query can take.
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * getClosestPeers (key, options = {}) {
key = new CID(key)
const keyStr = key.toString()

log('getClosestPeers starts:', keyStr)
options.timeout = options.timeout || DEFAULT_TIMEOUT

const onStart = defer()
const onFinish = defer()

this._httpQueue.add(() => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

const peers = new Map()

for await (const result of this.dht.getClosestPeers(keyStr, {
timeout: options.timeout
})) {
switch (result.type) {
case 1: // Found Closer
// Track the addresses, so we can yield them when done
result.responses.forEach(response => {
peers.set(response.id, {
id: PeerId.createFromCID(response.id),
multiaddrs: response.addrs
})
})
break
case 2: // Final Peer
yield peers.get(result.id.string) || {
id: PeerId.createFromCID(result.id),
multiaddrs: []
}
break
default:
log('getClosestPeers unhandled response', result)
}
}
} catch (err) {
log.error('getClosestPeers errored:', err)
throw err
} finally {
onFinish.resolve()
log('getClosestPeers finished:', keyStr)
}
}
}

module.exports = DelegatedPeerRouting
51 changes: 51 additions & 0 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { expect } = require('aegir/utils/chai')
const { createFactory } = require('ipfsd-ctl')
const PeerID = require('peer-id')
const { isNode } = require('ipfs-utils/src/env')
const concat = require('it-all')

const DelegatedPeerRouting = require('../src')
const factory = createFactory({
Expand Down Expand Up @@ -166,4 +167,54 @@ describe('DelegatedPeerRouting', function () {
expect(peer).to.not.exist()
})
})

describe('query', () => {
it('should be able to query for the closest peers', async () => {
const opts = delegatedNode.apiAddr.toOptions()

const router = new DelegatedPeerRouting({
protocol: 'http',
port: opts.port,
host: opts.host
})

const nodeId = await delegatedNode.api.id()
const delegatePeerId = PeerID.createFromCID(nodeId.id)

const key = PeerID.createFromB58String(peerIdToFind.id).id
const results = await concat(router.getClosestPeers(key))

// we should be closest to the 2 other peers
expect(results.length).to.equal(2)
results.forEach(result => {
// shouldnt be the delegate
expect(delegatePeerId.equals(result.id)).to.equal(false)
expect(result.multiaddrs).to.be.an('array')
})
})

it('should find closest peers even if the peer doesnt exist', async () => {
const opts = delegatedNode.apiAddr.toOptions()

const router = new DelegatedPeerRouting({
protocol: 'http',
port: opts.port,
host: opts.host
})

const nodeId = await delegatedNode.api.id()
const delegatePeerId = PeerID.createFromCID(nodeId.id)

const peerId = await PeerID.create({ keyType: 'ed25519' })
const results = await concat(router.getClosestPeers(peerId.id))

// we should be closest to the 2 other peers
expect(results.length).to.equal(2)
results.forEach(result => {
// shouldnt be the delegate
expect(delegatePeerId.equals(result.id)).to.equal(false)
expect(result.multiaddrs).to.be.an('array')
})
})
})
})

0 comments on commit 6fa569c

Please sign in to comment.