Skip to content

Commit

Permalink
fix: release tokens as soon as they are available
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Jan 24, 2020
1 parent 633b0c2 commit 2570a1b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/dialer/dial-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ class DialRequest {
const tokenHolder = new FIFO()
tokens.forEach(token => tokenHolder.push(token))
const dialAbortControllers = this.addrs.map(() => new AbortController())
let completedDials = 0
let startedDials = 0

try {
return await pAny(this.addrs.map(async (addr, i) => {
const token = await tokenHolder.shift() // get token
startedDials++
let conn
try {
const signal = dialAbortControllers[i].signal
conn = await this.dialAction(addr, { ...options, signal: anySignal([signal, options.signal]) })
// Remove the successful AbortController so it is not aborted
dialAbortControllers.splice(i, 1)
} finally {
completedDials++
// If we have more dials to make, recycle the token, otherwise release it
if (completedDials < this.addrs.length) {
if (startedDials < this.addrs.length) {
tokenHolder.push(token)
} else {
this.dialer.releaseToken(tokens.splice(tokens.indexOf(token), 1)[0])
Expand Down
49 changes: 49 additions & 0 deletions test/dialing/dial-request.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const AbortController = require('abort-controller')
const AggregateError = require('aggregate-error')
const pDefer = require('p-defer')
const delay = require('delay')

const { DialRequest } = require('../../src/dialer/dial-request')
const createMockConnection = require('../utils/mockConnection')
Expand Down Expand Up @@ -50,6 +51,54 @@ describe('Dial Request', () => {
expect(dialer.releaseToken).to.have.property('callCount', tokens.length)
})

it('should release tokens when all addr dials have started', async () => {
const mockConnection = await createMockConnection()
const deferred = pDefer()
const actions = {
1: async () => {
await delay(0)
return Promise.reject(error)
},
2: async () => {
await delay(0)
return Promise.reject(error)
},
3: () => deferred.promise
}
const dialAction = (num) => actions[num]()
const tokens = ['a', 'b']
const controller = new AbortController()
const dialer = {
getTokens: () => [...tokens],
releaseToken: () => {}
}

const dialRequest = new DialRequest({
addrs: Object.keys(actions),
dialer,
dialAction
})

sinon.spy(actions, 1)
sinon.spy(actions, 2)
sinon.spy(actions, 3)
sinon.spy(dialer, 'releaseToken')
dialRequest.run({ signal: controller.signal })
// Let the first dials run
await delay(10)

// Only 1 dial should remain, so 1 token should have been released
expect(actions[1]).to.have.property('callCount', 1)
expect(actions[2]).to.have.property('callCount', 1)
expect(actions[3]).to.have.property('callCount', 1)
expect(dialer.releaseToken).to.have.property('callCount', 1)

// Finish the dial
deferred.resolve(mockConnection)
await delay(0)
expect(dialer.releaseToken).to.have.property('callCount', 2)
})

it('should throw an AggregateError if all dials fail', async () => {
const actions = {
1: () => Promise.reject(error),
Expand Down

0 comments on commit 2570a1b

Please sign in to comment.