From 57ce00a16d7a3c73aa5220a745341ec4d13452b5 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 10 Apr 2019 23:05:47 +0800 Subject: [PATCH 1/7] feat: limit scope of queries to k closest peers --- src/constants.js | 2 +- src/peer-distance-list.js | 101 +++++ src/query.js | 636 +++++++++++++++++++++++--------- test/kad-dht.spec.js | 37 ++ test/peer-distance-list.spec.js | 150 ++++++++ test/query.spec.js | 112 +++++- 6 files changed, 862 insertions(+), 176 deletions(-) create mode 100644 src/peer-distance-list.js create mode 100644 test/peer-distance-list.spec.js diff --git a/src/constants.js b/src/constants.js index 7597af80..469c58f6 100644 --- a/src/constants.js +++ b/src/constants.js @@ -28,7 +28,7 @@ exports.READ_MESSAGE_TIMEOUT = minute // The number of records that will be retrieved on a call to getMany() exports.GET_MANY_RECORD_COUNT = 16 -// K is the maximum number of requests to perform before returning failue +// K is the maximum number of requests to perform before returning failure exports.K = 20 // Alpha is the concurrency for asynchronous requests diff --git a/src/peer-distance-list.js b/src/peer-distance-list.js new file mode 100644 index 00000000..d18b58be --- /dev/null +++ b/src/peer-distance-list.js @@ -0,0 +1,101 @@ +'use strict' + +const distance = require('xor-distance') +const utils = require('./utils') +const map = require('async/map') + +/** + * Maintains a list of peerIds sorted by distance from a DHT key. + */ +class PeerDistanceList { + /** + * Creates a new PeerDistanceList. + * + * @param {Buffer} originDhtKey - the DHT key from which distance is calculated + * @param {number} capacity - the maximum size of the list + */ + constructor (originDhtKey, capacity) { + this.originDhtKey = originDhtKey + this.capacity = capacity + this.peerDistances = [] + } + + /** + * The length of the list + */ + get length () { + return this.peerDistances.length + } + + /** + * The peerIds in the list, in order of distance from the origin key + */ + get peers () { + return this.peerDistances.map(pd => pd.peerId) + } + + /** + * Add a peerId to the list. + * + * @param {PeerId} peerId + * @param {function(Error)} callback + * @returns {void} + */ + add (peerId, callback) { + if (this.peerDistances.find(pd => pd.peerId.id.equals(peerId.id))) { + return callback() + } + + utils.convertPeerId(peerId, (err, dhtKey) => { + if (err) { + return callback(err) + } + + const el = { + peerId, + distance: distance(this.originDhtKey, dhtKey) + } + + this.peerDistances.push(el) + this.peerDistances.sort((a, b) => distance.compare(a.distance, b.distance)) + this.peerDistances = this.peerDistances.slice(0, this.capacity) + + callback() + }) + } + + /** + * Indicates whether any of the peerIds passed as a parameter are closer + * to the origin key than the furthest peerId in the PeerDistanceList. 
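+   * If the list is empty and peerIds is not, any peer counts as closer.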
+ * + * @param {Array} peerIds + * @param {function(Error, Boolean)} callback + * @returns {void} + */ + anyCloser (peerIds, callback) { + if (!peerIds.length) { + return callback(null, false) + } + + if (!this.length) { + return callback(null, true) + } + + map(peerIds, (peerId, cb) => utils.convertPeerId(peerId, cb), (err, dhtKeys) => { + if (err) { + return callback(err) + } + + const furthestDistance = this.peerDistances[this.peerDistances.length - 1].distance + for (const dhtKey of dhtKeys) { + const keyDistance = distance(this.originDhtKey, dhtKey) + if (distance.compare(keyDistance, furthestDistance) < 0) { + return callback(null, true) + } + } + return callback(null, false) + }) + } +} + +module.exports = PeerDistanceList diff --git a/src/query.js b/src/query.js index 0d5fe3b5..edde2420 100644 --- a/src/query.js +++ b/src/query.js @@ -1,5 +1,6 @@ 'use strict' +const EventEmitter = require('events') const waterfall = require('async/waterfall') const each = require('async/each') const queue = require('async/queue') @@ -7,6 +8,7 @@ const mh = require('multihashes') const c = require('./constants') const PeerQueue = require('./peer-queue') +const PeerDistanceList = require('./peer-distance-list') const utils = require('./utils') /** @@ -42,8 +44,12 @@ class Query { this.dht = dht this.key = key this.makePath = makePath - this.concurrency = c.ALPHA this._log = utils.logger(this.dht.peerInfo.id, 'query:' + mh.toB58String(key)) + + this.running = false + + this._onStart = this._onStart.bind(this) + this._onComplete = this._onComplete.bind(this) } /** @@ -58,266 +64,556 @@ class Query { this._log.error('Attempt to run query after shutdown') return callback(null, { finalSet: new Set(), paths: [] }) } + if (peers.length === 0) { this._log.error('Running query with no peers') return callback(null, { finalSet: new Set(), paths: [] }) } - const run = { - peersSeen: new Set(), - errors: [], - paths: null // array of states per disjoint path + this.run = new Run(this) + this.run.once('start', this._onStart) + this.run.once('complete', this._onComplete) + this.run.execute(peers, callback) + } + + /** + * Called when the run starts. + */ + _onStart () { + this.running = true + this._log('query:start') + + // Register this query so we can stop it if the DHT stops + this.dht._queryManager.queryStarted(this) + } + + /** + * Called when the run completes (even if there's an error). + */ + _onComplete () { + this._log('query:done') + + // Ensure worker queues for all paths are stopped at the end of the query + this.stop() + } + + /** + * Stop the query. + */ + stop () { + if (!this.running) { + return } - // create correct number of paths + this.run.removeListener('start', this._onStart) + this.run.removeListener('complete', this._onComplete) + + this.running = false + this.run && this.run.stop() + this.dht._queryManager.queryCompleted(this) + } +} + +/** + * Manages a single run of the query. + */ +class Run extends EventEmitter { + /** + * Creates a Run. 
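+   * A Run tracks the peers seen, the errors encountered and the closest K successfully queried peers across all of the query's disjoint paths.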
+ * + * @param {Query} query + */ + constructor (query) { + super() + + this.query = query + + this.running = false + this.workers = [] + + // The peers that have been queried (including error responses) + this.peersSeen = new Set() + // The errors received when querying peers + this.errors = [] + // The closest K peers that have been queried successfully + // (this member is initialized when the worker queues start) + this.peersQueried = null + } + + /** + * Stop all the workers + */ + stop () { + if (!this.running) { + return + } + + this.running = false + for (const worker of this.workers) { + worker.stop() + } + } + + /** + * Execute the run with the given initial set of peers. + * + * @param {Array} peers + * @param {function(Error, Object)} callback + */ + execute (peers, callback) { + const paths = [] // array of states per disjoint path + + // Create disjoint paths const numPaths = Math.min(c.DISJOINT_PATHS, peers.length) - const pathPeers = [] for (let i = 0; i < numPaths; i++) { - pathPeers.push([]) + paths.push(new Path(this, this.query.makePath(i, numPaths))) } - // assign peers to paths round-robin style + // Assign peers to paths round-robin style peers.forEach((peer, i) => { - pathPeers[i % numPaths].push(peer) + paths[i % numPaths].addInitialPeer(peer) }) - run.paths = pathPeers.map((peers, i) => { - return { - peers, - run, - query: this.makePath(i, numPaths), - peersToQuery: null + + // Execute the query along each disjoint path + // each(paths, (path, cb) => path.execute(cb), (err) => { + this.executePaths(paths, (err) => { + if (err) { + return callback(err) } - }) - // Register this query so we stop it if the DHT stops - this.dht._queryManager.queryStarted(this) + const res = { + // The closest K peers we were able to query successfully + finalSet: new Set(this.peersQueried.peers), + paths: [] + } - // Create a manager to keep track of the worker queue for each path - this.workerManager = new WorkerManager() - each(run.paths, (path, cb) => { - waterfall([ - (cb) => PeerQueue.fromKey(this.key, cb), - (q, cb) => { - path.peersToQuery = q - each(path.peers, (p, cb) => addPeerToQuery(p, this.dht, path, cb), cb) - }, - (cb) => { - this.workerManager.workerQueue(this, path, cb) + // Collect the results from each completed path + for (const path of paths) { + if (path.res && (path.res.pathComplete || path.res.queryComplete)) { + path.res.success = true + res.paths.push(path.res) } - ], cb) - }, (err, results) => { - this._log('query:done') + } - // Ensure worker queues for all paths are stopped at the end of the query - this.workerManager.stop() + callback(err, res) + }) + } + + /** + * Execute all paths through the DHT. 
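+   * Emits 'start' before the paths begin, stops any remaining workers once every path has finished, then emits 'complete'.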
+ * + * @param {Array} paths + * @param {function(Error)} callback + */ + executePaths (paths, callback) { + this.running = true + + this.emit('start') + each(paths, (path, cb) => path.execute(cb), (err) => { + // Ensure all workers are stopped + this.stop() + + // Completed the Run + this.emit('complete') if (err) { return callback(err) } - if (run.errors.length === run.peersSeen.size) { - return callback(run.errors[0]) + // If all queries errored out, something is seriously wrong, so callback + // with an error + if (this.errors.length === this.peersSeen.size) { + return callback(this.errors[0]) } - run.res = { - finalSet: run.peersSeen, - paths: [] - } + callback() + }) + } - run.paths.forEach((path) => { - if (path.res && (path.res.pathComplete || path.res.queryComplete)) { - path.res.success = true - run.res.paths.push(path.res) - } - }) + /** + * Initialize the list of queried peers, then start a worker queue for the + * given path. + * + * @param {Path} path + * @param {function(Error)} callback + */ + workerQueue (path, callback) { + this.init(() => this.startWorker(path, callback)) + } - callback(null, run.res) + /** + * Create and start a worker queue for a particular path. + * + * @param {Path} path + * @param {function(Error)} callback + */ + startWorker (path, callback) { + const worker = new WorkerQueue(this.query.dht, this, path, this.query._log) + this.workers.push(worker) + worker.execute(callback) + } + + /** + * Initialize the list of closest peers we've queried - this is shared by all + * paths in the run. + * + * @param {function(Error)} callback + * @returns {void} + */ + init (callback) { + if (this.peersQueried) { + return callback() + } + + // We only want to initialize it once for the run, and then inform each + // path worker that it's ready + if (this.awaitingKey) { + this.awaitingKey.push(callback) + return + } + + this.awaitingKey = [callback] + + // Convert the key into a DHT key by hashing it + utils.convertBuffer(this.query.key, (err, dhtKey) => { + this.peersQueried = new PeerDistanceList(dhtKey, c.K) + + for (const cb of this.awaitingKey) { + cb(err) + } + this.awaitingKey = undefined }) } /** - * Stop the query + * If we've queried K peers, and the remaining peers in the queues are all + * further from the key than the peers we've already queried, then we should + * stop querying. + * + * @param {function(Error, boolean)} callback + * @returns {void} */ - stop () { - this.workerManager && this.workerManager.stop() - this.dht._queryManager.queryCompleted(this) + continueQuerying (callback) { + // If we haven't queried K peers yet, keep going + if (this.peersQueried.length < this.peersQueried.capacity) { + return callback(null, true) + } + + // Get all the peers that are currently being queried. + // Note that this function gets called right after a peer has been popped + // off the head of the closest peers queue so it will include that peer. 
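+    // (workersList() is the set of items the underlying async queue is currently processing)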
+ let running = [] + for (const worker of this.workers) { + const peerIds = worker.queue.workersList().map(i => i.data) + running = running.concat(peerIds) + } + + // Check if any of the peers that are currently being queried are closer + // to the key than the peers we've already queried + this.peersQueried.anyCloser(running, (err, someCloser) => { + if (err) { + return callback(err) + } + + // Some are closer, keep going + if (someCloser) { + return callback(null, true) + } + + // None are closer, so we can stop querying + this.stop() + callback(null, false) + }) } } /** - * Manages the worker queues for each path through the DHT + * Manages a single Path through the DHT. */ -class WorkerManager { +class Path { /** - * Creates a new WorkerManager + * Creates a Path. + * + * @param {Run} run + * @param {queryFunc} queryFunc */ - constructor () { - this.running = true - this.workers = [] + constructor (run, queryFunc) { + this.run = run + this.queryFunc = queryFunc + this.initialPeers = [] } /** - * Stop all the workers + * Add a peer to the set of peers that are used to intialize the path. + * + * @param {PeerId} peer */ - stop () { - this.running = false - for (const worker of this.workers) { - worker.stop() + addInitialPeer (peer) { + this.initialPeers.push(peer) + } + + /** + * Execute the path. + * + * @param {function(Error)} callback + */ + execute (callback) { + waterfall([ + // Create a queue of peers ordered by distance from the key + (cb) => PeerQueue.fromKey(this.run.query.key, cb), + // Add initial peers to the queue + (q, cb) => { + this.peersToQuery = q + each(this.initialPeers, this.addPeerToQuery.bind(this), cb) + }, + // Start processing the queue + (cb) => { + this.run.workerQueue(this, cb) + } + ], callback) + } + + /** + * Add a peer to the peers to be queried. + * + * @param {PeerId} peer + * @param {function(Error)} callback + * @returns {void} + * @private + */ + addPeerToQuery (peer, callback) { + // Don't add self + if (this.run.query.dht._isSelf(peer)) { + return callback() } + + // The paths must be disjoint, meaning that no two paths in the Query may + // traverse the same peer + if (this.run.peersSeen.has(peer)) { + return callback() + } + + this.peersToQuery.enqueue(peer, callback) + } +} + +class WorkerQueue { + /** + * Creates a new WorkerQueue. + * + * @param {DHT} dht + * @param {Run} run + * @param {Object} path + * @param {function} log + */ + constructor (dht, run, path, log) { + this.dht = dht + this.run = run + this.path = path + this.log = log + + this.concurrency = c.ALPHA + this.queue = this.setupQueue() + } + + /** + * Create the underlying async queue. + * + * @returns {Object} + */ + setupQueue () { + const q = queue(this.processNext.bind(this), this.concurrency) + + // If there's an error, stop the worker + q.error = (err) => { + this.log.error('queue', err) + this.stop(err) + } + + // When all peers in the queue have been processed, stop the worker + q.drain = () => { + this.log('queue:drain') + this.stop() + } + + // When a space opens up in the queue, add some more peers + q.unsaturated = () => { + if (this.running) { + this.log('queue:unsaturated') + this.fill() + } + } + + q.buffer = 0 + + return q + } + + /** + * Stop the worker, optionally providing an error to pass to the worker's + * callback. + * + * @param {Error} err + */ + stop (err) { + if (!this.running) { + return + } + + this.running = false + this.queue.kill() + this.callbackFn(err) } /** * Use the queue from async to keep `concurrency` amount items running * per path. 
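   * The callback fires when the worker stops: on error, when the path or query completes, or when the queue drains.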
* - * @param {Query} query - * @param {Object} path * @param {function(Error)} callback */ - workerQueue (query, path, callback) { - let workerRunning = true - const q = queue((next, cb) => { - query._log('queue:work') - this.execQuery(next, query, path, (err, state) => { + execute (callback) { + this.running = true + this.callbackFn = callback + this.fill() + } + + /** + * Add peers to the queue until there are enough to satisfy the concurrency. + */ + fill () { + this.log('queue:fill') + while (this.queue.length() < this.concurrency && + this.path.peersToQuery.length > 0) { + this.queue.push(this.path.peersToQuery.dequeue()) + } + } + + /** + * Process the next peer in the queue + * + * @param {PeerId} peer + * @param {function(Error)} cb + * @returns {void} + */ + processNext (peer, cb) { + if (!this.running) { + return + } + + // The paths must be disjoint, meaning that no two paths in the Query may + // traverse the same peer + if (this.run.peersSeen.has(peer)) { + return cb() + } + + // Check if we've queried enough peers already + this.run.continueQuerying((err, continueQuerying) => { + if (!this.running) { + return + } + + if (err) { + return cb(err) + } + + // If we've queried enough peers, bail out + if (!continueQuerying) { + return + } + + // Check if another path has queried this peer in the mean time + if (this.run.peersSeen.has(peer)) { + return cb() + } + this.run.peersSeen.add(peer) + + // Execute the query on the next peer + this.log('queue:work') + this.execQuery(peer, (err, state) => { // Ignore response after worker killed - if (!workerRunning || !this.running) { + if (!this.running) { return cb() } - query._log('queue:work:done', err, state) + this.log('queue:work:done', err, state) if (err) { return cb(err) } // If query is complete, stop all workers. - // Note: this.stop() calls stop() on all the workers, which kills the + // Note: run.stop() calls stop() on all the workers, which kills the // queue and calls callback(), so we don't need to call cb() here if (state && state.queryComplete) { - query._log('query:complete') - return this.stop() + this.log('query:complete') + return this.run.stop() } // If path is complete, just stop this worker. - // Note: worker.stop() kills the queue and calls callback() so we don't + // Note: this.stop() kills the queue and calls callback() so we don't // need to call cb() here if (state && state.pathComplete) { - return worker.stop() + return this.stop() } // Otherwise, process next peer cb() }) - }, query.concurrency) - - // Keeps track of a running worker - const worker = { - stop: (err) => { - if (workerRunning) { - q.kill() - workerRunning = false - callback(err) - } - } - } - this.workers.push(worker) - - // Add peers to the queue until there are enough to satisfy the concurrency - const fill = () => { - query._log('queue:fill') - while (q.length() < query.concurrency && - path.peersToQuery.length > 0) { - q.push(path.peersToQuery.dequeue()) - } - } - - fill() - - // If there's an error, stop the worker - q.error = (err) => { - query._log.error('queue', err) - worker.stop(err) - } - - // When all peers in the queue have been processed, stop the worker - q.drain = () => { - query._log('queue:drain') - worker.stop() - } - - // When a space opens up in the queue, add some more peers - q.unsaturated = () => { - query._log('queue:unsaturated') - fill() - } - - q.buffer = 0 + }) } /** - * Execute a query on the `next` peer. + * Execute a query on the next peer. 
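+   * On success, the queried peer is added to the run's list of closest queried peers before any closer peers it returned are enqueued.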
* - * @param {PeerId} next - * @param {Query} query - * @param {Object} path + * @param {PeerId} peer * @param {function(Error)} callback * @returns {void} * @private */ - execQuery (next, query, path, callback) { - path.query(next, (err, res) => { + execQuery (peer, callback) { + this.path.queryFunc(peer, (err, res) => { // If the run has completed, bail out if (!this.running) { return callback() } if (err) { - path.run.errors.push(err) - callback() - } else if (res.pathComplete || res.queryComplete) { - path.res = res - callback(null, { - pathComplete: res.pathComplete, - queryComplete: res.queryComplete - }) - } else if (res.closerPeers && res.closerPeers.length > 0) { - each(res.closerPeers, (closer, cb) => { - // don't add ourselves - if (query.dht._isSelf(closer.id)) { - return cb() - } - closer = query.dht.peerBook.put(closer) - query.dht._peerDiscovered(closer) - addPeerToQuery(closer.id, query.dht, path, cb) - }, callback) - } else { - callback() + this.run.errors.push(err) + return callback() } - }) - } -} -/** - * Add a peer to the peers to be queried. - * - * @param {PeerId} next - * @param {DHT} dht - * @param {Object} path - * @param {function(Error)} callback - * @returns {void} - * @private - */ -function addPeerToQuery (next, dht, path, callback) { - const run = path.run - if (dht._isSelf(next)) { - return callback() - } + // Add the peer to the closest peers we have successfully queried + this.run.peersQueried.add(peer, (err) => { + if (err) { + return callback(err) + } - if (run.peersSeen.has(next)) { - return callback() - } + // If the query indicates that this path or the whole query is complete + // set the path result and bail out + if (res.pathComplete || res.queryComplete) { + this.path.res = res + return callback(null, { + pathComplete: res.pathComplete, + queryComplete: res.queryComplete + }) + } + + // If there are closer peers to query, add them to the queue + if (res.closerPeers && res.closerPeers.length > 0) { + return each(res.closerPeers, (closer, cb) => { + // don't add ourselves + if (this.dht._isSelf(closer.id)) { + return cb() + } + closer = this.dht.peerBook.put(closer) + this.dht._peerDiscovered(closer) + this.path.addPeerToQuery(closer.id, cb) + }, callback) + } - run.peersSeen.add(next) - path.peersToQuery.enqueue(next, callback) + callback() + }) + }) + } } module.exports = Query diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index fc6f7bc3..96600466 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -103,6 +103,7 @@ function waitForWellFormedTables (dhts, minPeers, avgPeers, waitTimeout, callbac }, waitTimeout)(callback) } +// Count how many peers are in b but are not in a function countDiffPeers (a, b) { const s = new Set() a.forEach((p) => s.add(p.toB58String())) @@ -678,6 +679,7 @@ describe('KadDHT', () => { it('find peer query', function (done) { this.timeout(40 * 1000) + // Create 101 nodes const nDHTs = 101 const tdht = new TestDHT() @@ -686,12 +688,17 @@ describe('KadDHT', () => { const ids = dhts.map((d) => d.peerInfo.id) + // The origin node for the FIND_PEER query const guy = dhts[0] + // The other nodes const others = dhts.slice(1) + // The DHT key const val = Buffer.from('foobar') const connected = {} // indexes in others that are reachable from guy series([ + // Make connections from each of the first 20 nodes to 16 of the + // remaining 80 nodes at random (cb) => times(20, (i, cb) => { times(16, (j, cb) => { const t = 20 + random(79) @@ -699,41 +706,71 @@ describe('KadDHT', () => { connect(others[i], others[t], 
cb) }, cb) }, cb), + // Make a connection from the origin node to each of the first 20 nodes (cb) => times(20, (i, cb) => { connected[i] = true connect(guy, others[i], cb) }, cb), + // Hash the key into the DHT's key format (cb) => kadUtils.convertBuffer(val, (err, rtval) => { expect(err).to.not.exist() + + // Get the alpha (3) closest peers to the key from the origin's + // routing table const rtablePeers = guy.routingTable.closestPeers(rtval, c.ALPHA) expect(rtablePeers).to.have.length(3) + // Get all connected peers from the origin's peer book const netPeers = guy.peerBook.getAllArray().filter((p) => p.isConnected()) expect(netPeers).to.have.length(20) + // The set of peers used to initiate the query (the closest alpha + // peers to the key that the origin knows about) const rtableSet = {} rtablePeers.forEach((p) => { rtableSet[p.toB58String()] = true }) + // The ids of nodes that have connections const connectedIds = ids.slice(1).filter((id, i) => connected[i]) series([ + // Make the query (cb) => guy.getClosestPeers(val, cb), + // Find the closest connected peers to the key (cb) => kadUtils.sortClosestPeers(connectedIds, rtval, cb) ], (err, res) => { expect(err).to.not.exist() + + // Query response const out = res[0] + + // All connected peers in order of distance from key const actualClosest = res[1] + // Expect that the response includes nodes that are were not + // already in the origin's routing table (ie it went out to + // the network to find closer peers) expect(out.filter((p) => !rtableSet[p.toB58String()])) .to.not.be.empty() + // Expect that there were 20 peers found expect(out).to.have.length(20) + + // The expected closest 20 peers to the key const exp = actualClosest.slice(0, 20) + // Expect the 20 peers found to be the 20 closest connected peers + // to the key kadUtils.sortClosestPeers(out, rtval, (err, got) => { expect(err).to.not.exist() + + // TODO: + // With this change this expectation fails intermittently, + // because getClosestPeers() doesn't always find the closest 20 + // peers (eg sometimes it will find 18 of the closest plus a + // couple of others that are not quite the closest). + // I'm not sure what the best approach to testing should be here? 
expect(countDiffPeers(exp, got)).to.eql(0) cb() diff --git a/test/peer-distance-list.spec.js b/test/peer-distance-list.spec.js new file mode 100644 index 00000000..a34322b3 --- /dev/null +++ b/test/peer-distance-list.spec.js @@ -0,0 +1,150 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') +const series = require('async/series') + +const kadUtils = require('../src/utils') +const PeerDistanceList = require('../src/peer-distance-list') + +describe('PeerDistanceList', () => { + const p1 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31')) + const p2 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32')) + const p3 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33')) + const p4 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34')) + const p5 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31')) + const p6 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a35')) + const p7 = new PeerId(Buffer.from('11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32')) + + let key + before((done) => { + kadUtils.convertPeerId(p1, (err, k) => { + if (err) { + return done(err) + } + + key = k + + done() + }) + }) + + describe('basics', () => { + it('add', (done) => { + const pdl = new PeerDistanceList(key) + + series([ + (cb) => pdl.add(p3, cb), + (cb) => pdl.add(p1, cb), + (cb) => pdl.add(p2, cb), + (cb) => pdl.add(p4, cb), + (cb) => pdl.add(p5, cb), + (cb) => pdl.add(p1, cb) + ], (err) => { + expect(err).to.not.exist() + + // Note: p1 and p5 are equal + expect(pdl.length).to.eql(4) + expect(pdl.peers).to.be.eql([p1, p4, p3, p2]) + + done() + }) + }) + + it('capacity', (done) => { + const pdl = new PeerDistanceList(key, 3) + + series([ + (cb) => pdl.add(p1, cb), + (cb) => pdl.add(p2, cb), + (cb) => pdl.add(p3, cb), + (cb) => pdl.add(p4, cb), + (cb) => pdl.add(p5, cb), + (cb) => pdl.add(p6, cb) + ], (err) => { + expect(err).to.not.exist() + + // Note: p1 and p5 are equal + expect(pdl.length).to.eql(3) + + // Closer peers added later should replace further + // peers added earlier + expect(pdl.peers).to.be.eql([p1, p6, p4]) + + done() + }) + }) + }) + + describe('closer', () => { + let pdl + before((done) => { + pdl = new PeerDistanceList(key) + series([ + (cb) => pdl.add(p1, cb), + (cb) => pdl.add(p2, cb), + (cb) => pdl.add(p3, cb), + (cb) => pdl.add(p4, cb) + ], done) + }) + + it('single closer peer', (done) => { + pdl.anyCloser([p6], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(true) + done() + }) + }) + + it('single further peer', (done) => { + pdl.anyCloser([p7], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(false) + done() + }) + }) + + it('closer and further peer', (done) => { + pdl.anyCloser([p6, p7], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(true) + done() + }) + }) + + it('single peer equal to furthest in list', (done) => { + pdl.anyCloser([p2], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(false) + done() + }) + }) + + it('no peers', (done) => { + pdl.anyCloser([], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(false) + done() + }) + }) + + it('empty peer distance list', (done) => { + new PeerDistanceList(key).anyCloser([p1], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(true) + done() + }) + }) + + 
it('empty peer distance list and no peers', (done) => { + new PeerDistanceList(key).anyCloser([], (err, closer) => { + expect(err).to.not.exist() + expect(closer).to.be.eql(false) + done() + }) + }) + }) +}) diff --git a/test/query.spec.js b/test/query.spec.js index 82e8a938..21dfe2b8 100644 --- a/test/query.spec.js +++ b/test/query.spec.js @@ -15,6 +15,7 @@ const Query = require('../src/query') const createPeerInfo = require('./utils/create-peer-info') const createDisjointTracks = require('./utils/create-disjoint-tracks') +const kadUtils = require('../src/utils') const createDHT = (peerInfos, cb) => { const sw = new Switch(peerInfos[0], new PeerBook()) @@ -31,7 +32,7 @@ describe('Query', () => { before(function (done) { this.timeout(5 * 1000) - createPeerInfo(12, (err, result) => { + createPeerInfo(40, (err, result) => { if (err) { return done(err) } @@ -87,10 +88,14 @@ describe('Query', () => { dht.switch.dial = (peer, callback) => callback() let i = 0 + const visited = [] const query = (p, cb) => { + visited.push(p) + if (i++ === 1) { return cb(new Error('fail')) } + cb(null, { closerPeers: [peerInfos[2]] }) @@ -103,9 +108,12 @@ describe('Query', () => { // Should have visited // - the initial peer passed to the query: peerInfos[1] // - the peer returned in closerPeers: peerInfos[2] - expect(res.finalSet.size).to.eql(2) + expect(visited).to.eql([peerInfos[1].id, peerInfos[2].id]) + + // The final set should only contain peers that were successfully queried + // (ie no errors) + expect(res.finalSet.size).to.eql(1) expect(res.finalSet.has(peerInfos[1].id)).to.equal(true) - expect(res.finalSet.has(peerInfos[2].id)).to.equal(true) done() }) @@ -570,6 +578,99 @@ describe('Query', () => { }) }) + it('stop after finding k closest peers', (done) => { + // mock this so we can dial non existing peers + dht.switch.dial = (peer, callback) => callback() + + // Sort peers by distance from peerInfos[0] + kadUtils.convertPeerId(peerInfos[0].id, (err, peerZeroDhtKey) => { + if (err) { + return done(err) + } + + const peerIds = peerInfos.map(pi => pi.id) + kadUtils.sortClosestPeers(peerIds, peerZeroDhtKey, (err, sorted) => { + if (err) { + return done(err) + } + + // Local node has nodes 10, 16 and 18 in k-bucket + const initial = [sorted[10], sorted[16], sorted[18]] + + // Should zoom in to peers near target, and then zoom out again until it + // has successfully queried 20 peers + const topology = { + // Local node has nodes 10, 16 and 18 in k-bucket + 10: [12, 20, 22, 24, 26, 28], + 16: [14, 18, 20, 22, 24, 26], + 18: [4, 6, 8, 12, 14, 16], + + 26: [24, 28, 30, 38], + 30: [14, 28], + 38: [2], + + // Should zoom out from this point, until it has 20 peers + 2: [13], + 13: [15], + 15: [17], + + // Right before we get to 20 peers, it finds some new peers that are + // closer than some of the ones it has already queried + 17: [1, 3, 5, 11], + 1: [7, 9], + 9: [19], + + // At this point it's visited 20 (actually more than 20 peers), and + // there are no closer peers to be found, so it should stop querying. + // Because there are 3 paths, each with a worker queue with + // concurrency 3, the exact order in which peers are visited is + // unpredictable, so we add a long tail and below we test to make + // sure that it never reaches the end of the tail. 
+ 19: [21], + 21: [23], + 23: [25], + 25: [27], + 27: [29], + 29: [31] + } + + const peerIndex = (peerId) => sorted.findIndex(p => p === peerId) + const peerIdToInfo = (peerId) => peerInfos.find(pi => pi.id === peerId) + + const visited = [] + const query = (peerId, cb) => { + visited.push(peerId) + const i = peerIndex(peerId) + const closerIndexes = topology[i] || [] + const closerPeers = closerIndexes.map(j => peerIdToInfo(sorted[j])) + setTimeout(() => cb(null, { closerPeers })) + } + + const q = new Query(dht, peerInfos[0].id.id, () => query) + q.run(initial, (err, res) => { + expect(err).to.not.exist() + + // Should query 20 peers and then stop + expect(visited.length).to.be.gt(20) + + // Should never get to end of tail (see note above) + expect(visited.find(p => peerIndex(p) === 29)).not.to.exist() + + // Final set should have 20 peers, and the closer peers that were + // found near the end of the query should displace further away + // peers that were found at the beginning + expect(res.finalSet.size).to.eql(20) + expect(res.finalSet.has(sorted[1])).to.eql(true) + expect(res.finalSet.has(sorted[3])).to.eql(true) + expect(res.finalSet.has(sorted[5])).to.eql(true) + expect(res.finalSet.has(sorted[38])).to.eql(false) + + done() + }) + }) + }) + }) + /* * This test creates two disjoint tracks of peers, one for * each of the query's two paths to follow. The "good" @@ -590,7 +691,8 @@ describe('Query', () => { */ it('uses disjoint paths', (done) => { const goodLength = 3 - createDisjointTracks(peerInfos, goodLength, (err, targetId, starts, getResponse) => { + const samplePeerInfos = peerInfos.slice(0, 12) + createDisjointTracks(samplePeerInfos, goodLength, (err, targetId, starts, getResponse) => { expect(err).to.not.exist() // mock this so we can dial non existing peers dht.switch.dial = (peer, callback) => callback() @@ -619,7 +721,7 @@ describe('Query', () => { // we should reach the target node expect(targetVisited).to.eql(true) // we should visit all nodes (except the target) - expect(res.finalSet.size).to.eql(peerInfos.length - 1) + expect(res.finalSet.size).to.eql(samplePeerInfos.length - 1) // there should be one successful path expect(res.paths.length).to.eql(1) done() From 67e46b95c222e48b6868cba784bf75a1ea63700a Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Sun, 14 Apr 2019 00:05:46 +0800 Subject: [PATCH 2/7] fix: queueing logic --- src/query.js | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/query.js b/src/query.js index edde2420..4940e06b 100644 --- a/src/query.js +++ b/src/query.js @@ -478,11 +478,20 @@ class WorkerQueue { } /** - * Add peers to the queue until there are enough to satisfy the concurrency. + * Add peers to the worker queue until there are enough to satisfy the + * worker queue concurrency. + * Note that we don't want to take any more than those required to satisfy + * concurrency from the peers-to-query queue, because we always want to + * query the closest peers to the key first, and new peers are continously + * being added to the peers-to-query queue. 
*/ fill () { this.log('queue:fill') - while (this.queue.length() < this.concurrency && + + // Note: + // - queue.running(): number of items that are currently running + // - queue.length(): the number of items that are waiting to be run + while (this.queue.running() + this.queue.length() < this.concurrency && this.path.peersToQuery.length > 0) { this.queue.push(this.path.peersToQuery.dequeue()) } From 6fabae8f618c34c87f1d6062645fe9bf5ac5e197 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Sun, 14 Apr 2019 00:06:52 +0800 Subject: [PATCH 3/7] test: fix find peer test --- test/kad-dht.spec.js | 83 +++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 43 deletions(-) diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index 96600466..373d7f5f 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -13,7 +13,6 @@ const timeout = require('async/timeout') const retry = require('async/retry') const each = require('async/each') const waterfall = require('async/waterfall') -const random = require('lodash.random') const Record = require('libp2p-record').Record const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -680,50 +679,59 @@ describe('KadDHT', () => { this.timeout(40 * 1000) // Create 101 nodes - const nDHTs = 101 + const nDHTs = 100 const tdht = new TestDHT() tdht.spawn(nDHTs, (err, dhts) => { expect(err).to.not.exist() - const ids = dhts.map((d) => d.peerInfo.id) + const dhtsById = new Map(dhts.map((d) => [d.peerInfo.id, d])) + const ids = [...dhtsById.keys()] // The origin node for the FIND_PEER query const guy = dhts[0] - // The other nodes - const others = dhts.slice(1) - // The DHT key + + // The key const val = Buffer.from('foobar') - const connected = {} // indexes in others that are reachable from guy + // The key as a DHT key + let rtval series([ - // Make connections from each of the first 20 nodes to 16 of the - // remaining 80 nodes at random - (cb) => times(20, (i, cb) => { - times(16, (j, cb) => { - const t = 20 + random(79) - connected[t] = true - connect(others[i], others[t], cb) - }, cb) - }, cb), - // Make a connection from the origin node to each of the first 20 nodes - (cb) => times(20, (i, cb) => { - connected[i] = true - connect(guy, others[i], cb) - }, cb), // Hash the key into the DHT's key format - (cb) => kadUtils.convertBuffer(val, (err, rtval) => { + (cb) => kadUtils.convertBuffer(val, (err, dhtKey) => { + expect(err).to.not.exist() + rtval = dhtKey + cb() + }), + // Make connections between nodes close to each other + (cb) => kadUtils.sortClosestPeers(ids, rtval, (err, sorted) => { expect(err).to.not.exist() + const conns = [] + const maxRightIndex = sorted.length - 1 + for (let i = 0; i < sorted.length; i++) { + // Connect to 5 nodes on either side (10 in total) + for (const distance of [1, 3, 11, 31, 63]) { + let rightIndex = i + distance + if (rightIndex > maxRightIndex) { + rightIndex = maxRightIndex * 2 - (rightIndex + 1) + } + let leftIndex = i - distance + if (leftIndex < 0) { + leftIndex = 1 - leftIndex + } + conns.push([sorted[leftIndex], sorted[rightIndex]]) + } + } + + each(conns, (conn, _cb) => connect(dhtsById.get(conn[0]), dhtsById.get(conn[1]), _cb), cb) + }), + (cb) => { // Get the alpha (3) closest peers to the key from the origin's // routing table const rtablePeers = guy.routingTable.closestPeers(rtval, c.ALPHA) expect(rtablePeers).to.have.length(3) - // Get all connected peers from the origin's peer book - const netPeers = guy.peerBook.getAllArray().filter((p) => p.isConnected()) - 
expect(netPeers).to.have.length(20) - // The set of peers used to initiate the query (the closest alpha // peers to the key that the origin knows about) const rtableSet = {} @@ -731,14 +739,13 @@ describe('KadDHT', () => { rtableSet[p.toB58String()] = true }) - // The ids of nodes that have connections - const connectedIds = ids.slice(1).filter((id, i) => connected[i]) - + const guyIndex = ids.findIndex(i => i.id.equals(guy.peerInfo.id.id)) + const otherIds = ids.slice(0, guyIndex).concat(ids.slice(guyIndex + 1)) series([ // Make the query (cb) => guy.getClosestPeers(val, cb), // Find the closest connected peers to the key - (cb) => kadUtils.sortClosestPeers(connectedIds, rtval, cb) + (cb) => kadUtils.sortClosestPeers(otherIds, rtval, cb) ], (err, res) => { expect(err).to.not.exist() @@ -762,21 +769,11 @@ describe('KadDHT', () => { // Expect the 20 peers found to be the 20 closest connected peers // to the key - kadUtils.sortClosestPeers(out, rtval, (err, got) => { - expect(err).to.not.exist() + expect(countDiffPeers(exp, out)).to.eql(0) - // TODO: - // With this change this expectation fails intermittently, - // because getClosestPeers() doesn't always find the closest 20 - // peers (eg sometimes it will find 18 of the closest plus a - // couple of others that are not quite the closest). - // I'm not sure what the best approach to testing should be here? - expect(countDiffPeers(exp, got)).to.eql(0) - - cb() - }) + cb() }) - }) + } ], (err) => { expect(err).to.not.exist() tdht.teardown(done) From abc0df9f4ca5e21f96d67535cb6a574e4859ae13 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 11:00:37 +0800 Subject: [PATCH 4/7] chore: increase maximum allowed bundle size --- .aegir.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.aegir.js b/.aegir.js index 211642ca..34e15446 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,4 +1,4 @@ module.exports = { - bundlesize: { maxSize: '191kB' } + bundlesize: { maxSize: '192kB' } } \ No newline at end of file From 89c3123db3b773f484b4a539851870ab66448f22 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 11:22:08 +0800 Subject: [PATCH 5/7] fix: find peer should return k peers --- src/index.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/index.js b/src/index.js index 675f02e4..9a81ef7a 100644 --- a/src/index.js +++ b/src/index.js @@ -73,14 +73,14 @@ class KadDHT extends EventEmitter { * * @type {number} */ - this.kBucketSize = options.kBucketSize || 20 + this.kBucketSize = options.kBucketSize || c.K /** - * Number of closest peers to return on kBucket search, default 6 + * Number of closest peers to return on kBucket search, default 20 * * @type {number} */ - this.ncp = options.ncp || 6 + this.ncp = options.ncp || c.K /** * The routing table. 
From 6710c2f14e81093388b567b30290ce3c94740399 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 11:23:01 +0800 Subject: [PATCH 6/7] test: make scoped query test more specific --- test/query.spec.js | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/test/query.spec.js b/test/query.spec.js index 21dfe2b8..cc8c4836 100644 --- a/test/query.spec.js +++ b/test/query.spec.js @@ -650,8 +650,13 @@ describe('Query', () => { q.run(initial, (err, res) => { expect(err).to.not.exist() - // Should query 20 peers and then stop - expect(visited.length).to.be.gt(20) + // Should query 19 peers, then find some peers closer to the key, and + // finally stop once those closer peers have been queried + const expectedVisited = new Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 20, 22, 24, 26, 28, 30, 38]) + const visitedSet = new Set(visited.map(peerIndex)) + for (const i of expectedVisited) { + expect(visitedSet.has(i)) + } // Should never get to end of tail (see note above) expect(visited.find(p => peerIndex(p) === 29)).not.to.exist() From 08296cf787735c51a543825579a6d8cfe79724a9 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 16 Apr 2019 17:25:00 +0800 Subject: [PATCH 7/7] fix: ensure queue callback always called --- src/query.js | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/query.js b/src/query.js index 4940e06b..7e3cdf9a 100644 --- a/src/query.js +++ b/src/query.js @@ -506,7 +506,7 @@ class WorkerQueue { */ processNext (peer, cb) { if (!this.running) { - return + return cb() } // The paths must be disjoint, meaning that no two paths in the Query may @@ -518,7 +518,7 @@ class WorkerQueue { // Check if we've queried enough peers already this.run.continueQuerying((err, continueQuerying) => { if (!this.running) { - return + return cb() } if (err) { @@ -527,7 +527,7 @@ class WorkerQueue { // If we've queried enough peers, bail out if (!continueQuerying) { - return + return cb() } // Check if another path has queried this peer in the mean time @@ -551,17 +551,18 @@ class WorkerQueue { // If query is complete, stop all workers. // Note: run.stop() calls stop() on all the workers, which kills the - // queue and calls callback(), so we don't need to call cb() here + // queue and calls callbackFn() if (state && state.queryComplete) { this.log('query:complete') - return this.run.stop() + this.run.stop() + return cb() } // If path is complete, just stop this worker. - // Note: this.stop() kills the queue and calls callback() so we don't - // need to call cb() here + // Note: this.stop() kills the queue and calls callbackFn() if (state && state.pathComplete) { - return this.stop() + this.stop() + return cb() } // Otherwise, process next peer