Skip to content

Commit

Permalink
feat: abort all pending dials on stop
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Jan 24, 2020
1 parent 404fa69 commit ba02764
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Dialer {
this.timeout = timeout
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)
this.pendingDials = new Set()
}

/**
Expand Down Expand Up @@ -69,6 +70,12 @@ class Dialer {
const signal = anySignal(signals)
const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout)

const dial = {
dialRequest,
controller: timeoutController
}
this.pendingDials.add(dial)

try {
const dialResult = await dialRequest.run({ ...options, signal })
clearTimeout(timeoutId)
Expand All @@ -81,6 +88,8 @@ class Dialer {
}
log.error(err)
throw err
} finally {
this.pendingDials.delete(dial)
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,15 @@ class Libp2p extends EventEmitter {
log('libp2p is stopping')

try {
this.pubsub && await this.pubsub.stop()
this._dht && await this._dht.stop()
await Promise.all([
this.pubsub && this.pubsub.stop(),
this._dht && this._dht.stop()
])

for (const dial of this.dialer.pendingDials.values()) {
dial.abort()
}

await this.transportManager.close()
await this.registrar.close()
} catch (err) {
Expand Down
20 changes: 20 additions & 0 deletions test/dialing/direct.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ describe('Dialing (direct, WebSockets)', () => {

// We should have 2 in progress, and 1 waiting
expect(dialer.tokens).to.have.length(0)
expect(dialer.pendingDials.size).to.equal(1) // 1 dial request

deferredDial.resolve(await createMockConnection())

Expand All @@ -178,6 +179,7 @@ describe('Dialing (direct, WebSockets)', () => {
// Only two dials will be run, as the first two succeeded
expect(localTM.dial.callCount).to.equal(2)
expect(dialer.tokens).to.have.length(2)
expect(dialer.pendingDials.size).to.equal(0)
})

describe('libp2p.dialer', () => {
Expand Down Expand Up @@ -278,5 +280,23 @@ describe('Dialing (direct, WebSockets)', () => {
await libp2p.hangUp(connection.remotePeer)
expect(connection.stat.timeline.close).to.exist()
})

it('should abort pending dials on stop', async () => {
libp2p = new Libp2p({
peerInfo,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
const abort = sinon.stub()
const dials = [{ abort }, { abort }, { abort }]
sinon.stub(libp2p.dialer, 'pendingDials').value(new Set(dials))

await libp2p.stop()

expect(abort).to.have.property('callCount', 3)
})
})
})
4 changes: 3 additions & 1 deletion test/peer-discovery/index.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ describe('peer discovery scenarios', () => {
remotePeerInfo2.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
})

afterEach(async () => {
afterEach(async function () {
// Increase timeout until abort support for dht queries is in place
this.timeout(10e3)
libp2p && await libp2p.stop()
})

Expand Down

0 comments on commit ba02764

Please sign in to comment.