Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: find providers and closest peers return async iterable (#157)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: API for find providers and closest peers return async iterable instead of an array of PeerInfo
  • Loading branch information
vasco-santos authored Nov 30, 2019
1 parent d6f645e commit f0e6800
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 31 deletions.
20 changes: 10 additions & 10 deletions src/content-fetching/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
'use strict'

const errcode = require('err-code')

const pFilter = require('p-filter')
const pTimeout = require('p-timeout')

const libp2pRecord = require('libp2p-record')
Expand Down Expand Up @@ -108,22 +106,24 @@ module.exports = (dht) => {
await putLocal(key, record)

// put record to the closest peers
const peers = await dht.getClosestPeers(key, { shallow: true })
const results = await pFilter(peers, async (peer) => {
let counterAll = 0
let counterSuccess = 0

for await (const peer of dht.getClosestPeers(key, { shallow: true })) {
try {
counterAll += 1
await dht._putValueToPeer(key, record, peer)
return true
counterSuccess += 1
} catch (err) {
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
return false
}
})
}

// verify if we were able to put to enough peers
const minPeers = options.minPeers || peers.length // Ensure we have a default `minPeers`
const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers`

if (minPeers > results.length) {
const error = errcode(new Error(`Failed to put value to enough peers: ${results.length}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS')
if (minPeers > counterSuccess) {
const error = errcode(new Error(`Failed to put value to enough peers: ${counterSuccess}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS')
dht._log.error(error)
throw error
}
Expand Down
15 changes: 8 additions & 7 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,18 @@ module.exports = (dht) => {
// Add peer as provider
await dht.providers.addProvider(key, dht.peerInfo.id)

// Notify closest peers
const peers = await dht.getClosestPeers(key.buffer)
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [dht.peerInfo]

await Promise.all(peers.map(async (peer) => {
// Notify closest peers
for await (const peer of dht.getClosestPeers(key.buffer)) {
dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
try {
await dht.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
}
}))
}

if (errors.length) {
// TODO:
Expand All @@ -69,9 +68,9 @@ module.exports = (dht) => {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {Promise<PeerInfo>}
* @returns {AsyncIterable<PeerInfo>}
*/
async findProviders (key, options = {}) {
async * findProviders (key, options = {}) {
const providerTimeout = options.timeout || c.minute
const n = options.maxNumProviders || c.K

Expand Down Expand Up @@ -149,7 +148,9 @@ module.exports = (dht) => {
throw errcode(new Error('no providers found'), 'ERR_NOT_FOUND')
}

return out.toArray()
for (const pInfo of out.toArray()) {
yield pInfo
}
}
}
}
16 changes: 10 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,12 @@ class KadDHT extends EventEmitter {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {Promise<PeerInfo>}
* @returns {AsyncIterable<PeerInfo>}
*/
async findProviders (key, options = {}) { // eslint-disable-line require-await
return this.contentRouting.findProviders(key, options)
async * findProviders (key, options = {}) {
for await (const pInfo of this.contentRouting.findProviders(key, options)) {
yield pInfo
}
}

// ----------- Peer Routing -----------
Expand All @@ -282,10 +284,12 @@ class KadDHT extends EventEmitter {
* @param {Buffer} key
* @param {Object} [options]
* @param {boolean} [options.shallow] shallow query (default: false)
* @returns {Promise<Array<PeerId>>}
* @returns {AsyncIterable<PeerId>}
*/
async getClosestPeers (key, options = { shallow: false }) { // eslint-disable-line require-await
return this.peerRouting.getClosestPeers(key, options)
async * getClosestPeers (key, options = { shallow: false }) {
for await (const pId of this.peerRouting.getClosestPeers(key, options)) {
yield pId
}
}

/**
Expand Down
9 changes: 6 additions & 3 deletions src/peer-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ module.exports = (dht) => {
* @param {Buffer} key
* @param {Object} [options]
* @param {boolean} [options.shallow] shallow query (default: false)
* @returns {Promise<Array<PeerId>>}
* @returns {AsyncIterable<PeerId>}
*/
async getClosestPeers (key, options = { shallow: false }) {
async * getClosestPeers (key, options = { shallow: false }) {
dht._log('getClosestPeers to %b', key)

const id = await utils.convertBuffer(key)
Expand All @@ -213,7 +213,10 @@ module.exports = (dht) => {
}

const sorted = await utils.sortClosestPeers(Array.from(res.finalSet), id)
return sorted.slice(0, dht.kBucketSize)

for (const pId of sorted.slice(0, dht.kBucketSize)) {
yield pId
}
},

/**
Expand Down
11 changes: 6 additions & 5 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const sinon = require('sinon')
const { Record } = require('libp2p-record')
const errcode = require('err-code')

const all = require('async-iterator-all')
const pMapSeries = require('p-map-series')
const pEachSeries = require('p-each-series')
const delay = require('delay')
Expand Down Expand Up @@ -446,7 +447,7 @@ describe('KadDHT', () => {
await pEachSeries(values, async (v) => {
n = (n + 1) % 3

const provs = await dhts[n].findProviders(v.cid, { timeout: 5000 })
const provs = await all(dhts[n].findProviders(v.cid, { timeout: 5000 }))

expect(provs).to.have.length(1)
expect(provs[0].id.id).to.be.eql(ids[3].id)
Expand All @@ -470,8 +471,8 @@ describe('KadDHT', () => {

await Promise.all(dhts.map((dht) => dht.provide(val.cid)))

const res0 = await dhts[0].findProviders(val.cid)
const res1 = await dhts[0].findProviders(val.cid, { maxNumProviders: 2 })
const res0 = await all(dhts[0].findProviders(val.cid))
const res1 = await all(dhts[0].findProviders(val.cid, { maxNumProviders: 2 }))

// find providers find all the 3 providers
expect(res0).to.exist()
Expand Down Expand Up @@ -564,7 +565,7 @@ describe('KadDHT', () => {
const otherIds = ids.slice(0, guyIndex).concat(ids.slice(guyIndex + 1))

// Make the query
const out = await guy.getClosestPeers(val)
const out = await all(guy.getClosestPeers(val))
const actualClosest = await kadUtils.sortClosestPeers(otherIds, rtval)

// Expect that the response includes nodes that are were not
Expand Down Expand Up @@ -597,7 +598,7 @@ describe('KadDHT', () => {
await tdht.connect(dhts[index], dhts[(index + 1) % dhts.length])
})

const res = await dhts[1].getClosestPeers(Buffer.from('foo'))
const res = await all(dhts[1].getClosestPeers(Buffer.from('foo')))
expect(res).to.have.length(c.K)

return tdht.teardown()
Expand Down

0 comments on commit f0e6800

Please sign in to comment.