Skip to content

Commit

Permalink
fix: clean up pending dials abort per feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Jan 24, 2020
1 parent 7d50549 commit 633b0c2
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"dependencies": {
"abort-controller": "^3.0.0",
"aggregate-error": "^3.0.1",
"any-signal": "^1.0.0",
"any-signal": "^1.1.0",
"async": "^2.6.2",
"async-iterator-all": "^1.0.0",
"bignumber.js": "^9.0.0",
Expand Down Expand Up @@ -78,6 +78,7 @@
"pull-handshake": "^1.1.4",
"pull-stream": "^3.6.9",
"retimer": "^2.0.0",
"timeout-abort-controller": "^1.0.0",
"xsalsa20": "^1.0.2"
},
"devDependencies": {
Expand Down
27 changes: 20 additions & 7 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const multiaddr = require('multiaddr')
const errCode = require('err-code')
const AbortController = require('abort-controller')
const TimeoutController = require('timeout-abort-controller')
const anySignal = require('any-signal')
const debug = require('debug')
const log = debug('libp2p:dialer')
Expand Down Expand Up @@ -38,7 +38,21 @@ class Dialer {
this.timeout = timeout
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)
this.pendingDials = new Set()
this._pendingDials = new Set()
}

/**
* Clears any pending dials
*/
destroy () {
for (const dial of this._pendingDials.values()) {
try {
dial.controller.abort()
} catch (err) {
log.error(err)
}
}
this._pendingDials.clear()
}

/**
Expand All @@ -64,21 +78,20 @@ class Dialer {
})

// Combine the timeout signal and options.signal, if provided
const timeoutController = new AbortController()
const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)
const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout)

const dial = {
dialRequest,
controller: timeoutController
}
this.pendingDials.add(dial)
this._pendingDials.add(dial)

try {
const dialResult = await dialRequest.run({ ...options, signal })
clearTimeout(timeoutId)
timeoutController.clear()
log('dial succeeded to %s', dialResult.remoteAddr)
return dialResult
} catch (err) {
Expand All @@ -89,7 +102,7 @@ class Dialer {
log.error(err)
throw err
} finally {
this.pendingDials.delete(dial)
this._pendingDials.delete(dial)
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@ class Libp2p extends EventEmitter {
this._dht && this._dht.stop()
])

for (const dial of this.dialer.pendingDials.values()) {
dial.abort()
}
this.dialer.destroy()

await this.transportManager.close()
await this.registrar.close()
Expand Down
49 changes: 43 additions & 6 deletions test/dialing/direct.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ describe('Dialing (direct, WebSockets)', () => {

// We should have 2 in progress, and 1 waiting
expect(dialer.tokens).to.have.length(0)
expect(dialer.pendingDials.size).to.equal(1) // 1 dial request
expect(dialer._pendingDials.size).to.equal(1) // 1 dial request

deferredDial.resolve(await createMockConnection())

Expand All @@ -179,7 +179,45 @@ describe('Dialing (direct, WebSockets)', () => {
// Only two dials will be run, as the first two succeeded
expect(localTM.dial.callCount).to.equal(2)
expect(dialer.tokens).to.have.length(2)
expect(dialer.pendingDials.size).to.equal(0)
expect(dialer._pendingDials.size).to.equal(0)
})

it('.destroy should abort pending dials', async () => {
const dialer = new Dialer({
transportManager: localTM,
concurrency: 2
})

expect(dialer.tokens).to.have.length(2)

sinon.stub(localTM, 'dial').callsFake((_, options) => {
const deferredDial = pDefer()
const onAbort = () => {
options.signal.removeEventListener('abort', onAbort)
deferredDial.reject(new AbortError())
}
options.signal.addEventListener('abort', onAbort)
return deferredDial.promise
})

// Perform 3 multiaddr dials
const dialPromise = dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr])

// Let the call stack run
await delay(0)

// We should have 2 in progress, and 1 waiting
expect(dialer.tokens).to.have.length(0)
expect(dialer._pendingDials.size).to.equal(1) // 1 dial request

try {
dialer.destroy()
await dialPromise
expect.fail('should have failed')
} catch (err) {
expect(err).to.be.an.instanceof(AggregateError)
expect(dialer._pendingDials.size).to.equal(0) // 1 dial request
}
})

describe('libp2p.dialer', () => {
Expand Down Expand Up @@ -290,13 +328,12 @@ describe('Dialing (direct, WebSockets)', () => {
connEncryption: [Crypto]
}
})
const abort = sinon.stub()
const dials = [{ abort }, { abort }, { abort }]
sinon.stub(libp2p.dialer, 'pendingDials').value(new Set(dials))

sinon.spy(libp2p.dialer, 'destroy')

await libp2p.stop()

expect(abort).to.have.property('callCount', 3)
expect(libp2p.dialer.destroy).to.have.property('callCount', 1)
})
})
})

0 comments on commit 633b0c2

Please sign in to comment.