From 7c3371bf17106c8240c42e9c4ac97369d49c96d6 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 10 Dec 2019 13:48:34 +0100 Subject: [PATCH] fix: clean up pending dials abort per feedback --- package.json | 3 ++- src/dialer/index.js | 27 ++++++++++++++------ src/index.js | 4 +-- test/dialing/direct.spec.js | 49 ++++++++++++++++++++++++++++++++----- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 94559e8c63..9deef33de6 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "dependencies": { "abort-controller": "^3.0.0", "aggregate-error": "^3.0.1", - "any-signal": "^1.0.0", + "any-signal": "^1.1.0", "async": "^2.6.2", "async-iterator-all": "^1.0.0", "bignumber.js": "^9.0.0", @@ -78,6 +78,7 @@ "pull-handshake": "^1.1.4", "pull-stream": "^3.6.9", "retimer": "^2.0.0", + "timeout-abort-controller": "^1.0.0", "xsalsa20": "^1.0.2" }, "devDependencies": { diff --git a/src/dialer/index.js b/src/dialer/index.js index ff6add0bec..47bda2d436 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -2,7 +2,7 @@ const multiaddr = require('multiaddr') const errCode = require('err-code') -const AbortController = require('abort-controller') +const TimeoutController = require('timeout-abort-controller') const anySignal = require('any-signal') const debug = require('debug') const log = debug('libp2p:dialer') @@ -38,7 +38,21 @@ class Dialer { this.timeout = timeout this.perPeerLimit = perPeerLimit this.tokens = [...new Array(concurrency)].map((_, index) => index) - this.pendingDials = new Set() + this._pendingDials = new Set() + } + + /** + * Clears any pending dials + */ + destroy () { + for (const dial of this._pendingDials.values()) { + try { + dial.controller.abort() + } catch (err) { + log.error(err) + } + } + this._pendingDials.clear() } /** @@ -64,21 +78,20 @@ class Dialer { }) // Combine the timeout signal and options.signal, if provided - const timeoutController = new AbortController() + const timeoutController = new TimeoutController(this.timeout) const signals = [timeoutController.signal] options.signal && signals.push(options.signal) const signal = anySignal(signals) - const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout) const dial = { dialRequest, controller: timeoutController } - this.pendingDials.add(dial) + this._pendingDials.add(dial) try { const dialResult = await dialRequest.run({ ...options, signal }) - clearTimeout(timeoutId) + timeoutController.clear() log('dial succeeded to %s', dialResult.remoteAddr) return dialResult } catch (err) { @@ -89,7 +102,7 @@ class Dialer { log.error(err) throw err } finally { - this.pendingDials.delete(dial) + this._pendingDials.delete(dial) } } diff --git a/src/index.js b/src/index.js index 1a8ab7f964..c54e3fb8e5 100644 --- a/src/index.js +++ b/src/index.js @@ -199,9 +199,7 @@ class Libp2p extends EventEmitter { this._dht && this._dht.stop() ]) - for (const dial of this.dialer.pendingDials.values()) { - dial.abort() - } + this.dialer.destroy() await this.transportManager.close() await this.registrar.close() diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index f22a620348..5281acd574 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -169,7 +169,7 @@ describe('Dialing (direct, WebSockets)', () => { // We should have 2 in progress, and 1 waiting expect(dialer.tokens).to.have.length(0) - expect(dialer.pendingDials.size).to.equal(1) // 1 dial request + expect(dialer._pendingDials.size).to.equal(1) // 1 dial request deferredDial.resolve(await createMockConnection()) @@ -179,7 +179,45 @@ describe('Dialing (direct, WebSockets)', () => { // Only two dials will be run, as the first two succeeded expect(localTM.dial.callCount).to.equal(2) expect(dialer.tokens).to.have.length(2) - expect(dialer.pendingDials.size).to.equal(0) + expect(dialer._pendingDials.size).to.equal(0) + }) + + it('.destroy should abort pending dials', async () => { + const dialer = new Dialer({ + transportManager: localTM, + concurrency: 2 + }) + + expect(dialer.tokens).to.have.length(2) + + sinon.stub(localTM, 'dial').callsFake((_, options) => { + const deferredDial = pDefer() + const onAbort = () => { + options.signal.removeEventListener('abort', onAbort) + deferredDial.reject(new AbortError()) + } + options.signal.addEventListener('abort', onAbort) + return deferredDial.promise + }) + + // Perform 3 multiaddr dials + const dialPromise = dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr]) + + // Let the call stack run + await delay(0) + + // We should have 2 in progress, and 1 waiting + expect(dialer.tokens).to.have.length(0) + expect(dialer._pendingDials.size).to.equal(1) // 1 dial request + + try { + dialer.destroy() + await dialPromise + expect.fail('should have failed') + } catch (err) { + expect(err).to.be.an.instanceof(AggregateError) + expect(dialer._pendingDials.size).to.equal(0) // 1 dial request + } }) describe('libp2p.dialer', () => { @@ -290,13 +328,12 @@ describe('Dialing (direct, WebSockets)', () => { connEncryption: [Crypto] } }) - const abort = sinon.stub() - const dials = [{ abort }, { abort }, { abort }] - sinon.stub(libp2p.dialer, 'pendingDials').value(new Set(dials)) + + sinon.spy(libp2p.dialer, 'destroy') await libp2p.stop() - expect(abort).to.have.property('callCount', 3) + expect(libp2p.dialer.destroy).to.have.property('callCount', 1) }) }) })