Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: random walk (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored and vasco-santos committed Apr 22, 2019
1 parent f03619e commit 9db17eb
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 72 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
Expand Down
2 changes: 1 addition & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 10 * second,
timeout: 30 * second,
delay: 10 * second
}
9 changes: 4 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class KadDHT extends EventEmitter {
* @property {boolean} enabled discovery enabled (default: true)
* @property {number} queriesPerPeriod how many queries to run per period (default: 1)
* @property {number} interval how often to run the the random-walk process, in milliseconds (default: 300000)
* @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 10000)
* @property {number} timeout how long to wait for the the random-walk query to run, in milliseconds (default: 30000)
* @property {number} delay how long to wait before starting the first random walk, in milliseconds (default: 10000)
*/

Expand Down Expand Up @@ -173,11 +173,10 @@ class KadDHT extends EventEmitter {
*/
stop (callback) {
this._running = false
this.randomWalk.stop(() => { // guarantee that random walk is stopped if it was started
this.providers.stop()
this.network.stop(callback)
})
this.randomWalk.stop()
this.providers.stop()
this._queryManager.stop()
this.network.stop(callback)
}

/**
Expand Down
121 changes: 62 additions & 59 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
const times = require('async/times')
const crypto = require('libp2p-crypto')
const waterfall = require('async/waterfall')
const timeout = require('async/timeout')
const multihashing = require('multihashing-async')
const PeerId = require('peer-id')
const assert = require('assert')
const c = require('./constants')
const { logger } = require('./utils')
const AbortController = require('abort-controller')

const errcode = require('err-code')

Expand All @@ -25,9 +25,8 @@ class RandomWalk {
* @param {DHT} options.dht
*/
constructor (dht, options) {
this._options = { ...c.defaultRandomWalk, ...options }
assert(dht, 'Random Walk needs an instance of the Kademlia DHT')
this._runningHandle = null
this._options = { ...c.defaultRandomWalk, ...options }
this._kadDHT = dht
this.log = logger(dht.peerInfo.id, 'random-walk')
}
Expand All @@ -41,64 +40,44 @@ class RandomWalk {
*/
start () {
// Don't run twice
if (this._running || !this._options.enabled) { return }

// Create running handle
const runningHandle = {
_onCancel: null,
_timeoutId: null,
runPeriodically: (walk, period) => {
runningHandle._timeoutId = setTimeout(() => {
runningHandle._timeoutId = null

walk((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(walk, nextPeriod)
})
}, period)
},
cancel: (cb) => {
// Not currently running, can callback immediately
if (runningHandle._timeoutId) {
clearTimeout(runningHandle._timeoutId)
return cb()
}
// Wait to finish and then call callback
runningHandle._onCancel = cb
}
}
if (this._timeoutId || !this._options.enabled) { return }

// Start doing random walks after `this._options.delay`
runningHandle._timeoutId = setTimeout(() => {
this._timeoutId = setTimeout(() => {
// Start runner immediately
runningHandle.runPeriodically((done) => {
this._runPeriodically((done) => {
// Each subsequent walk should run on a `this._options.interval` interval
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
}, 0)
}, this._options.delay)

this._runningHandle = runningHandle
}

/**
* Stop the random-walk process.
* @param {function(Error)} callback
* Stop the random-walk process. Any active
* queries will be aborted.
*
* @returns {void}
*/
stop (callback) {
const runningHandle = this._runningHandle

if (!runningHandle) {
return callback()
}
stop () {
clearTimeout(this._timeoutId)
this._timeoutId = null
this._controller && this._controller.abort()
}

this._runningHandle = null
runningHandle.cancel(callback)
/**
* Run function `walk` on every `interval` ms
* @param {function(callback)} walk The function to execute on `interval`
* @param {number} interval The interval to run on in ms
*
* @private
*/
_runPeriodically (walk, interval) {
this._timeoutId = setTimeout(() => {
walk((nextInterval) => {
// Schedule next
this._runPeriodically(walk, nextInterval)
})
}, interval)
}

/**
Expand All @@ -113,39 +92,63 @@ class RandomWalk {
*/
_walk (queries, walkTimeout, callback) {
this.log('start')
this._controller = new AbortController()

times(queries, (i, cb) => {
times(queries, (i, next) => {
this.log('running query %d', i)

// Perform the walk
waterfall([
(cb) => this._randomPeerId(cb),
(id, cb) => timeout((cb) => {
this._query(id, cb)
}, walkTimeout)(cb)
(id, cb) => {
// Check if we've happened to already abort
if (!this._controller) return cb()

this._query(id, {
timeout: walkTimeout,
signal: this._controller.signal
}, cb)
}
], (err) => {
if (err) {
this.log.error('query finished with error', err)
return callback(err)
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query %d finished with error', i, err)
return next(err)
}

this.log('done')
callback(null)
this.log('finished query %d', i)
next(null)
})
}, (err) => {
this._controller = null
this.log('finished queries')
callback(err)
})
}

/**
* The query run during a random walk request.
*
* TODO: While query currently supports an abort controller, it is not
* yet supported by `DHT.findPeer`. Once https://github.com/libp2p/js-libp2p-kad-dht/pull/82
* is complete, and AbortController support has been added to the
* DHT query functions, the abort here will just work, provided the
* functions support `options.signal`. Once done, this todo should be
* removed.
*
* @param {PeerId} id
* @param {object} options
* @param {number} options.timeout
* @param {AbortControllerSignal} options.signal
* @param {function(Error)} callback
* @returns {void}
*
* @private
*/
_query (id, callback) {
_query (id, options, callback) {
this.log('query:%s', id.toB58String())

this._kadDHT.findPeer(id, (err, peer) => {
if (err.code === 'ERR_NOT_FOUND') {
this._kadDHT.findPeer(id, options, (err, peer) => {
if (err && err.code === 'ERR_NOT_FOUND') {
// expected case, we asked for random stuff after all
return callback()
}
Expand Down
10 changes: 3 additions & 7 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function connect (a, b, callback) {

function bootstrap (dhts) {
dhts.forEach((dht) => {
dht.randomWalk._walk(1, 1000, () => {})
dht.randomWalk._walk(1, 10000, () => {})
})
}

Expand Down Expand Up @@ -574,17 +574,13 @@ describe('KadDHT', () => {
})

it('random-walk', function (done) {
this.timeout(10 * 1000)
this.timeout(20 * 1000)

const nDHTs = 20
const tdht = new TestDHT()

// random walk disabled for a manual usage
tdht.spawn(nDHTs, {
randomWalk: {
enabled: false
}
}, (err, dhts) => {
tdht.spawn(nDHTs, (err, dhts) => {
expect(err).to.not.exist()

series([
Expand Down
Loading

0 comments on commit 9db17eb

Please sign in to comment.