From 754fbc2d0b9e8b82bfa3b78bebdacb42cd16c3dd Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 6 Dec 2019 17:45:29 +0100 Subject: [PATCH] feat: abort all pending dials on stop --- src/dialer/index.js | 9 +++++++++ src/index.js | 11 +++++++++-- test/dialing/direct.spec.js | 20 ++++++++++++++++++++ test/peer-discovery/index.node.js | 4 +++- 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/src/dialer/index.js b/src/dialer/index.js index b44364a87b..ff6add0bec 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -38,6 +38,7 @@ class Dialer { this.timeout = timeout this.perPeerLimit = perPeerLimit this.tokens = [...new Array(concurrency)].map((_, index) => index) + this.pendingDials = new Set() } /** @@ -69,6 +70,12 @@ class Dialer { const signal = anySignal(signals) const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout) + const dial = { + dialRequest, + controller: timeoutController + } + this.pendingDials.add(dial) + try { const dialResult = await dialRequest.run({ ...options, signal }) clearTimeout(timeoutId) @@ -81,6 +88,8 @@ class Dialer { } log.error(err) throw err + } finally { + this.pendingDials.delete(dial) } } diff --git a/src/index.js b/src/index.js index 609edd32b7..1a8ab7f964 100644 --- a/src/index.js +++ b/src/index.js @@ -194,8 +194,15 @@ class Libp2p extends EventEmitter { log('libp2p is stopping') try { - this.pubsub && await this.pubsub.stop() - this._dht && await this._dht.stop() + await Promise.all([ + this.pubsub && this.pubsub.stop(), + this._dht && this._dht.stop() + ]) + + for (const dial of this.dialer.pendingDials.values()) { + dial.abort() + } + await this.transportManager.close() await this.registrar.close() } catch (err) { diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index 155b6dbe76..f22a620348 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -169,6 +169,7 @@ describe('Dialing (direct, WebSockets)', () => { // We should have 2 in progress, and 1 waiting expect(dialer.tokens).to.have.length(0) + expect(dialer.pendingDials.size).to.equal(1) // 1 dial request deferredDial.resolve(await createMockConnection()) @@ -178,6 +179,7 @@ describe('Dialing (direct, WebSockets)', () => { // Only two dials will be run, as the first two succeeded expect(localTM.dial.callCount).to.equal(2) expect(dialer.tokens).to.have.length(2) + expect(dialer.pendingDials.size).to.equal(0) }) describe('libp2p.dialer', () => { @@ -278,5 +280,23 @@ describe('Dialing (direct, WebSockets)', () => { await libp2p.hangUp(connection.remotePeer) expect(connection.stat.timeline.close).to.exist() }) + + it('should abort pending dials on stop', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + const abort = sinon.stub() + const dials = [{ abort }, { abort }, { abort }] + sinon.stub(libp2p.dialer, 'pendingDials').value(new Set(dials)) + + await libp2p.stop() + + expect(abort).to.have.property('callCount', 3) + }) }) }) diff --git a/test/peer-discovery/index.node.js b/test/peer-discovery/index.node.js index b346f80eb1..0d4b7b1cd8 100644 --- a/test/peer-discovery/index.node.js +++ b/test/peer-discovery/index.node.js @@ -29,7 +29,9 @@ describe('peer discovery scenarios', () => { remotePeerInfo2.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0')) }) - afterEach(async () => { + afterEach(async function () { + // Increase timeout until abort support for dht queries is in place + this.timeout(10e3) libp2p && await libp2p.stop() })